Posted on Leave a comment

How to use Mini STM32 v3.0 USB Port as Virtual Com Port

To use USB port of the mini STM32 v3. We need to configure the USB port and for that, we have to look at the schematics.

You can view the Full schematic here https://www.exasub.com/development-kit/mini-stm32-v3-0/mini-stm32-v3-0/

From the schematics, we see that there are three pins associated with the USB port.
1. PA11
2. PA12
3. PD2

The Pins PA11 and PA12 are
PA11 – USB D-
PA12 – USB D+

These two pins will be configured by the stm32cube ide when you enable the USB device.

PD2 should be configured as GPIO pin.
Because the USB FS implementation says to pull up the D+ line to 3.3V.
The pull up is performed by the S8550 PNP transistor.
So by making the PD2 pin LOW, we enable the USB FS, since it makes the D+ pull up.

We also need to select the Communication Device Class as Class For FS IP.

After configuring the cube mx project. we can proceed to generate code.

The code needs to add the following header file

/* USER CODE BEGIN Includes */
#include "usbd_cdc_if.h"
#include "string.h"
/* USER CODE END Includes */

and then in the main() function. you can write this code in while loop

  /* USER CODE BEGIN 2 */
HAL_GPIO_WritePin(GPIOD, GPIO_PIN_2, GPIO_PIN_RESET);
  /* USER CODE END 2 */


/* USER CODE BEGIN WHILE */
uint8_t *data = "Hello World from USB CDC\r\n";
  while (1)
  {
	  CDC_Transmit_FS(data, strlen(data));
	  HAL_Delay (1000);
    /* USER CODE END WHILE */

    /* USER CODE BEGIN 3 */
  }
  /* USER CODE END 3 */

After uploading the code to your microcontroller. It will be displayed in your windows device manager as “USB Serial Device”.

You can Connect to the Com port using the serial terminal software such as YAT, Putty or CoolTerm etc.
Note: Since it is a virtual com port, you dont have to set a specific baud rate.

Posted on Leave a comment

AVRHexFlashGUI: an AVRdude Hex Flasher GUI Application with Quick Flash Floating Button

Introduction

To program the AVR microcontroller you use “avrdude” to flash the hex files in the microcontroller. avrdude is a command line tool. Every time you have to flash the microcontroller with a new file you have to write the command.
This AVRHexFlashGUI uses the avrdude software to flash the microcontroller but it provides a Graphical User Interface.

There is a special feature I have implemented which is “QuickSet” Toolbar which has a “Quick FLASH” button. This toolbar floats on top of the screen. And provides an easy-to-use button for flashing the firmware files. It has an indicator that turns it’s color to green if the programming is successful or turns red if the programming is unsuccessful. It also has an ‘X’ button to switch off this toolbar.
Software such as microchip mplab x ide does not have an option to integrate the avrdude commands like in microchip studio(formerly Atmel Studio). This toolbar provides an easy access to flashing. You do not have to do anything else. Just launch the program and enable the QuickSet toolbar.

AVR microcontrollers are widely used in embedded systems development due to their versatility and ease of programming and also their low cost. Flashing firmware or code onto these microcontrollers is a fundamental task during development. The AVRHexFlashGUI is a graphical user interface (GUI) application built using the Tkinter library in Python.

Note: avrdude software must be installed on your system.
Download avrdude for windows https://github.com/avrdudes/avrdude/releases

Download

Screenshots

Overview

The AVRHexFlashGUI application streamlines the process of flashing firmware onto AVR microcontrollers using a user-friendly interface. It offers the following features:

  1. Microcontroller Selection: The application allows users to select the target microcontroller from a list of supported devices.
  2. Programmer Configuration: Users can specify the programmer to be used for flashing the firmware. This information is crucial for establishing a connection between the computer and the microcontroller.
  3. HEX File Selection: Users can browse their computer for the HEX file containing the firmware they want to flash onto the microcontroller.
  4. Quick Flash: The application provides a quick flash button that initiates the flashing process using the selected programmer, microcontroller, and HEX file.
  5. Read the Microcontroller Flash Memory: The user can read the unprotected flash memory and save it in the program folder as “HexFromDevice.hex”
  6. AVR Fuse Settings: The application offers the ability to read and display the current fuse settings of the connected AVR microcontroller. Users can also write new fuse settings if needed.
  7. Output Display: The application displays the output of the avrdude tool, which is responsible for flashing the firmware and handling the communication with the microcontroller.

