Commit 1e9d5afd authored by martin hou's avatar martin hou

fix: 增加后台任务实现与模拟

parent f5d1ac58
This diff is collapsed.
......@@ -77,15 +77,15 @@ export function DeviceManager() {
}
}
const doDownloadFile = async (fileName: string) => {
const doDownloadFile = async (file: Jensen.FileInfo) => {
if (sn == null) return alert('Please connect first');
if (userTask == null) return alert('Please request a user task first');
// 需要有一个发起后台任务的方法
try {
const jensen = mgr.getInstance(sn, userTask);
if (jensen == null) return alert('Please connect first');
await jensen.getFile(fileName, 1024, (data) => {
console.log(data);
// ...
mgr.submitFileTask(sn, {
fileName: file.name,
fileLength: file.length,
fileSignature: file.signature,
task: 'download'
});
} catch (err: any) {
alert('Error: ' + err.message);
......@@ -113,7 +113,7 @@ export function DeviceManager() {
<ol>
{
files.map((file, index) => {
return <li key={index}>{file.name} - {file.length} - {file.duration} <a href="#" onClick={() => { doDownloadFile(file.name) }}>Download</a></li>
return <li key={index}>{file.name} - {file.length} - {file.duration} <a href="javascript:void(0)" onClick={() => { doDownloadFile(file) }}>Download</a></li>
})
}
</ol>
......
import Jensen, { DeviceInfo, ScheduleInfo } from "../../";
import { Logger } from "../Logger";
import { BackgroundTask, DeviceStateSchema, DeviceStateStore, InitializeTask, StateChangeListener, TaskManager, TimerTask } from "./tasks";
import { buildTasks, FileTaskData } from "./task-impl";
// 指令参数
export type CommandOptions = {
......@@ -110,6 +111,10 @@ export class ConnectionManager
return tm.verifyUserTask(tag);
}
submitFileTask (sn: string, data: FileTaskData) {
return this.taskManagers.get(sn)?.getStateStore()?.pushTaskData('file-transfer', data);
}
// 获取当前用户任务的 tag
getCurrentUserTaskTag (sn: string): string | null {
return this.taskManagers.get(sn)?.getCurrentUserTaskTag() || null;
......@@ -273,7 +278,7 @@ export class ConnectionManager
let tm = this.taskManagers.get(dinfo.sn);
if (tm == null) {
tm = new TaskManager(inst, this.logger);
this.taskBuilder(inst, tm);
buildTasks(inst, tm);
await tm.scheduleInitializeTasks();
this.taskManagers.set(dinfo.sn, tm);
}
......@@ -294,136 +299,6 @@ export class ConnectionManager
}
}
taskBuilder(jensen: Jensen, tm: TaskManager)
{
// 初始化任务的注册 - 数据会被存储到 DeviceStateStore 中
tm.registerInitializeTask({
tag: 'time-sync',
task: async (jensen: Jensen | null, store) => {
let time = await jensen?.getTime(1);
Logger.info('jensen', 'initialize', 'get-time: ' + JSON.stringify(time));
// 存储设备时间到状态容器
if (time) {
store.set('time', { deviceTime: time.time, syncTime: new Date() });
}
// 同步设置时间
await jensen?.setTime(new Date(), 1);
Logger.info('jensen', 'initialize', 'set-time-to: ' + new Date().toLocaleString());
}
});
tm.registerInitializeTask({
tag: 'get-settings',
task: async (jensen: Jensen | null, store) => {
let settings = await jensen?.getSettings(1);
Logger.info('jensen', 'initialize', 'get-settings: ' + JSON.stringify(settings));
// 存储设置到状态容器
if (settings) {
store.set('settings', settings);
}
}
});
tm.registerInitializeTask({
tag: 'get-card-info',
task: async (jensen: Jensen | null, store) => {
let cardInfo = await jensen?.getCardInfo(1);
Logger.info('jensen', 'initialize', 'get-card-info: ' + JSON.stringify(cardInfo));
// 存储卡信息到状态容器
if (cardInfo) {
store.set('cardInfo', cardInfo);
}
}
});
tm.registerInitializeTask({
tag: 'battery-status',
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let batteryStatus = await jensen?.getBatteryStatus(1);
Logger.info('jensen', 'initialize', 'get-battery-status: ' + JSON.stringify(batteryStatus));
// 存储电池状态到状态容器
if (batteryStatus) {
store.set('battery-status', batteryStatus);
}
}
});
// 定时同步设备电池状态
tm.registerTimerTask({
tag: 'battery-status',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let batteryStatus = await jensen?.getBatteryStatus(1);
Logger.info('jensen', 'timer-task', 'battery: ' + JSON.stringify(batteryStatus));
// 更新电池状态(会自动触发订阅者)
if (batteryStatus) {
store.set('battery-status', batteryStatus);
}
}
});
// 定时获取设备时间
tm.registerTimerTask({
tag: 'get-device-time',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let time = await jensen?.getTime(1);
Logger.info('jensen', 'timer-task', 'time: ' + JSON.stringify(time));
// 更新设备时间
if (time) {
store.set('time', { deviceTime: time.time, syncTime: new Date() });
}
}
});
// 录音中的状态同步
tm.registerTimerTask({
tag: 'recording-status',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
let recording = await jensen?.getRecordingFile();
Logger.info('jensen', 'timer-task', 'recording-status: ' + JSON.stringify(recording));
if (recording) store.set('recording', recording);
}
});
// 设备录音文件列表同步
tm.registerTimerTask({
tag: 'file-list-sync',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
let needRefresh = false;
// 1. 定时查询文件数量
let fileCount = await jensen?.getFileCount(1);
if (fileCount)
{
let lastCount = store.get('file-count');
if (lastCount != fileCount.count)
{
needRefresh = true;
Logger.info('jensen', 'timer-task', 'file-count changed: ' + lastCount + ' -> ' + fileCount.count);
}
store.set('file-count', fileCount.count);
}
if (!needRefresh) return;
let files = await jensen?.listFiles();
if (files) store.set('files', files);
}
});
// 后台任务的注册(如有需要可以在这里添加)
}
onautoconnect (eventListener: AutoConnectEventHandler) {
this.onautoconnectEventListener = eventListener;
}
......
import Jensen, { FileInfo } from "../../jensen";
import { Logger } from "../Logger";
import { TaskManager, DeviceStateStore } from "./tasks";
export type FileTaskData = {
fileName: string;
fileLength: number;
fileSignature: string;
task: 'download' | 'transcribe' | 'summarize';
}
/**
* 任务构建器 - 注册设备的所有后台任务
* @param jensen Jensen 实例
* @param tm TaskManager 实例
*/
export function buildTasks(jensen: Jensen, tm: TaskManager) {
// ===================== 初始化任务 =====================
// 时间同步
tm.registerInitializeTask({
tag: 'time-sync',
task: async (jensen: Jensen | null, store) => {
let time = await jensen?.getTime(1);
Logger.info('jensen', 'initialize', 'get-time: ' + JSON.stringify(time));
// 存储设备时间到状态容器
if (time) {
store.set('time', { deviceTime: time.time, syncTime: new Date() });
}
// 同步设置时间
await jensen?.setTime(new Date(), 1);
Logger.info('jensen', 'initialize', 'set-time-to: ' + new Date().toLocaleString());
}
});
// 获取设备设置
tm.registerInitializeTask({
tag: 'get-settings',
task: async (jensen: Jensen | null, store) => {
let settings = await jensen?.getSettings(1);
Logger.info('jensen', 'initialize', 'get-settings: ' + JSON.stringify(settings));
// 存储设置到状态容器
if (settings) {
store.set('settings', settings);
}
}
});
// 获取存储卡信息
tm.registerInitializeTask({
tag: 'get-card-info',
task: async (jensen: Jensen | null, store) => {
let cardInfo = await jensen?.getCardInfo(1);
Logger.info('jensen', 'initialize', 'get-card-info: ' + JSON.stringify(cardInfo));
// 存储卡信息到状态容器
if (cardInfo) {
store.set('cardInfo', cardInfo);
}
}
});
// 获取电池状态(仅 P1 型号)
tm.registerInitializeTask({
tag: 'battery-status',
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let batteryStatus = await jensen?.getBatteryStatus(1);
Logger.info('jensen', 'initialize', 'get-battery-status: ' + JSON.stringify(batteryStatus));
// 存储电池状态到状态容器
if (batteryStatus) {
store.set('battery-status', batteryStatus);
}
}
});
// ===================== 定时任务 =====================
// 定时同步设备电池状态
tm.registerTimerTask({
tag: 'battery-status',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let batteryStatus = await jensen?.getBatteryStatus(1);
Logger.info('jensen', 'timer-task', 'battery: ' + JSON.stringify(batteryStatus));
// 更新电池状态(会自动触发订阅者)
if (batteryStatus) {
store.set('battery-status', batteryStatus);
}
}
});
// 定时获取设备时间
tm.registerTimerTask({
tag: 'get-device-time',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let time = await jensen?.getTime(1);
Logger.info('jensen', 'timer-task', 'time: ' + JSON.stringify(time));
// 更新设备时间
if (time) {
store.set('time', { deviceTime: time.time, syncTime: new Date() });
}
}
});
// 录音中的状态同步
tm.registerTimerTask({
tag: 'recording-status',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
let recording = await jensen?.getRecordingFile();
Logger.info('jensen', 'timer-task', 'recording-status: ' + JSON.stringify(recording));
if (recording) store.set('recording', recording);
}
});
// 设备录音文件列表同步
tm.registerTimerTask({
tag: 'file-list-sync',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
let needRefresh = false;
// 1. 定时查询文件数量
let fileCount = await jensen?.getFileCount(1);
if (fileCount)
{
let lastCount = store.get('file-count');
if (lastCount != fileCount.count)
{
needRefresh = true;
Logger.info('jensen', 'timer-task', 'file-count changed: ' + lastCount + ' -> ' + fileCount.count);
}
store.set('file-count', fileCount.count);
}
if (!needRefresh) return;
let files = await jensen?.listFiles();
if (files) store.set('files', files);
}
});
// ===================== 后台任务 =====================
// 如有需要可以在这里添加后台任务
tm.registerBackgroundTask({
tag: 'file-transfer',
// 注意:必须使用 function 而非箭头函数,才能正确绑定 this
schedule: async function (jensen, store) {
let data = store.peekTaskData('file-transfer') as FileTaskData | undefined;
if (!data) return;
console.log('xxxx', data);
// 通过 this 调用自定义方法
await this.transfer(jensen, data, store);
store.popTaskData('file-transfer');
// this.postProcess(data);
if (data.task == 'download')
{
this.download();
}
else if (data.task == 'transcribe')
{
this.transcribe();
}
else if (data.task == 'summarize')
{
this.summarize();
}
},
// 自定义方法:文件传输
transfer: async function (jensen: Jensen | null, data: FileTaskData, store: DeviceStateStore) {
if (!jensen) return false;
// 用 Promise 包装回调逻辑,等待所有数据块接收完成
const fileData = await new Promise<Uint8Array | null>((resolve) => {
const chunks: Uint8Array[] = [];
let receivedLength = 0;
jensen.getFile(data.fileName, data.fileLength, (chunk) => {
if (chunk === 'fail') {
console.error('文件传输失败');
resolve(null);
return;
}
// 收集数据块
chunks.push(chunk);
receivedLength += chunk.length;
console.log(`接收进度: ${receivedLength} / ${data.fileLength}`);
// 检查是否接收完成
if (receivedLength >= data.fileLength) {
// 合并所有数据块为一个完整的 Uint8Array
const fileData = new Uint8Array(receivedLength);
let offset = 0;
for (const chunk of chunks) {
fileData.set(chunk, offset);
offset += chunk.length;
}
console.log('文件传输完成,总大小:', receivedLength);
resolve(fileData);
}
});
});
if (fileData) {
// 将完整的文件数据存储到 store 中供后续处理使用
// store.set('transferredFile', { fileName: data.fileName, data: fileData });
return true;
}
return false;
},
download: async function () {
console.log('xxxx', 'download and save to local file');
},
transcribe: async function () {
console.log('xxxx', 'transcribe the file');
},
summarize: async function () {
console.log('xxxx', 'summarize the file');
}
});
}
......@@ -28,6 +28,8 @@ export class DeviceStateStore {
private updateTimes: Map<string, number> = new Map();
// 状态变更监听器
private listeners: Map<string, StateChangeListener<any>[]> = new Map();
// 后台任务队列:key 为任务 tag,value 为待处理数据数组
private taskQueues: Map<string, any[]> = new Map();
// 获取状态
get<K extends keyof DeviceStateSchema>(key: K): DeviceStateSchema[K] | undefined {
......@@ -95,10 +97,129 @@ export class DeviceStateStore {
}
}
// ===================== 后台任务队列操作方法 =====================
/**
* 向指定任务队列推入数据
* @param tag 任务标签(与后台任务的 tag 一致)
* @param data 要推入的数据
*/
pushTaskData(tag: string, data: any): void {
if (!this.taskQueues.has(tag)) {
this.taskQueues.set(tag, []);
}
this.taskQueues.get(tag)!.push(data);
}
/**
* 向指定任务队列批量推入数据
* @param tag 任务标签
* @param dataList 要推入的数据数组
*/
pushTaskDataBatch(tag: string, dataList: any[]): void {
if (!this.taskQueues.has(tag)) {
this.taskQueues.set(tag, []);
}
this.taskQueues.get(tag)!.push(...dataList);
}
/**
* 从指定任务队列弹出一个数据(FIFO,先进先出)
* @param tag 任务标签
* @returns 弹出的数据,如果队列为空则返回 undefined
*/
popTaskData(tag: string): any | undefined {
const queue = this.taskQueues.get(tag);
if (!queue || queue.length === 0) return undefined;
return queue.shift();
}
/**
* 从指定任务队列弹出所有数据
* @param tag 任务标签
* @returns 所有数据的数组,如果队列为空则返回空数组
*/
popAllTaskData(tag: string): any[] {
const queue = this.taskQueues.get(tag);
if (!queue || queue.length === 0) return [];
const result = [...queue];
queue.length = 0; // 清空队列
return result;
}
/**
* 查看指定任务队列的第一个数据(不弹出)
* @param tag 任务标签
* @returns 队列中的第一个数据,如果队列为空则返回 undefined
*/
peekTaskData(tag: string): any | undefined {
const queue = this.taskQueues.get(tag);
if (!queue || queue.length === 0) return undefined;
return queue[0];
}
/**
* 查看指定任务队列的所有数据(不弹出)
* @param tag 任务标签
* @returns 队列中所有数据的副本,如果队列为空则返回空数组
*/
peekAllTaskData(tag: string): any[] {
const queue = this.taskQueues.get(tag);
if (!queue) return [];
return [...queue];
}
/**
* 获取指定任务队列中的数据数量
* @param tag 任务标签
* @returns 队列中的数据数量
*/
getTaskDataCount(tag: string): number {
const queue = this.taskQueues.get(tag);
return queue ? queue.length : 0;
}
/**
* 检查指定任务队列是否有数据
* @param tag 任务标签
* @returns 如果队列有数据返回 true,否则返回 false
*/
hasTaskData(tag: string): boolean {
const queue = this.taskQueues.get(tag);
return queue !== undefined && queue.length > 0;
}
/**
* 清空指定任务队列
* @param tag 任务标签
*/
clearTaskData(tag: string): void {
const queue = this.taskQueues.get(tag);
if (queue) queue.length = 0;
}
/**
* 清空所有任务队列
*/
clearAllTaskData(): void {
this.taskQueues.clear();
}
/**
* 获取所有任务队列的 tag 列表
* @returns 所有任务队列的 tag 数组
*/
getTaskQueueTags(): string[] {
return Array.from(this.taskQueues.keys());
}
// ===================== 状态管理方法 =====================
// 清空所有状态
clear(): void {
this.state = {};
this.updateTimes.clear();
this.taskQueues.clear(); // 同时清空任务队列
// 不清除监听器,允许重连后继续监听
}
......@@ -108,12 +229,19 @@ export class DeviceStateStore {
}
// 导出状态快照(用于调试)
dump(): { state: Partial<DeviceStateSchema>; updateTimes: Record<string, number> } {
dump(): {
state: Partial<DeviceStateSchema>;
updateTimes: Record<string, number>;
taskQueues: Record<string, any[]>;
} {
const times: Record<string, number> = {};
this.updateTimes.forEach((v, k) => times[k] = v);
const queues: Record<string, any[]> = {};
this.taskQueues.forEach((v, k) => queues[k] = [...v]);
return {
state: { ...this.state },
updateTimes: times
updateTimes: times,
taskQueues: queues
};
}
}
......@@ -135,14 +263,11 @@ export type TimerTask = {
export type BackgroundTask = {
tag: string;
// 用于任务调度
schedule: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
// 用于设备交互使用
deviceTask?: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
// 任务善后处理调用
generalTask?: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
lastExecuteTime: number;
interval: number; // 轮询间隔
// 用于任务调度,可通过 this 调用同级定义的其它方法(注意:必须使用 function 而非箭头函数)
schedule: (this: BackgroundTask, jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
lastExecuteTime?: number;
// 允许定义任意其它方法/属性
[key: string]: any;
}
export type UserTask = {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment