import asyncio import json import os import time from bleak import BleakClient, BleakScanner # BLE Configurations DEVICE_NAME = "ESP32-C3-BLE" WRITE_UUID = "87654321-4321-4321-4321-cba987654321" NOTIFY_UUID = "98765432-1234-1234-1234-123456789abc" # BLE Transfer Settings CHUNK_SIZE = 512 # Logical chunk size BLE_PACKET_SIZE = 244 # Max safe packet size for BLE write # Global variables json_buffer = "" receiving = False end_marker = None file_list = [] def handle_notification(sender, data): global json_buffer, receiving, end_marker, file_list try: msg = data.decode('utf-8', errors='ignore').strip() print(f" 🔔 BLE Notify: {msg}") if msg.startswith("{") and msg.endswith("}"): try: parsed = json.loads(msg) if "mp3files" in parsed: file_list = parsed["mp3files"] elif "jpegfiles" in parsed: file_list = parsed["jpegfiles"] elif "files" in parsed: file_list = parsed["files"] for file in file_list: if file.get('size',0) == 0: print(f"⚠️ File '{file['filename']}' has zero size!") print(json.dumps(parsed, indent=2)) except Exception as e: print("❌ JSON Decode Error:", e) print("🛑 Raw JSON:", msg) elif end_marker and msg == end_marker: receiving = False print(" ✅ End marker received. Transfer complete.") elif msg.startswith("{") or receiving: receiving = True json_buffer += msg else: print(f"✅ Message: {msg}") except Exception as e: print("❌ Notification Error:", e) async def find_device(): print("🔍 Scanning for ESP32 BLE device...") devices = await BleakScanner.discover() for d in devices: if d.name and DEVICE_NAME in d.name: print(f"✅ Found {d.name} [{d.address}]") return d.address print("❌ ESP32 device not found.") return None async def send_command(client, cmd, expected_end_marker=None): global json_buffer, receiving, end_marker end_marker = expected_end_marker json_buffer = "" receiving = False await client.write_gatt_char(WRITE_UUID, (cmd + ' ').encode('utf-8'), response=True) for _ in range(200): await asyncio.sleep(0.05) if not receiving and json_buffer: break async def upload_jpeg_file(client): file_path = input("🖼️ Enter the full path of the JPEG file to upload: ").strip() if not os.path.isfile(file_path): print(f"❌ Invalid file path: {file_path}") return file_size = os.path.getsize(file_path) if file_size == 0: print(f"❌ The file '{file_path}' is empty and cannot be uploaded.") filename = os.path.basename(file_path) print(f"📁 Uploading: {filename} ({file_size} bytes)") # Correct start command for upload in SPIFFS (images folder) start_command = f"sending {filename} {file_size}" await client.write_gatt_char(WRITE_UUID, start_command.encode(), response=True) print(f"📡 Sent command: {start_command}") await asyncio.sleep(1) with open(file_path, "rb") as f: total_sent = 0 while True: chunk = f.read(CHUNK_SIZE) if not chunk: break # Split into BLE_PACKET_SIZE chunks and send properly for i in range(0, len(chunk), BLE_PACKET_SIZE): packet = chunk[i:i+BLE_PACKET_SIZE] await client.write_gatt_char(WRITE_UUID, packet, response=False) total_sent += len(packet) await asyncio.sleep(0) progress = (total_sent / file_size) * 100 print(f"📤 Sent {total_sent}/{file_size} bytes ({progress:.2f}%)") print("✅ JPEG file uploaded successfully to SPIFFS (images folder)!") async def upload_mp3_file(client): file_path = input("🖼️ Enter the full path of the MP3 file to upload: ").strip() if not os.path.isfile(file_path): print(f"❌ Invalid file path: {file_path}") return file_size = os.path.getsize(file_path) filename = os.path.basename(file_path) print(f"📁 Uploading: {filename} ({file_size} bytes)") # Correct start command for upload in SPIFFS (images folder) start_command = f"sendingaudio {filename} {file_size}" await client.write_gatt_char(WRITE_UUID, start_command.encode(), response=True) print(f"📡 Sent command: {start_command}") await asyncio.sleep(1) with open(file_path, "rb") as f: total_sent = 0 while True: chunk = f.read(CHUNK_SIZE) if not chunk: break # Split into BLE_PACKET_SIZE chunks and send properly for i in range(0, len(chunk), BLE_PACKET_SIZE): packet = chunk[i:i+BLE_PACKET_SIZE] await client.write_gatt_char(WRITE_UUID, packet, response=False) total_sent += len(packet) await asyncio.sleep(0) progress = (total_sent / file_size) * 100 print(f"📤 Sent {total_sent}/{file_size} bytes ({progress:.2f}%)") print("✅ MP3 file uploaded successfully to SPIFFS (mp3files folder)!") async def upload_mp3_ram(client): file_path = input("🎵 Enter the full path of the MP3 file to upload: ").strip() if not os.path.isfile(file_path): print(f"❌ Invalid file path: {file_path}") return file_size = os.path.getsize(file_path) filename = os.path.basename(file_path) print(f"📁 Uploading: {filename} ({file_size} bytes)") start_command = f"ssa {filename} {file_size}" await client.write_gatt_char(WRITE_UUID, start_command.encode(), response=True) print(f"📡 Sent command: {start_command}") await asyncio.sleep(1) total_sent = 0 with open(file_path, "rb") as f: while total_sent < file_size: chunk = f.read(CHUNK_SIZE) if not chunk: break await client.write_gatt_char(WRITE_UUID, chunk, response=False) total_sent += len(chunk) print(f"📤 Sent {total_sent}/{file_size} bytes") print("✅ MP3 File sent in RAM mode!") async def upload_jpeg_ram(client): file_path = input("🎵 Enter the full path of the JPEG file to upload: ").strip() if not os.path.isfile(file_path): print(f"❌ Invalid file path: {file_path}") return file_size = os.path.getsize(file_path) filename = os.path.basename(file_path) print(f"📁 Uploading: {filename} ({file_size} bytes)") start_command = f"ssf {filename} {file_size}" await client.write_gatt_char(WRITE_UUID, start_command.encode(), response=True) print(f"📡 Sent command: {start_command}") await asyncio.sleep(1) total_sent = 0 with open(file_path, "rb") as f: while total_sent < file_size: chunk = f.read(CHUNK_SIZE) if not chunk: break await client.write_gatt_char(WRITE_UUID, chunk, response=False) total_sent += len(chunk) print(f"📤 Sent {total_sent}/{file_size} bytes") print("✅ MP3 File sent in RAM mode!") async def ble_console(): address = await find_device() if not address: return async with BleakClient(address) as client: print("🔗 Connected to ESP32 BLE") await client.start_notify(NOTIFY_UUID, handle_notification) await asyncio.sleep(1) COMMANDS = { 1: {"description": "Play MP3 file", "type": "play_mp3"}, 2: {"description": "Delete MP3 file", "type": "delete_mp3"}, 3: {"description": "Delete JPEG file", "type": "delete_jpeg"}, 4: {"description": "Upload JPEG file", "type": "upload_jpeg"}, 5: {"description": "Upload MP3 file", "type": "upload_mp3"}, 6: {"description": "Set Volume (1-21)", "command_prefix": "setvolume**"}, 7: {"description": "Get Volume", "command": "getvolume"}, 8: {"description": "Start Rotation", "command": "startrotation"}, 9: {"description": "Stop Rotation", "command": "stoprotation"}, 10: {"description": "Get information about JPEG files", "command": "fileinfo", "end_marker": "end_of_fileinfo"}, 11: {"description": "Get information about MP3 files", "command": "fileinfomp3", "end_marker": "end_of_fileinfomp3"}, 12: {"description": "Manual Volume Change", "type": "manual_volume"}, 13: {"description": "Check Free Size", "command": "freesize"}, 14: {"description": "Set Timer", "command_prefix": "settimer**"}, 15: {"description": "Upload MP3 (RAM Mode)", "type": "upload_mp3_ram"}, # 🔥 New 16: {"description": "Upload JPEG (RAM Mode)", "type": "upload_jpeg_ram"} # 🔥 New } print(" 📋 Available Commands:") for num in sorted(COMMANDS.keys()): print(f" {num}. {COMMANDS[num]['description']}") while True: user_input = input(" >>> Select an option (or type 'disconnect' to exit): ").strip() if user_input.lower() == "disconnect": print("🔌 Disconnecting...") break if not user_input.isdigit(): print("❗ Please enter a valid number.") continue choice = int(user_input) if choice not in COMMANDS: print("❗ Invalid choice. Try again.") continue action = COMMANDS[choice] if action.get("type") == "play_mp3": await send_command(client, "fileinfomp3", expected_end_marker="end_of_fileinfomp3") await asyncio.sleep(1) if not file_list: print("❌ No MP3 files found!") continue print(" 🎵 Available MP3 Files:") for idx, file in enumerate(file_list, 1): print(f" {idx}. {file['filename']} ({file['size']} bytes)") file_idx = int(input(" 🎯 Enter the number of MP3 to play: ")) - 1 selected_file = file_list[file_idx]['filename'] await send_command(client, f"play**{selected_file}") print(f"▶️ Playing {selected_file}") elif action.get("type") == "delete_mp3": await send_command(client, "fileinfomp3", expected_end_marker="end_of_fileinfomp3") await asyncio.sleep(1) if not file_list: print("❌ No MP3 files found!") continue print(" 🗑️ Available MP3 Files:") for idx, file in enumerate(file_list, 1): print(f" {idx}. {file['filename']} ({file['size']} bytes)") file_idx = int(input(" 🎯 Enter the number of MP3 to delete: ")) - 1 selected_file = file_list[file_idx]['filename'] await send_command(client, f"delete**mp3**{selected_file}") print(f"🗑️ Deleted {selected_file}") elif action.get("type") == "delete_jpeg": await send_command(client, "fileinfo", expected_end_marker="end_of_fileinfo") await asyncio.sleep(1) if not file_list: print("❌ No JPEG files found!") continue print(" 🖼️ Available JPEG Files:") for idx, file in enumerate(file_list, 1): print(f" {idx}. {file['filename']} ({file['size']} bytes)") file_idx = int(input(" 🎯 Enter the number of JPEG to delete: ")) - 1 selected_file = file_list[file_idx]['filename'] await send_command(client, f"delete**images**{selected_file}") print(f"🗑️ Deleted {selected_file}") elif action.get("type") == "upload_jpeg": await upload_jpeg_file(client) elif action.get("type") == "upload_mp3": await upload_mp3_file(client) elif action.get("type") == "manual_volume": mv_input = input("Enter '+' to increase or '-' to decrease volume: ").strip() if mv_input not in ('+', '-'): print("❗ Invalid input.") continue await send_command(client, mv_input) elif action.get("type") == "upload_mp3_ram": await upload_mp3_ram(client) elif action.get("type") == "upload_jpeg_ram": await upload_jpeg_ram(client) elif "command_prefix" in action: value = input(f"Enter value for {action['description']}: ").strip() await send_command(client, f"{action['command_prefix']}{value}") else: await send_command(client, action['command']) await client.stop_notify(NOTIFY_UUID) print("✅ Disconnected") if __name__ == "__main__": asyncio.run(ble_console())