Usage

To use the AVRHexFlashGUI application, follow these steps:

  1. Launch the application.
  2. Select the target microcontroller.
  3. Specify the programmer to be used for flashing.
  4. Load the desired HEX file by clicking the “Browse” button.
  5. Click the “Program AVR” button to start the flashing process.
  6. Monitor the avrdude output to ensure successful flashing.
  7. Optionally, use the “Fuse Setting” section to read, modify, and write fuse settings.
AVRHexFlash Quick Flash Toolbar with MPLAB X IDE

Conclusion

The AVRHexFlashGUI application simplifies the process of flashing firmware onto AVR microcontrollers by providing an intuitive and user-friendly interface. With features for microcontroller selection, programmer configuration, HEX file loading, and fuse settings, developers can efficiently program their microcontrollers. The use of Tkinter and Python makes it easy to create and customize the GUI, enhancing the overall user experience. This application is a valuable tool for both beginners and experienced developers working with AVR microcontrollers. By streamlining the flashing process, it helps save time and ensures accurate firmware deployment.

Read more: AVRHexFlashGUI: an AVRdude Hex Flasher GUI Application with Quick Flash Floating Button

Code

from tkinter import *
from tkinter import ttk
import serial
import serial.tools.list_ports
import datetime
import os
import sys
import subprocess
from tkinter import filedialog
# Get the directory of the script
script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))

# Change the working directory to the script's directory
os.chdir(script_dir)
# For debug messages to be printed
dbg_msg = 0

root = Tk()
root.title("AVRHexFlashGUI")
root.resizable(False, False)  # Prevent window from being resized
sty = ttk.Style()
sty.theme_use("default")
#print(sty.theme_names())
root.columnconfigure(0, weight = 1)
root.rowconfigure(0, weight = 1)
# Frame Define START
Frame00 = ttk.LabelFrame(root, text = "AVRHexFlashGUI", padding = 10)
Frame00.grid(column = 0, row = 0, sticky =  (W,E) )
Frame01 = ttk.LabelFrame(root, text = "Fuse Setting", padding = 10)
Frame01.grid(column = 2, row = 0, sticky =  (W,E) )
## Frame Define START
canvas = None
circle = None
# Frame00 START
label0011 = ttk.Label(Frame00, text = "Microcontroller")
label0011.grid(column = 0, row = 0, sticky = (N,E,W,S))

MCU_save_stored_value = StringVar()
MCU_save_stored_value_display = Text(Frame00, height = 1, width = 2, wrap=WORD)
MCU_save_stored_value_display.grid(row = 0, column = 1, sticky = (N,E,W,S))
# Display the file value
file_path = os.path.join("Data", "MCU_save.txt")
if os.path.exists(file_path):
    with open(file_path, 'r') as file:
        MCU_value = file.read()
        MCU_save_stored_value_display.insert(END, MCU_value)

programmer_label = ttk.Label(Frame00, text="Programmer:")
programmer_label.grid(row = 0, column  = 2, sticky = (N,E,W,S))
programmer_var = StringVar()
programmer_var_display = Text(Frame00, height = 1, width = 10, wrap=WORD)
programmer_var_display.grid(row = 0, column = 3, sticky = (N,E,W,S))
# Display the file value
file_path = os.path.join("Data", "avrdude_programmer.txt")
if os.path.exists(file_path):
    with open(file_path, 'r') as file:
        Prog_value = file.read()
        programmer_var_display.insert(END, Prog_value)


label0010 = ttk.Label(Frame00, text="Select HEX File:")
label0010.grid(row = 1, column  = 0, sticky = (N,E,W,S))
hex_file_path = StringVar()
hex_file_path.set("Load your File")

def browse_file():
    file_path = filedialog.askopenfilename(filetypes=[("HEX Files", "*.hex")])
    if file_path:
        hex_file_path.delete(1.0, END)  # Clear previous text
        hex_file_path.insert(END, file_path)
        hex_file_path.config(wrap=WORD)  # Enable text wrapping

    new_complete_command = ">> avrdude " + "-c " + programmer_var_display.get('1.0', 'end-1c') \
                           + " -p " + MCU_save_stored_value_display.get('1.0', 'end-1c') \
                           + " -U " + "flash:w:" + hex_file_path.get('1.0', 'end-1c') + ":i"
    complete_command_var.set(new_complete_command)

