Commit 6c2f1d50 authored by martin hou's avatar martin hou

refact: 设备指令管理重构

parent 239aac54
......@@ -236,7 +236,7 @@ declare class Jensen {
setNotification: (state: boolean, time?: number) => Promise<ReturnStruct['common']>;
// 获取录音中的文件信息,当它从null -> non-null -> null转变时就表示已经有一个新的录音产生了
getRecordingFile: () => Promise<{ recording: null | string; createTime: string; createDate: string }>;
getRecordingFile: (seconds?: number) => Promise<{ recording: null | string; createTime: string; createDate: string }>;
// 获取内部存储卡信息
// used:已使用,capacity:总容量(单位为M),status:900表示正常,其它表示异常
......
import { createRoot } from 'react-dom/client';
import { Home } from '.';
import { DeviceManager } from './mgr';
createRoot(document.getElementById('root') as HTMLDivElement).render(<Home />);
createRoot(document.getElementById('root') as HTMLDivElement).render(<DeviceManager />);
button
{
height: 50px;
font-size: 18px;
padding: 0px 30px;
border-radius: 5px;
outline: none;
color: #fff;
background-color: #090;
border: 0px;
}
button:hover {
cursor: pointer;
}
.btn-container {
display: flex;
flex-direction: row;
gap: 16px;
padding: 16px;
align-items: center;
flex-wrap: wrap;
}
.info-container {
display: flex;
flex-direction: row;
}
.left-side {
width: 60%;
border: 1px solid #ccc;
padding: 16px;
}
.right-side {
width: 40%;
border: 1px solid #ccc;
padding: 16px;
}
import { mgr } from './utils/mgr';
import './mgr.css';
import { useEffect, useState } from 'react';
import { Logger } from './Logger';
import Jensen from '../jensen';
export function DeviceManager() {
const [sn, setSn] = useState<string|null>(null);
const [time, setTime] = useState<string|null>(null);
const [battery, setBattery] = useState<string|null>(null);
const [userTask, setUserTask] = useState<string|null>(null);
const [files, setFiles] = useState<Jensen.FileInfo[]>([]);
const [recording, setRecording] = useState<Jensen.FileInfo|null>(null);
useEffect(() => {
mgr.onautoconnect((dinfo) => {
setSn(dinfo.sn);
// 注册一个状态监视器,当设备状态里的时间发生改变时,更新时间
// 回调参数: (key, newValue, oldValue)
mgr.subscribeDeviceState(dinfo.sn, 'time', (key, newValue, oldValue) => {
// console.error('时间更新:', newValue);
setTime(JSON.stringify(newValue));
});
mgr.subscribeDeviceState(dinfo.sn, 'battery-status', (key, newValue, oldValue) => {
setBattery(JSON.stringify(newValue));
});
mgr.subscribeDeviceState(dinfo.sn, 'files', (key, newValue, oldValue) => {
setFiles(newValue as Jensen.FileInfo[]);
});
mgr.subscribeDeviceState(dinfo.sn, 'recording', (key, newValue, oldValue) => {
setRecording(newValue as Jensen.FileInfo | null);
});
});
}, []);
const doConnect = async () => {
await mgr.tryconnect();
}
const doRequestUserTask = async () => {
if (sn == null) return alert('Please connect first');
const tid = new Date().getTime();
setUserTask('t-' + tid);
const rst = await mgr.registerUserTask(sn, 't-' + tid);
if (!rst) return alert('Failed to request user task');
Logger.info('mgr', 'task-manager', 'User task registered: t-' + tid);
}
const doReleaseUserTask = async () => {
if (sn == null) return alert('Please connect first');
if (userTask == null) return alert('Please request a user task first');
await mgr.unregisterTask(sn, userTask);
Logger.info('mgr', 'task-manager', 'User task released: ' + userTask);
setUserTask(null);
}
const doGetTime = async () => {
if (sn == null) return alert('Please connect first');
if (userTask == null) return alert('Please request a user task first');
try {
// 方式一:使用 getInstance,现在必须传入 userTaskTag
// 所有方法调用都会自动验证 UserTask
// const jensen = mgr.getInstance(sn, userTask);
// if (jensen == null) return alert('Please connect first');
// const time = await jensen.getTime(1);
// alert('Time: ' + time.time);
// 方式二:使用 executeWithUserTask
const time = await mgr.executeWithUserTask(sn, userTask, async (jensen) => {
return await jensen.getTime(1);
});
alert('Time: ' + time.time);
} catch (err: any) {
alert('Error: ' + err.message);
}
}
const doDownloadFile = async (fileName: string) => {
if (sn == null) return alert('Please connect first');
if (userTask == null) return alert('Please request a user task first');
// 需要有一个发起后台任务的方法
try {
const jensen = mgr.getInstance(sn, userTask);
if (jensen == null) return alert('Please connect first');
await jensen.getFile(fileName, 1024, (data) => {
console.log(data);
});
} catch (err: any) {
alert('Error: ' + err.message);
}
}
return (
<>
<h1>DeviceManager</h1>
<div className="btn-container">
<button onClick={doConnect}>Connect</button>
<button onClick={doRequestUserTask}>Request</button>
<button onClick={doReleaseUserTask}>Release</button>
<button onClick={doGetTime}>Get Time</button>
</div>
<div>
<h3>Device Info</h3>
<ol>
<li>SN: {sn}</li>
<li>Time: {time?.toLocaleString()}</li>
<li>Battery: {battery}</li>
<li>Recording: {recording?.name}</li>
</ol>
<h3>Files</h3>
<ol>
{
files.map((file, index) => {
return <li key={index}>{file.name} - {file.length} - {file.duration} <a href="#" onClick={() => { doDownloadFile(file.name) }}>Download</a></li>
})
}
</ol>
</div>
</>
);
}
\ No newline at end of file
import Jensen, { DeviceInfo, ScheduleInfo } from "../../";
import { Logger } from "../Logger";
import { BackgroundTask, DeviceStateSchema, DeviceStateStore, InitializeTask, StateChangeListener, TaskManager, TimerTask } from "./tasks";
// 指令参数
export type CommandOptions = {
......@@ -62,12 +63,74 @@ export class ConnectionManager
private connections: Map<string, Jensen>;
private onautoconnectEventListener: AutoConnectEventHandler | null = null;
private onconnectionstatechangedListener: ConnectionEventHandler | null = null;
constructor (_logger: typeof Logger) {
this.logger = _logger;
this.connections = new Map<string, Jensen>();
this.registerUSBEventListener();
}
// 任务管理器,按设备分组管理
private taskManagers: Map<string, TaskManager> = new Map<string, TaskManager>();
// 注册初始化事务
async registerInitializeTask (sn: string, task: InitializeTask) {
return this.taskManagers.get(sn)?.registerInitializeTask(task);
}
// 注册定时器任务
async registerTimerTask (sn: string, task: TimerTask) {
return this.taskManagers.get(sn)?.registerTimerTask(task);
}
// 注册后台任务
async registerBackgroundTask (sn: string, task: BackgroundTask) {
return this.taskManagers.get(sn)?.registerBackgroundTask(task);
}
// 注册用户任务
async registerUserTask (sn: string, tag: string) {
return this.taskManagers.get(sn)?.applyUserTask(tag);
}
// 注销任务,任意类型的任务都可以注销
async unregisterTask (sn: string, tag: string) {
return this.taskManagers.get(sn)?.releaseUserTask(tag);
}
// 用户任务保活
async keepUserTaskAlive (sn: string, tag: string) {
return this.taskManagers.get(sn)?.keepUserTaskAlive(tag);
}
// 验证用户任务
verifyUserTask (sn: string, tag: string): boolean {
const tm = this.taskManagers.get(sn);
if (!tm) throw new Error('Device not found: ' + sn);
return tm.verifyUserTask(tag);
}
// 获取当前用户任务的 tag
getCurrentUserTaskTag (sn: string): string | null {
return this.taskManagers.get(sn)?.getCurrentUserTaskTag() || null;
}
// 安全执行:验证用户任务后执行回调
// 使用方法: await mgr.executeWithUserTask(sn, tag, (jensen) => jensen.getTime(1));
async executeWithUserTask<T> (
sn: string,
tag: string,
callback: (jensen: Jensen) => Promise<T>
): Promise<T> {
// 先验证用户任务
this.verifyUserTask(sn, tag);
// 获取原始 jensen 实例(内部使用,已验证过 UserTask)
const jensen = this.getInstanceRaw(sn);
if (!jensen) throw new Error('Device not connected: ' + sn);
// 执行回调
return await callback(jensen);
}
async registerUSBEventListener () {
const usb = (navigator as any).usb;
const self = this;
......@@ -137,6 +200,8 @@ export class ConnectionManager
// console.log(k + ' disconnected...');
self.logger.info('jensen', 'ondisconnect', k + ' disconnected');
self.onconnectionstatechangedListener?.('disconnected', k);
self.taskManagers.get(k)?.shutdown();
self.taskManagers.delete(k);
}
});
}
......@@ -191,7 +256,6 @@ export class ConnectionManager
{
this.connections.set(
dinfo.sn,
// new CommandManager(dinfo, inst)
inst
);
this.logger.info('jensen', 'onconnect', dinfo.sn + ' reconnected');
......@@ -202,9 +266,19 @@ export class ConnectionManager
}
this.connections.set(
dinfo.sn,
// new CommandManager(dinfo, inst)
inst
);
// 新建TaskManager的时候,它的各类Tasks应该如何初始化?
// 应该有一个统一的登记的地方
let tm = this.taskManagers.get(dinfo.sn);
if (tm == null) {
tm = new TaskManager(inst, this.logger);
this.taskBuilder(inst, tm);
await tm.scheduleInitializeTasks();
this.taskManagers.set(dinfo.sn, tm);
}
// 先触发连接事件,再启动定时任务循环
try
{
this.onautoconnectEventListener?.(dinfo);
......@@ -214,9 +288,142 @@ export class ConnectionManager
{
this.logger.error('jensen', 'autoconnect', String(err));
}
// scheduleTasks 是一个无限循环,不要 await 它,否则后续代码永远不会执行
tm.scheduleTasks();
}
}
taskBuilder(jensen: Jensen, tm: TaskManager)
{
// 初始化任务的注册 - 数据会被存储到 DeviceStateStore 中
tm.registerInitializeTask({
tag: 'time-sync',
task: async (jensen: Jensen | null, store) => {
let time = await jensen?.getTime(1);
Logger.info('jensen', 'initialize', 'get-time: ' + JSON.stringify(time));
// 存储设备时间到状态容器
if (time) {
store.set('time', { deviceTime: time.time, syncTime: new Date() });
}
// 同步设置时间
await jensen?.setTime(new Date(), 1);
Logger.info('jensen', 'initialize', 'set-time-to: ' + new Date().toLocaleString());
}
});
tm.registerInitializeTask({
tag: 'get-settings',
task: async (jensen: Jensen | null, store) => {
let settings = await jensen?.getSettings(1);
Logger.info('jensen', 'initialize', 'get-settings: ' + JSON.stringify(settings));
// 存储设置到状态容器
if (settings) {
store.set('settings', settings);
}
}
});
tm.registerInitializeTask({
tag: 'get-card-info',
task: async (jensen: Jensen | null, store) => {
let cardInfo = await jensen?.getCardInfo(1);
Logger.info('jensen', 'initialize', 'get-card-info: ' + JSON.stringify(cardInfo));
// 存储卡信息到状态容器
if (cardInfo) {
store.set('cardInfo', cardInfo);
}
}
});
tm.registerInitializeTask({
tag: 'battery-status',
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let batteryStatus = await jensen?.getBatteryStatus(1);
Logger.info('jensen', 'initialize', 'get-battery-status: ' + JSON.stringify(batteryStatus));
// 存储电池状态到状态容器
if (batteryStatus) {
store.set('battery-status', batteryStatus);
}
}
});
// 定时同步设备电池状态
tm.registerTimerTask({
tag: 'battery-status',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let batteryStatus = await jensen?.getBatteryStatus(1);
Logger.info('jensen', 'timer-task', 'battery: ' + JSON.stringify(batteryStatus));
// 更新电池状态(会自动触发订阅者)
if (batteryStatus) {
store.set('battery-status', batteryStatus);
}
}
});
// 定时获取设备时间
tm.registerTimerTask({
tag: 'get-device-time',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
if (jensen?.getModel() != 'hidock-p1') return;
let time = await jensen?.getTime(1);
Logger.info('jensen', 'timer-task', 'time: ' + JSON.stringify(time));
// 更新设备时间
if (time) {
store.set('time', { deviceTime: time.time, syncTime: new Date() });
}
}
});
// 录音中的状态同步
tm.registerTimerTask({
tag: 'recording-status',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
let recording = await jensen?.getRecordingFile();
Logger.info('jensen', 'timer-task', 'recording-status: ' + JSON.stringify(recording));
if (recording) store.set('recording', recording);
}
});
// 设备录音文件列表同步
tm.registerTimerTask({
tag: 'file-list-sync',
interval: 1000,
task: async (jensen: Jensen | null, store) => {
let needRefresh = false;
// 1. 定时查询文件数量
let fileCount = await jensen?.getFileCount(1);
if (fileCount)
{
let lastCount = store.get('file-count');
if (lastCount != fileCount.count)
{
needRefresh = true;
Logger.info('jensen', 'timer-task', 'file-count changed: ' + lastCount + ' -> ' + fileCount.count);
}
store.set('file-count', fileCount.count);
}
if (!needRefresh) return;
let files = await jensen?.listFiles();
if (files) store.set('files', files);
}
});
// 后台任务的注册(如有需要可以在这里添加)
}
onautoconnect (eventListener: AutoConnectEventHandler) {
this.onautoconnectEventListener = eventListener;
}
......@@ -227,14 +434,94 @@ export class ConnectionManager
this.onconnectionstatechangedListener = eventHandler;
}
// 获取一个Jensen实例
getInstance(tag: string) {
return this.connections.get(tag);
// ===================== 设备状态访问接口 =====================
// 获取指定设备的状态存储
getDeviceStateStore(sn: string): DeviceStateStore | undefined {
return this.taskManagers.get(sn)?.getStateStore();
}
// 获取指定设备的某个状态值
getDeviceState<K extends keyof DeviceStateSchema>(sn: string, key: K): DeviceStateSchema[K] | undefined {
return this.taskManagers.get(sn)?.getState(key);
}
// 设置指定设备的某个状态值
setDeviceState<K extends keyof DeviceStateSchema>(sn: string, key: K, value: DeviceStateSchema[K]): void {
this.taskManagers.get(sn)?.setState(key, value);
}
// 订阅指定设备的状态变更
subscribeDeviceState<K extends keyof DeviceStateSchema>(
sn: string,
key: K,
listener: StateChangeListener<DeviceStateSchema[K]>
): (() => void) | undefined {
return this.taskManagers.get(sn)?.subscribeState(key, listener);
}
// 检查指定设备的状态是否过期
isDeviceStateStale(sn: string, key: keyof DeviceStateSchema, maxAge: number): boolean {
return this.taskManagers.get(sn)?.getStateStore().isStale(key, maxAge) ?? true;
}
// 导出指定设备的状态快照(用于调试)
dumpDeviceState(sn: string) {
return this.taskManagers.get(sn)?.dumpState();
}
// ===================== 原有接口 =====================
// 获取一个受保护的 Jensen 实例(代理对象)
// 所有方法调用都会自动验证 UserTask
// 参数: sn - 设备序列号, userTaskTag - 用户任务标签
getInstance(sn: string, userTaskTag?: string): Jensen | null {
const jensen = this.connections.get(sn);
if (!jensen) return null;
// 如果没有提供 userTaskTag,返回只读代理,阻止所有方法调用
// 如果提供了 userTaskTag,返回验证代理
const taskManager = this.taskManagers.get(sn);
const self = this;
return new Proxy(jensen, {
get(target, prop, receiver) {
const value = Reflect.get(target, prop, receiver);
// 如果是函数,则包装它以进行验证
if (typeof value === 'function') {
return function(...args: any[]) {
// 验证 UserTask
if (!userTaskTag) {
throw new Error('getInstance() requires a userTaskTag parameter. Usage: mgr.getInstance(sn, userTaskTag)');
}
if (!taskManager) {
throw new Error('Device not found: ' + sn);
}
// 调用验证方法(会抛出错误如果验证失败)
taskManager.verifyUserTask(userTaskTag);
// 验证通过,执行原方法
return value.apply(target, args);
};
}
// 非函数属性直接返回
return value;
}
}) as Jensen;
}
// 获取原始的 Jensen 实例(仅供内部使用,不对外暴露)
// 如果需要在内部任务中使用,可以调用此方法
private getInstanceRaw(sn: string): Jensen | undefined {
return this.connections.get(sn);
}
// 设置一个Jensen实例
setInstance(tag: string, jensen: Jensen, oldTag: string | null = null) {
setInstance(sn: string, jensen: Jensen, oldTag: string | null = null) {
if (oldTag) this.connections.delete(oldTag);
this.connections.set(tag, jensen);
this.connections.set(sn, jensen);
}
// 主动连接新设备,如果没有连接成功或是未连接,则直接返回null,如果用户选择的是已连接的设备,则直接抛出异常
......@@ -261,7 +548,6 @@ export class ConnectionManager
{
self.connections.set(
dinfo.sn,
// new CommandManager(dinfo, inst)
jensen
);
return dinfo
......@@ -271,384 +557,12 @@ export class ConnectionManager
// 关闭连接
async close (sn: string) {
let jensen = this.getInstance(sn);
let jensen = this.getInstanceRaw(sn);
if (jensen) jensen.disconnect();
this.connections.delete(sn);
}
}
// 指令管理/代理
class CommandManager
{
// 设备信息
private serialNumber: string | null = null;
// 命令队列
private commands:CommandRequest[] = [];
// 指令Promise响应
private directives: Map<string, CommandPromise> = new Map<string, CommandPromise>();
// 定时任务
private tasks: ScheduledTaskInfo[] = [];
// 是否暂停定时任务
private suspendTimer: boolean = false;
// Jensen实例
private jensen: Jensen | null = null;
// 当前事务
private currentTask: String | null = null;
constructor(dinfo: DeviceInfo, jensen: Jensen)
{
// TODO: 要不要完全保存deviceInfo?
this.serialNumber = dinfo.sn;
this.jensen = jensen;
// 开启定时器
let self = this;
window.setInterval(function() {
self._executeTimer();
}, 1000);
}
dump () {
// TODO: 完成内部成员的状态输出,用于跟踪连接和事务状态
console.log('###############################################################');
console.log('SerialNumber', this.serialNumber);
console.log('Tasks', this.tasks);
console.log('Commands', this.commands);
console.log('SuspendTimer', this.suspendTimer);
console.log('Jensen Instance', this.jensen);
}
setup () {
// 在连接初次建立时需要做的初始化工作,是不是放在这里还没有想好
}
// onconnect事件如何触发?
// 是不是应该全局唯一触发?好像至少多少时间上就应该重新触发一次
// 或者是每次触发,然后可以被别的地方接管走
// 另外,还有一个问题,如果断开了连接再次连接上来,会话都不同了,还是要按SN做标识,否则多连接管理会出问题
// 增加一个新指令消息
// 怎么样触发新指令呢?
async getDeviceInfo(secs?: number) {
// 当前是否可以执行?
// 我是不是要分一下组?
// 我是不是要加一个模式转换的方法?
return this.jensen?.getDeviceInfo(secs);
// return this.request((jensen) => jensen?.getDeviceInfo(), { reconnect: false });
}
getSerialNumber () {
return this.serialNumber;
}
// 更改模式,要不要超时时长?在超过多长时间后要销毁或者说是恢复过去?
// 1. 当前是什么模式
// 2. 请求指令的是来自于哪个模式?
// 3. 整一个事务控制吧
// 事务开始或结束,不传参数就是事务结束了
// 对于每一个事务而言,应该要尽量确保它执行的成功的,包括重连尝试
async begin(secs: number = 5) {
// 如果应用端尝试变更事务切换不过去,那就应该一直等
// 这里是不是要弄一个可以打断的?如果事务与当前事务一致,又应该怎么办?还是要阻止吧?
// 结束事务
if (this.currentTask)
{
throw new Error("pending task: " + this.currentTask);
}
let tid = this.uuid();
this.currentTask = tid;
return tid;
}
end (id: string) {
if (this.currentTask != id) throw new Error('invalid task: ' + this.currentTask + ' --> ' + id);
this.currentTask = null;
}
// 开始/停止定时任务
// 但是如果停止的话,也只是停止下一轮的执行,要不要等待全部停止完成再说?
startTimer () {
this.suspendTimer = false;
}
stopTimer() {
this.suspendTimer = true;
}
private async _executeTimer () {
if (this.suspendTimer) return;
// 遍历所有的定时器任务
for (let i = 0; i < this.tasks.length; i++)
{
let item = this.tasks[i];
let now = new Date().getTime() / 1000;
if (now - item.lastExecuteTime < item.interval) continue;
try
{
let rst = await item.command.apply(null, [this.jensen]);
item.onComplete?.apply(null, [rst]);
/*
item.command.apply(null, [this.jensen])?.then((rst) => {
item.onComplete?.apply(null, [ rst ]);
});
*/
}
catch(ex)
{
console.error(ex);
}
finally
{
item.lastExecuteTime = now;
}
}
}
// 注册定时任务,返回值为它在列表中的索引位置,可能后面有要解除定时执行的需要,先预留一下
// 这里有个问题,我要如何完成回调呢?
async registerTimer(task: ScheduledTask) {
let len = this.tasks.length;
this.tasks.push({
interval: task.interval,
command: task.command,
lastExecuteTime: 0,
executedCount: 0
});
return len;
}
// 指令分两种,有复杂结构的返回值的或是普通boolean类型的两种
async request (func: CommandProxy, opts: CommandOptions) {
// TODO: 需要在这里注册一个Promise
let self = this;
let rid = this.uuid();
let future = new Promise((resolve, reject) => {
self.directives.set(rid, {
id: rid,
resolve: resolve,
reject: reject,
timeout: opts.expires || 5
});
});
this.commands.push({ command: func, id: rid, options: opts, createTime: new Date().getTime() });
return future;
}
// 需要怎么样持续的执行呢?
async execute(func: CommandProxy, opts: CommandOptions) {
let tid = await this.begin(opts.wait);
try
{
let args: any[] | undefined = opts.params;
let execCount = 0;
let maxTries = (opts.retry || 0) + 1;
do {
execCount += 1;
try
{
let rst: any = await func(this.jensen);
if (rst) return rst;
if (opts.reconnect)
{
// TODO: 连接重新建立
this.logger.info('jensen', 'execute', 'try reconnect');
}
}
catch(ex)
{
console.error(ex);
}
}
while (execCount < maxTries);
}
finally
{
this.end(tid);
}
}
private async tryReconnect()
{
try
{
// TODO: 需要准备一个新的reconnect方法,使用同一个WebUSB实例来close/open来实现重连,否则必然会乱掉
this.jensen?.reconnect();
}
catch(ex)
{
console.error(ex);
}
}
async getTime (timeout?: number) {
return this.jensen?.getTime(timeout);
}
async setTime (date: Date, timeout?: number) {
return this.jensen?.setTime(date, timeout);
}
async getRecordingFile () {
return this.jensen?.getRecordingFile();
}
async getFileCount (timeout?: number) {
return this.jensen?.getFileCount(timeout);
}
async listFiles (timeout?: number) {
return this.jensen?.listFiles(timeout);
}
async readFile (fname: string, offset: number, length: number) {
return this.jensen?.readFile(fname, offset, length);
}
async getFile (fileName: string, length: number, on?: (msg: Uint8Array | 'fail') => void, onprogress?: (size: number) => void) {
return this.jensen?.getFile(fileName, length, on, onprogress);
}
async getFilePart (fileName: string, length: number, on?: (msg: Uint8Array | 'fail') => void, onprogress?: (size: number) => void) {
return this.jensen?.getFilePart(fileName, length, on, onprogress);
}
async getFileBlock (fileName: string, length: number, on?: (msg: Uint8Array | 'fail') => void) {
return this.jensen?.getFileBlock(fileName, length, on);
}
async requestFirmwareUpgrade (vn: number, length: number, timeout?: number) {
return this.jensen?.requestFirmwareUpgrade(vn, length, timeout);
}
async uploadFirmware (data: number[], timeout?: number, onProgress?: (cur: number, total: number) => void) {
return this.jensen?.uploadFirmware(data, timeout, onProgress);
}
async beginBNC (timeout?: number) {
return this.jensen?.beginBNC(timeout);
}
async endBNC (timeout?: number) {
return this.jensen?.endBNC(timeout);
}
async deleteFile (fileName: string) {
return this.jensen?.deleteFile(fileName);
}
async getSettings (timeout?: number) {
return this.jensen?.getSettings(timeout);
}
async setAutoRecord (enable: boolean, timeout?: number) {
return this.jensen?.setAutoRecord(enable, timeout);
}
async setAutoPlay (enable: boolean, timeout?: number) {
return this.jensen?.setAutoPlay(enable, timeout);
}
async setNotification (enable: boolean, timeout?: number) {
return this.jensen?.setNotification(enable, timeout);
}
async setBluetoothPromptPlay (enable: boolean, timeout?: number) {
return this.jensen?.setBluetoothPromptPlay(enable, timeout);
}
async getCardInfo (timeout?: number) {
return this.jensen?.getCardInfo(timeout);
}
async formatCard (timeout?: number) {
return this.jensen?.formatCard(timeout);
}
async factoryReset (timeout?: number) {
return this.jensen?.factoryReset(timeout);
}
async restoreFactorySettings (timeout?: number) {
return this.jensen?.restoreFactorySettings(timeout);
}
async recordTestStart (type: number, timeout?: number) {
return this.jensen?.recordTestStart(type, timeout);
}
async recordTestEnd (type: number, timeout?: number) {
return this.jensen?.recordTestEnd(type, timeout);
}
async test (timeout?: number) {
return this.jensen?.test(timeout);
}
async writeSerialNumber (sn: string) {
return this.jensen?.writeSerialNumber(sn);
}
async getRealtimeSettings () {
return this.jensen?.getRealtimeSettings();
}
async startRealtime () {
return this.jensen?.startRealtime();
}
async pauseRealtime () {
return this.jensen?.pauseRealtime();
}
async stopRealtime () {
return this.jensen?.stopRealtime();
}
async getRealtime (frames: number) {
return this.jensen?.getRealtime(frames);
}
async scanDevices (timeout?: number) {
return this.jensen?.scanDevices(timeout);
}
async connectBTDevice (mac: string, timeout?: number) {
return this.jensen?.connectBTDevice(mac, timeout);
}
async disconnectBTDevice (timeout?: number) {
return this.jensen?.disconnectBTDevice(timeout);
}
async getBluetoothStatus (timeout?: number) {
return this.jensen?.getBluetoothStatus(timeout);
}
async requestToneUpdate (signature: string, size: number, timeout?: number) {
return this.jensen?.requestToneUpdate(signature, size, timeout);
}
async updateTone (data: Uint8Array, timeout?: number) {
return this.jensen?.updateTone(data, timeout);
}
async requestUACUpdate (signature: string, size: number, timeout?: number) {
return this.jensen?.requestUACUpdate(signature, size, timeout);
}
async updateUAC (data: Uint8Array, timeout?: number) {
return this.jensen?.updateUAC(data, timeout);
}
async sendScheduleInfo (info: ScheduleInfo[]) {
return this.jensen?.sendScheduleInfo(info);
}
// 生成唯一id,暂时用随机数生成,后面最好用顺序递增的整数比较好
private uuid () {
return String(Math.floor(1000_0000_0000_0000 + Math.random() * 1000_0000_0000_0000));
}
}
function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}
......
import Jensen, { BatteryStatus, BluetoothStatus, DeviceInfo, FileInfo } from "../../jensen";
import { Logger } from "../Logger";
// ===================== 设备状态存储相关类型 =====================
// 状态变更监听器类型
export type StateChangeListener<T> = (key: string, newValue: T, oldValue: T | undefined) => void;
// 设备状态Schema - 定义所有可存储的状态类型
export interface DeviceStateSchema {
deviceInfo: DeviceInfo | null;
time: { deviceTime: string; syncTime: Date } | null;
settings: { autoRecord: boolean; autoPlay: boolean; notification?: boolean } | null;
cardInfo: { used: number; capacity: number; status: string } | null;
batteryStatus: BatteryStatus | null;
bluetoothStatus: BluetoothStatus | null;
files: FileInfo[] | null;
// 可按需扩展其他状态...
[key: string]: any; // 允许动态扩展
}
// ===================== 设备状态存储类 =====================
export class DeviceStateStore {
// 内部数据存储
private state: Partial<DeviceStateSchema> = {};
// 更新时间记录
private updateTimes: Map<string, number> = new Map();
// 状态变更监听器
private listeners: Map<string, StateChangeListener<any>[]> = new Map();
// 获取状态
get<K extends keyof DeviceStateSchema>(key: K): DeviceStateSchema[K] | undefined {
return this.state[key];
}
// 设置状态
set<K extends keyof DeviceStateSchema>(key: K, value: DeviceStateSchema[K]): void {
const oldValue = this.state[key];
this.state[key] = value;
this.updateTimes.set(key as string, Date.now());
// 触发监听器
this.notifyListeners(key as string, value, oldValue);
}
// 获取状态最后更新时间
getUpdateTime(key: keyof DeviceStateSchema): number | undefined {
return this.updateTimes.get(key as string);
}
// 判断状态是否过期
isStale(key: keyof DeviceStateSchema, maxAge: number): boolean {
const updateTime = this.updateTimes.get(key as string);
if (!updateTime) return true;
return Date.now() - updateTime > maxAge;
}
// 检查状态是否存在
has(key: keyof DeviceStateSchema): boolean {
return key in this.state && this.state[key] !== undefined;
}
// 订阅状态变更
subscribe<K extends keyof DeviceStateSchema>(
key: K,
listener: StateChangeListener<DeviceStateSchema[K]>
): () => void {
const keyStr = key as string;
if (!this.listeners.has(keyStr)) {
this.listeners.set(keyStr, []);
}
this.listeners.get(keyStr)!.push(listener);
// 返回取消订阅函数
return () => {
const arr = this.listeners.get(keyStr);
if (arr) {
const idx = arr.indexOf(listener);
if (idx !== -1) arr.splice(idx, 1);
}
};
}
// 触发监听器
private notifyListeners<T>(key: string, newValue: T, oldValue: T | undefined): void {
const arr = this.listeners.get(key);
if (arr) {
arr.forEach(listener => {
try {
listener(key, newValue, oldValue);
} catch (e) {
console.error('StateChangeListener error:', e);
}
});
}
}
// 清空所有状态
clear(): void {
this.state = {};
this.updateTimes.clear();
// 不清除监听器,允许重连后继续监听
}
// 清空所有监听器
clearListeners(): void {
this.listeners.clear();
}
// 导出状态快照(用于调试)
dump(): { state: Partial<DeviceStateSchema>; updateTimes: Record<string, number> } {
const times: Record<string, number> = {};
this.updateTimes.forEach((v, k) => times[k] = v);
return {
state: { ...this.state },
updateTimes: times
};
}
}
// ===================== 任务类型定义 =====================
export type InitializeTask = {
tag: string;
task: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
}
export type TimerTask = {
tag: string;
interval: number;
lastExecuteTime?: number;
task: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
onComplete?: (success: boolean, data: any) => void;
}
export type BackgroundTask = {
tag: string;
// 用于任务调度
schedule: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
// 用于设备交互使用
deviceTask?: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
// 任务善后处理调用
generalTask?: (jensen: Jensen | null, store: DeviceStateStore) => Promise<any>;
lastExecuteTime: number;
interval: number; // 轮询间隔
}
export type UserTask = {
tag: string;
startTime: number;
lastActiveTime: number;
}
// 用户任务最大空闲时间,超过这个时间则直接取消任务执行权限
const USER_TASK_EXPIRE_TIME = 30 * 1000;
export class TaskManager {
private logger: typeof Logger;
private jensen: Jensen | null;
// 是否已经启动了任务调度
private started: boolean = false;
private runnable: boolean = true;
// 初始化任务
private initializeTasks: InitializeTask[] = [];
// 定时任务
private timerTasks: TimerTask[] = [];
// 后台任务
private backgroundTasks: BackgroundTask[] = [];
// 当前用户任务,排它性的任务
private userTask: UserTask | null = null;
// 设备状态存储
private stateStore: DeviceStateStore = new DeviceStateStore();
constructor (jensen: Jensen | null, _logger: typeof Logger)
{
this.jensen = jensen;
this.logger = _logger;
}
// 获取状态存储实例(供外部访问)
getStateStore(): DeviceStateStore {
return this.stateStore;
}
// 快捷方法:获取状态
getState<K extends keyof DeviceStateSchema>(key: K): DeviceStateSchema[K] | undefined {
return this.stateStore.get(key);
}
// 快捷方法:设置状态
setState<K extends keyof DeviceStateSchema>(key: K, value: DeviceStateSchema[K]): void {
this.stateStore.set(key, value);
}
// 快捷方法:订阅状态变更
subscribeState<K extends keyof DeviceStateSchema>(
key: K,
listener: StateChangeListener<DeviceStateSchema[K]>
): () => void {
return this.stateStore.subscribe(key, listener);
}
// 注册初始化任务
registerInitializeTask(task: InitializeTask) {
this.initializeTasks.push(task);
}
// 注册定时任务
registerTimerTask(task: TimerTask) {
this.timerTasks.push(task);
}
// 注册后台任务
registerBackgroundTask(task: BackgroundTask) {
this.backgroundTasks.push(task);
}
// 申请用户任务,当timeout后仍然没有申请到,则返回false
async applyUserTask(tag: string, timeout: number = 3000) {
const stime = new Date().getTime();
while (true) {
if (new Date().getTime() - stime > timeout) return false;
if (this.userTask)
{
const now = new Date().getTime();
if (now - this.userTask.lastActiveTime > USER_TASK_EXPIRE_TIME)
{
this.userTask = null;
break;
}
await sleep(1000);
continue;
}
break;
}
const now = new Date().getTime();
this.userTask = {
tag: tag,
startTime: now,
lastActiveTime: now
};
return true;
}
// 释放用户任务
releaseUserTask(tag: string) {
if (this.userTask?.tag === tag)
{
this.userTask = null;
return true;
}
throw new Error('no task registered: ' + tag);
}
// 保持用户任务活跃(防止超时被释放)
keepUserTaskAlive(tag: string)
{
if (this.userTask)
{
this.userTask.lastActiveTime = new Date().getTime();
}
else throw new Error('task not found: ' + tag);
}
// 验证用户任务是否有效
// 返回 true 表示验证通过,否则抛出错误
verifyUserTask(tag: string): boolean {
if (!this.userTask) {
throw new Error('No user task registered. Please register a user task first.');
}
if (this.userTask.tag !== tag) {
throw new Error(`User task tag mismatch: expected "${this.userTask.tag}", got "${tag}"`);
}
// 检查是否过期
const now = new Date().getTime();
if (now - this.userTask.lastActiveTime > USER_TASK_EXPIRE_TIME) {
this.userTask = null;
throw new Error('User task has expired. Please register a new user task.');
}
// 更新活跃时间
this.userTask.lastActiveTime = now;
return true;
}
// 获取当前用户任务的 tag(如果存在)
getCurrentUserTaskTag(): string | null {
return this.userTask?.tag || null;
}
shutdown() {
this.runnable = false;
this.stateStore.clear();
this.stateStore.clearListeners();
}
async scheduleInitializeTasks() {
// TODO: 需要确保其它任务处于暂停状态
for (let i = 0; i < this.initializeTasks.length; i++)
{
try
{
await this.initializeTasks[i].task(this.jensen, this.stateStore);
}
catch(ex)
{
Logger.error('jensen', 'initialize', String(ex));
}
}
}
// 任务的调度
async scheduleTasks() {
// 轮询执行定时任务和后台任务
while (this.runnable)
{
// 如果用户任务活动超时,则释放用户任务
if (this.userTask)
{
const now = new Date().getTime();
if (now - this.userTask.lastActiveTime > USER_TASK_EXPIRE_TIME)
{
Logger.error('mgr', 'task-manager', 'User task has expired: ' + this.userTask.tag);
this.userTask = null;
}
}
// 遍历所有的后台任务,是否有执行的必要?
for (let i = 0; i < this.backgroundTasks.length; i++)
{
let now = new Date().getTime();
let item = this.backgroundTasks[i];
if (this.userTask)
{
item.lastExecuteTime = now;
continue;
}
try
{
await item.schedule(this.jensen, this.stateStore);
item.lastExecuteTime = new Date().getTime();
}
catch(ex)
{
Logger.error('jensen', 'bg-task', String(ex));
}
}
// 遍历所有的定时任务,检查是否到了执行时间
for (let i = 0; i < this.timerTasks.length; i++)
{
let now = new Date().getTime();
let item = this.timerTasks[i];
if (this.userTask)
{
item.lastExecuteTime = now;
continue;
}
if (now - (item.lastExecuteTime || 0) < item.interval) continue;
try
{
await item.task(this.jensen, this.stateStore);
item.lastExecuteTime = new Date().getTime();
}
catch(ex)
{
Logger.error('jensen', 'timer-task', String(ex));
}
}
await sleep(100);
}
}
// 导出状态快照(用于调试)
dumpState() {
return this.stateStore.dump();
}
}
function sleep(time: number) {
return new Promise(resolve => setTimeout(resolve, time));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment