1. Home
  2. Docs
  3. J10 Document
  4. BLE protocol and client c...
  5. python code to read/write J10 via ble

python code to read/write J10 via ble

Usage: python j10_cmd.py --name=<BLE name>

# -*- coding: utf-8 -*-

"""
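README notes.
This script batch-updates the BLE parameters from the command line: for each MODE it switches to that mode, writes the parameters, and then reads them back. Note that some parameters, such as the BLE name, only take effect after a restart.
While debugging, if the whole run takes too long, shorten the sleep time (currently 10 s) or operate on a single MODE only.
Environment: Windows with Python and bleak (the BLE package) installed; other Python libraries may also need to be installed on your machine.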
pip install bleak

"""

import argparse
import asyncio
import logging

from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)

# GATT characteristic UUIDs of the device. Writes and reads go through the
# same characteristic (it has the Write property); notifications use a
# separate one.
par_write_characteristic = "e093f3b5-00a3-a9e5-9eca-40036e0edc24"
par_read_characteristic = "e093f3b5-00a3-a9e5-9eca-40036e0edc24"
par_notification_characteristic = "e093f3b5-00a3-a9e5-9eca-40026e0edc24"

# Message to send: this one switches the MODE (same bytes as changemodeB_str).
send_str = bytearray([0xAA, 0x00, 0x0, 0x01])

# Mode-switch commands as space-separated hex bytes; only the last byte
# differs between modes A-D.
changemodeA_str = "AA 00 00 00"
changemodeB_str = "AA 00 00 01"
changemodeC_str = "AA 00 00 02"
changemodeD_str = "AA 00 00 03"

# Parameter payload for MODE A as space-separated hex bytes; main() converts
# it with str_to_array and writes it to the device. The first byte (00)
# matches the last byte of changemodeA_str. Bytes 4A 31 30 2D 32 30 32 33 2D
# decode to ASCII "J10-2023-" (presumably the BLE name prefix -- TODO confirm).
# The backslash continuations join the lines into one single-line string.
modeA_str = "\
00 0F 00 1C CD 0A 00 32 00 03 4A 31 30 2D 32 30 32 33 2D 00 02 03 03 03 03 03 03 03 37 32 30 30 \
30 30 30 30 12 17 19 1A 1A 15 12 15 46 46 46 46 46 46 46 46 0A 0A 0A 0A 0A 0A 0A 0A 67 69 6E 6E \
6E 69 67 6C 00 00 FF FF 2E E4 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 32 00 00 00 \
00 00 00 00"

# Parameter payload for MODE B (first byte 01), same text format as modeA_str.
modeB_str ="\
01 0F 00 1C CD 0A 00 32 00 03 4A 31 30 2D 32 30 32 33 2D 00 03 03 03 03 03 03 03 03 37 32 30 30 \
30 30 30 30 14 17 19 1A 1A 19 13 14 4B 4B 4B 4B 4B 4B 4B 4B 14 14 14 14 14 14 14 14 67 69 6E 6E \
6E 6E 6C 69 00 00 FF FF 2E E4 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 32 00 00 00 \
00 00 00 00"
# Parameter payload for MODE C (first byte 02), same text format as modeA_str.
modeC_str="\
02 0F 00 1C C2 0A 00 32 00 03 4A 31 30 2D 32 30 32 33 2D 00 02 03 03 03 03 03 03 03 34 32 30 30 \
30 30 30 30 14 17 19 19 1A 17 12 0F 46 46 46 46 46 46 46 46 0A 0A 0A 0A 0A 0A 0A 0A 6E 6E 6E 6E \
6E 6E 6E 6E 00 00 00 00 2E DA 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 3F FF 32 00 00 00 \
00 00 00 00"

# Parameter payload for MODE D (first byte 03), same text format as modeA_str.
modeD_str="\
03 0F 00 1C CD 0A 00 32 00 03 4A 31 30 2D 32 30 32 33 2D 00 03 03 03 03 03 03 03 03 37 35 32 30 \
30 30 30 30 0F 12 17 18 1A 18 15 15 4B 4B 4B 4B 4B 4B 4B 4B 14 0F 0F 0F 0F 0F 0F 0F 67 69 6C 6E \
6E 6E 6C 69 00 00 FF FF 2E EC 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 32 00 00 00 \
00 00 00 00"
def str_to_array(str1):
    """Convert a whitespace-separated hex string into a ``bytearray``.

    Args:
        str1: Hex byte values separated by whitespace, e.g. ``"AA 00 00 01"``.

    Returns:
        A ``bytearray`` holding one byte per token; empty when ``str1``
        contains no tokens.

    Raises:
        ValueError: If a token is not valid hexadecimal or does not fit in
            one byte (0-255).
    """
    # int(token, 16) is used instead of bytearray.fromhex so that
    # single-digit tokens such as "A" are still accepted.
    return bytearray(int(token, 16) for token in str1.split())

def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Log a notification received from the device.

    Args:
        characteristic: The characteristic the notification came from; its
            ``description`` is included in the log line.
        data: The raw notification payload.
    """
    logger.info("notify rcv:%s: %r", characteristic.description, data)

# Seconds to wait after each BLE operation before issuing the next one.
# Lower this while debugging to shorten a run.
SETTLE_SECONDS = 10.0

# (label, mode-switch command, parameter payload) for each mode to program.
# B, C and D are disabled by default to keep a run short; uncomment them to
# program those modes as well. For a quick test, the payload 'AA 01 00 03'
# changes the volume (reported as working).
_MODES_TO_PROGRAM = (
    ("A", changemodeA_str, modeA_str),
    # ("B", changemodeB_str, modeB_str),
    # ("C", changemodeC_str, modeC_str),
    # ("D", changemodeD_str, modeD_str),
)


async def _find_device(args: argparse.Namespace):
    """Scan for the device by address when given, otherwise by name.

    Logs an error and returns ``None`` when the device is not found.
    """
    if args.address:
        device = await BleakScanner.find_device_by_address(
            args.address, cb=dict(use_bdaddr=args.macos_use_bdaddr)
        )
        if device is None:
            logger.error("could not find device with address '%s'", args.address)
    else:
        device = await BleakScanner.find_device_by_name(
            args.name, cb=dict(use_bdaddr=args.macos_use_bdaddr)
        )
        if device is None:
            logger.error("could not find device with name '%s'", args.name)
    return device


async def _program_mode(client, label, change_cmd, payload):
    """Switch to one mode, read its parameters, write new ones, read back."""
    logger.info("Change Mode %s:", label)
    await client.write_gatt_char(par_write_characteristic, str_to_array(change_cmd))
    await asyncio.sleep(SETTLE_SECONDS)

    # Read the current parameters before overwriting them.
    ble_buff = await client.read_gatt_char(par_read_characteristic)
    logger.info("Read Mode %s: %r", label, ble_buff)
    await asyncio.sleep(SETTLE_SECONDS)

    send_str = str_to_array(payload)
    logger.info("Write Mode %s:%r", label, send_str)
    await client.write_gatt_char(par_write_characteristic, send_str)
    await asyncio.sleep(SETTLE_SECONDS)

    # Read back so the log shows what the device now reports.
    ble_buff = await client.read_gatt_char(par_read_characteristic)
    logger.info("Read Mode %s: %r", label, ble_buff)
    await asyncio.sleep(SETTLE_SECONDS)


async def main(args: argparse.Namespace):
    """Find the device, connect, and program the enabled modes.

    Args:
        args: Parsed command-line arguments; ``address``, ``name`` and
            ``macos_use_bdaddr`` are read.

    Returns early (after logging an error) when the device is not found.
    After programming, the device is switched back to mode A.
    """
    logger.info("starting scan...")

    device = await _find_device(args)
    if device is None:
        return

    # Set when the device disconnects; nothing in this script waits on it.
    disconnected_event = asyncio.Event()

    def disconnected_callback(client):
        print("Disconnected callback called!")
        disconnected_event.set()

    logger.info("connecting to device...")

    async with BleakClient(device, disconnected_callback=disconnected_callback) as client:
        logger.info("Connected")

        # Notifications are not subscribed by default; uncomment to log them.
        # await client.start_notify(par_notification_characteristic, notification_handler)

        for label, change_cmd, payload in _MODES_TO_PROGRAM:
            await _program_mode(client, label, change_cmd, payload)

        # Finally switch back to mode A, because that is the factory default.
        logger.info("%s", "Change Mode A:")
        send_str = str_to_array(changemodeA_str)
        await client.write_gatt_char(par_write_characteristic, send_str)

# Script entry point: parse the command line, configure logging, run main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # Exactly one of --name / --address must be given to identify the device.
    device_group = parser.add_mutually_exclusive_group(required=True)

    device_group.add_argument(
        "--name",
        metavar="<name>",
        help="the name of the bluetooth device to connect to",
    )
    device_group.add_argument(
        "--address",
        metavar="<address>",
        help="the address of the bluetooth device to connect to",
    )

    parser.add_argument(
        "--macos-use-bdaddr",
        action="store_true",
        help="when true use Bluetooth address instead of UUID on macOS",
    )

    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="sets the log level to debug",
    )

    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
    )

    asyncio.run(main(args))

How can we help?