def copy_to_clipboard():
    start_index = hex_file_path.index(SEL_FIRST)
    end_index = hex_file_path.index(SEL_LAST)
    
    if start_index and end_index:
        selected_text = hex_file_path.get(start_index, end_index)
        root.clipboard_clear()
        root.clipboard_append(selected_text)
        root.update()
def save_to_file():
    file_path = os.path.join("Data", "Hex_save_location.txt")
    with open(file_path, 'w') as file:
        file.write(hex_file_path.get('1.0', 'end-1c'))
    file_path = os.path.join("Data", "avrdude_programmer.txt")
    with open(file_path, 'w') as file:
        file.write(programmer_var_display.get('1.0', 'end-1c'))
    file_path = os.path.join("Data", "MCU_save.txt")
    with open(file_path, 'w') as file:
        file.write(MCU_save_stored_value_display.get('1.0', 'end-1c'))

def getMcuList():    
    prog_name = programmer_var_display.get('1.0','end-1c')
    command = ['avrdude','-c',prog_name]
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    
    output_lines = result.stderr.split('\n')
    for line in output_lines:
        display_avrdude_output.insert(END, line + '\n')
    display_avrdude_output.see("end")

def DumpHex():    
    prog_name = programmer_var_display.get('1.0','end-1c')
    part_name = MCU_save_stored_value_display.get('1.0','end-1c')
    command = ['avrdude','-c',prog_name,'-p',part_name,'-U', 'flash:r:HexFromDevice.hex:i']
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    display_avrdude_output.delete('1.0',END)
    output_lines = result.stderr.split('\n')
    for line in output_lines:
        display_avrdude_output.insert(END, line + '\n')
    display_avrdude_output.see("end")    
    
button0011 = ttk.Button(Frame00, text="Browse", command=browse_file)
button0011.grid(row = 1, column  = 1,sticky = (N,E,W,S))

McuListbutton = ttk.Button(Frame00, text="Get Supported\nMCU List", command=getMcuList)
McuListbutton.grid(row = 1, column  = 3,sticky = (N,E,W,S))
DumpHexbutton = ttk.Button(Frame00, text="Read Device\nSave File", command=DumpHex)
DumpHexbutton.grid(row = 1, column  = 2,sticky = (N,E,W,S))

hex_file_path = Text(Frame00, height=5, width=50, wrap=WORD)
hex_file_path.grid(row=2, column=0, columnspan=2, sticky=(N,E,W,S))
# Display the file value
file_path = os.path.join("Data", "Hex_save_location.txt")
if os.path.exists(file_path):
    with open(file_path, 'r') as file:
        Hex_file_value = file.read()
        hex_file_path.insert(END, Hex_file_value)
# Bind right-click context menu to hex_file_path
context_menu = Menu(hex_file_path, tearoff=0)
context_menu.add_command(label="Copy", command=copy_to_clipboard)
hex_file_path.bind("<Button-3>", lambda event: context_menu.post(event.x_root, event.y_root))
button_save = ttk.Button(Frame00, text="Save to File", command=save_to_file)
button_save.grid(row=5, column=0, sticky=(N,E,W,S))


complete_command_var = StringVar()
Complete_Command = ">> avrdude "+"-c "+programmer_var_display.get('1.0','end-1c') \
                   +" -p "+MCU_save_stored_value_display.get('1.0','end-1c')\
                   +" -U "+"flash:w:"+hex_file_path.get('1.0','end-1c')+":i"
complete_command_var.set(Complete_Command)
display_complete_command = ttk.Label(Frame00, wraplength=500, textvariable= complete_command_var)
display_complete_command.grid(row=4, column=0, sticky=(N,E,W,S))


def program_avr():
    display_avrdude_output.delete('1.0',END)
    prog_name = programmer_var_display.get('1.0','end-1c')
    part_name = MCU_save_stored_value_display.get('1.0','end-1c')
    hex_file_address = hex_file_path.get('1.0','end-1c')
    flash_statement = "flash:w:"+hex_file_address+":i"
    
    command = ['avrdude','-c',prog_name,'-p',part_name,'-U',flash_statement]
    print(command)
    
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    
    # Analyze the result and indicate success or failure
    output_lines = result.stderr.split('\n')
    success_indication = "flash verified\n\navrdude done.  Thank you."
    success_indication_1 = "flash verified\n\navrdude: safemode: Fuses OK\n\navrdude done.  Thank you."
    success = success_indication in result.stderr or success_indication_1 in result.stderr

    for line in output_lines:
        display_avrdude_output.insert(END, line + '\n')

    display_avrdude_output.see("end")

    if success:
        print("Programming successful!")
        try:
            canvas.itemconfig(circle, fill='#aaff00')
        except:
            pass
        
    else:
        print("Programming failed.")
        try:
            canvas.itemconfig(circle, fill='red')
        except:
            pass
        

sty.configure("Color.TButton", background="blue", foreground="white")
button0021 = ttk.Button(Frame00,style='Color.TButton', text="Program AVR", command=program_avr)
button0021.grid(row = 5, column  = 1)

display_avrdude_output = Text(Frame00, height=10, width=50, wrap=WORD)
display_avrdude_output.grid(row=6, columnspan=4, sticky=(N,E,W,S))
scrollbar = Scrollbar(Frame00, command=display_avrdude_output.yview)
scrollbar.grid(row=6, column=4, sticky=(N, S))
display_avrdude_output.config(yscrollcommand=scrollbar.set)

Abhay_text = "This program is writtent by : ABHAY KANT\nvisit: https://exasub.com"
AbhayLabel = ttk.Label(Frame00, text=Abhay_text)
AbhayLabel.grid(row = 11, column  = 0, sticky = (N,E,W,S))
## Frame00 END

def donothing():
   filewin = Toplevel(root)
   button = Button(filewin, text="Do nothing button")
   button.pack()

about_window = None  
def About_me():
    global about_window
    
    if about_window is None or not about_window.winfo_exists():
        about_window = Toplevel(root)
        about_window.title("About Me")
        
        label1 = ttk.Label(about_window, text="EXASUB.COM")
        label1.pack()
        
        button = ttk.Button(about_window, text="Quit", command=about_window.destroy)
        button.pack()
    else:
        about_window.lift()  

hfuse_read_value_display = None
lfuse_read_value_display = None
def Read_fuse():
    hfuse_read_value_display.delete('1.0',END)
    lfuse_read_value_display.delete('1.0',END)
    prog_name = programmer_var_display.get('1.0','end-1c')
    part_name = MCU_save_stored_value_display.get('1.0','end-1c')
    
    hfuse_statement = "hfuse:r:-:h"
    lfuse_statement = "lfuse:r:-:h"
    
    command = ['avrdude','-c',prog_name,'-p',part_name,'-U',hfuse_statement]
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    hfuse_read_value_display.insert(END,result.stdout)
    
    command = ['avrdude','-c',prog_name,'-p',part_name,'-U',lfuse_statement]
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    lfuse_read_value_display.insert(END,result.stdout)

    
fusesetwin = None
fusesetwin_write = None
def FuseSet():
    global fusesetwin,hfuse_read_value_display,lfuse_read_value_display
    if fusesetwin is None or not fusesetwin.winfo_exists():
            
        fusesetwin = Toplevel(root)
        
        button = Button(fusesetwin, text="Quit", command = fusesetwin.destroy)
        button.grid(column = 0, row = 0, sticky = (N,E,W,S))
        Read_Fuse_label = ttk.Label(fusesetwin, text="Read Fuse Values")
        Read_Fuse_label.grid( row = 1,column = 0, sticky = (N,E,W,S))

        read_fuse_button = Button(fusesetwin, text="Read", command = Read_fuse)
        read_fuse_button.grid(column = 1, row = 1, sticky = (N,E,W,S))

        hFuse_label = ttk.Label(fusesetwin, text="hFuse ")
        hFuse_label.grid( row = 2,column = 0, sticky = (N,E,W,S))
        hfuse_read_value_display = Text(fusesetwin, height = 1, width = 6, wrap=WORD)
        hfuse_read_value_display.grid(row = 2, column = 1, sticky = (N,E,W,S))

        lFuse_label = ttk.Label(fusesetwin, text="lFuse ")
        lFuse_label.grid( row = 3,column = 0, sticky = (N,E,W,S))
        lfuse_read_value_display = Text(fusesetwin, height = 1, width = 6, wrap=WORD)
        lfuse_read_value_display.grid(row = 3, column = 1, sticky = (N,E,W,S))
        # Separator object
        separator = ttk.Separator(fusesetwin, orient='horizontal')
        separator.grid(column = 0,columnspan=2, row = 4, sticky = (N,E,W,S))
            
        
        
        
        def fuseWrite():
            global fusesetwin_write
            if fusesetwin_write is None or not fusesetwin_write.winfo_exists():
                fusesetwin_write = Toplevel(root)
                label0101 = ttk.Label(fusesetwin_write, text = "Stored Default Fuse Settings")
                label0101.grid(column = 0, row = 0, sticky = (N,E,W,S))

                label0110 = ttk.Label(fusesetwin_write, text = "hFuse")
                label0110.grid( row = 1,column = 0, sticky = (N,E,W,S))
                hfuse_stored_value = StringVar()
                hfuse_stored_value_display = Text(fusesetwin_write, height = 1, width = 6, wrap=WORD)
                hfuse_stored_value_display.grid(row = 1, column = 1, sticky = (N,E,W,S))
                # Display the file value
                file_path = os.path.join("Data", "Fuse_hfuse.txt")
                if os.path.exists(file_path):
                    with open(file_path, 'r') as file:
                        fuse_value = file.read()
                        hfuse_stored_value_display.insert(END, fuse_value)


                label0120 = ttk.Label(fusesetwin_write, text = "lFuse")
                label0120.grid( row = 2,column = 0, sticky = (N,E,W,S))
                lfuse_stored_value = StringVar()
                lfuse_stored_value_display = Text(fusesetwin_write, height = 1, width = 6, wrap=WORD)
                lfuse_stored_value_display.grid(row = 2, column = 1, sticky = (N,E,W,S))
                # Display the file value
                file_path = os.path.join("Data", "Fuse_lfuse.txt")
                if os.path.exists(file_path):
                    with open(file_path, 'r') as file:
                        fuse_value = file.read()
                        lfuse_stored_value_display.insert(END, fuse_value)
                def flashfuse():
                    
                    prog_name = programmer_var_display.get('1.0','end-1c')
                    part_name = MCU_save_stored_value_display.get('1.0','end-1c')
                    
                    hfuse_statement = "hfuse:w:"+hfuse_stored_value_display.get('1.0', 'end-1c')+":m"
                    lfuse_statement = "lfuse:w:"+lfuse_stored_value_display.get('1.0', 'end-1c')+":m"
                    print(hfuse_statement)
                    print(lfuse_statement)
                    command = ['avrdude','-c',prog_name,'-p',part_name,'-U',hfuse_statement]
                    result = subprocess.run(command, shell=True, capture_output=True, text=True)
                    
                    output_lines = result.stderr.split('\n')
                    success_indication = "flash verified\n\navrdude done.  Thank you."
                    success = success_indication in result.stderr

                    for line in output_lines:
                        display_avrdude_output.insert(END, line + '\n')

                    display_avrdude_output.see("end")

                    if success:
                        print("Programming successful!")
                        try:
                            canvas.itemconfig(circle, fill='#aaff00')
                        except:
                            pass
                        
                    else:
                        print("Programming failed.")
                        try:
                            canvas.itemconfig(circle, fill='red')
                        except:
                            pass
                        
                    
                    command = ['avrdude','-c',prog_name,'-p',part_name,'-U',lfuse_statement]
                    result = subprocess.run(command, shell=True, capture_output=True, text=True)
                    
                    output_lines = result.stderr.split('\n')
                    success_indication = "flash verified\n\navrdude done.  Thank you."
                    success = success_indication in result.stderr

                    for line in output_lines:
                        display_avrdude_output.insert(END, line + '\n')

                    display_avrdude_output.see("end")

                    if success:
                        print("Programming successful!")
                        try:
                            canvas.itemconfig(circle, fill='#aaff00')
                        except:
                            pass
                        
                    else:
                        print("Programming failed.")
                        try:
                            canvas.itemconfig(circle, fill='red')
                        except:
                            pass
                        
                
                WriteFusebutton = Button(fusesetwin_write,text="Write Fuse", command = flashfuse)
                WriteFusebutton.grid(row = 0, column=1, sticky= (N,W,E,S))
                NoteLabelText = "Note: Change the  fuse setting carefully. \
                                \nPlease check the datasheet for correct fuse setting \
                                \nWrong Fuse setting may disable programming using programmer\
                                \nIf programmer is disabled you will need the offical ATMEL programmer"
                NoteLabel = ttk.Label(fusesetwin_write, wraplength = 500, text = NoteLabelText)
                NoteLabel.grid(column = 0, columnspan = 2, row = 10, sticky = (N,E,W,S))
            else:
                fusesetwin_write.lift()
        Write_Fuse_button = Button(fusesetwin, text="Write Fuse", command = fuseWrite)
        Write_Fuse_button.grid(column = 0,row = 5, sticky = (N,E,W,S))
    else:
         fusesetwin.lift()

quicksetwin = None
startx = 0
starty = 0

def move_window(event):
    global startx, starty
    x = quicksetwin.winfo_pointerx() - startx
    y = quicksetwin.winfo_pointery() - starty
    quicksetwin.geometry(f"+{x}+{y}")
    startx = quicksetwin.winfo_pointerx() - x
    starty = quicksetwin.winfo_pointery() - y


def Quick_set():
    global quicksetwin, startx, starty, circle,canvas
    if quicksetwin is None or not quicksetwin.winfo_exists():
        quicksetwin = Toplevel(root)
        quicksetwin.attributes("-topmost", True)  # Set the window to stay on top
        quicksetwin.overrideredirect(True)  # Remove window decorations
        
        title_bar = Frame(quicksetwin, bg="gray", relief="raised", bd=2)
        title_bar.grid(column=0, row=0, columnspan=3, sticky=(N, E, W))
        title_bar.bind("<ButtonPress-1>", start_move)
        title_bar.bind("<B1-Motion>", move_window)
        
        quickbutton = Button(title_bar, text="X", command=quicksetwin.destroy)
        quickbutton.grid(column=3, row=0, sticky=(N, E, W))
        
        quickProgbutton = Button(title_bar, text="Quick FLASH", command=program_avr)
        quickProgbutton.grid(column=1, row=0, sticky=(N, E, W), padx=5)
        global circle,canvas
        canvas = Canvas(title_bar, width=20, height=20)
        canvas.grid(column=2, row=0, sticky=(N, E, W), padx=5)

        # Draw a circle initially with a default color
        circle = canvas.create_oval(2, 2, 19, 19, fill='gray')
        
    else:
        quicksetwin.lift()

def start_move(event):
    global startx, starty
    startx = event.x
    starty = event.y



menubar = Menu(root)

Fusemenu = Menu(menubar, tearoff=1)
menubar.add_cascade(label="Fuse Setting", command=FuseSet)


menubar.add_command(label = "EXASUB.com", command = About_me)

menubar.add_command(label = "Quit", command = root.destroy)
menubar.add_command(label = "QuickSet", command = Quick_set)
root.config(menu=menubar)
if __name__ == "__main__":
    root.mainloop()
Posted on Leave a comment

Raspberry Pi Pico W as Bluetooth Low Energy Central Device and Peripheral Device

This is a very simple demonstration of the Unsecured Bluetooth Low Energy technology.

I am using two Raspberry Pi Pico W for this.
One will be operated in Central Role and the other will be in the Peripheral Role.

The peripheral device will advertise the temperature data of the rp2040 chip.

The Central device will scan the surrounding and connect to the peripheral device to receive the temperature data.
The central device does not use any passcode or pairing methods to connect to the peripheral device.

Note: You will need ble_advertising.py
You can check the code How to use Bluetooth LE of Raspberry Pi Pico W using MicroPython

ble_Central_device.py
# This example finds and connects to a peripheral running the
# UART service (e.g. ble_simple_peripheral.py).

import bluetooth
import random
import struct
import time
import micropython

from ble_advertising import decode_services, decode_name

from micropython import const

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")


class BLESimpleCentral:
    def __init__(self, ble):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)

        self._reset()

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._tx_handle = None
        self._rx_handle = None

    def _irq(self, event, data):
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(
                adv_data
            ):
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                self._addr = bytes(
                    addr
                )  # Note: addr buffer is owned by caller so need to copy it.
                self._name = decode_name(adv_data) or "?"
                self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_DONE:
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan (and the scan was explicitly stopped).
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            print("service", data)
            if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find uart service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
                self._rx_handle = value_handle
            if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
                self._tx_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._tx_handle is not None and self._rx_handle is not None:
                # We've finished connecting and discovering device, fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
            else:
                print("Failed to find uart rx characteristic.")

        elif event == _IRQ_GATTC_WRITE_DONE:
            conn_handle, value_handle, status = data
            print("TX complete")

        elif event == _IRQ_GATTC_NOTIFY:
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._tx_handle:
                if self._notify_callback:
                    self._notify_callback(notify_data)

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        return (
            self._conn_handle is not None
            and self._tx_handle is not None
            and self._rx_handle is not None
        )

    # Find a device advertising the environmental sensor service.
    def scan(self, callback=None):
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(2000, 30000, 30000)

    # Connect to the specified device (otherwise use cached address from a scan).
    def connect(self, addr_type=None, addr=None, callback=None):
        self._addr_type = addr_type or self._addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        if self._conn_handle is None:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    # Send data over the UART
    def write(self, v, response=False):
        if not self.is_connected():
            return
        self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)

    # Set handler for when data is received over the UART.
    def on_notify(self, callback):
        self._notify_callback = callback


def demo():
    

    not_found = False

    def on_scan(addr_type, addr, name):
        if addr_type is not None:
            print("Found peripheral:", addr_type, addr, name)
            central.connect()
        else:
            nonlocal not_found
            not_found = True
            print("No peripheral found.")

    central.scan(callback=on_scan)

    # Wait for connection...
    while not central.is_connected():
        time.sleep_ms(100)
        if not_found:
            return

    print("Connected")

    def on_rx(v):
        buf1 = bytearray(v)
        print("RX ")
        for _ in v:
            print(chr(_), end='')
        print("")

    central.on_notify(on_rx)

    #with_response = False
    with_response = True

    i = 0
    while central.is_connected():
        try:
            v = str(i) + "_"
            print("TX", v)
            central.write(v, with_response)
        except:
            print("TX failed")
        i += 1
        time.sleep_ms(2000 if with_response else 30)

    print("Disconnected")


if __name__ == "__main__":
    ble = bluetooth.BLE()
    central = BLESimpleCentral(ble)
    while(1):
        demo()
ble_Peripheral_device.py
# This example demonstrates a UART periperhal.

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const

def Temp_sensor():
    # Configure the internal temperature sensor
    sensor_temp = machine.ADC(machine.ADC.CORE_TEMP)
    conversion_factor = 3.3 / 65535
    raw_temp = sensor_temp.read_u16() * conversion_factor
    temperature = (27 - (raw_temp - 0.706) / 0.001721)
    return temperature

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
    bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_READ | _FLAG_NOTIFY,
)
_UART_RX = (
    bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE,
)
_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)


class BLESimplePeripheral:
    def __init__(self, ble, name="mpy-uart"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
        self._connections = set()
        self._write_callback = None
        self._payload = advertising_payload(name=name, services=[_UART_UUID])
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            print("New connection", conn_handle)
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            value = self._ble.gatts_read(value_handle)
            if value_handle == self._handle_rx and self._write_callback:
                self._write_callback(value)

    def send(self, data):
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle, self._handle_tx, data)

    def is_connected(self):
        return len(self._connections) > 0

    def _advertise(self, interval_us=500000):
        print("Starting advertising")
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    def on_write(self, callback):
        self._write_callback = callback


def demo():
    ble = bluetooth.BLE()
    p = BLESimplePeripheral(ble)

    def on_rx(v):
        print("RX", v)

    p.on_write(on_rx)

    i = 0
    while True:
        if p.is_connected():
            # Short burst of queued notifications.
            transmit_msg = "Temperature: "+str(Temp_sensor())
            p.send(str(transmit_msg))
            '''
            for _ in range(3):
                data = "oz"+ str(i) + "_"
                print("TX", data)
                p.send(data)
                i += 1
            '''
            i = i + 1
        time.sleep_ms(1000)


if __name__ == "__main__":
    demo()