JavaScript 中的异步编程方式与实践
李志文
2022/03
目录概览
- 一、为什么要异步编程
- 二、实现异步编程的几种方式
- 三、JavaScript 异步编程实践
- FAQ
一、为什么要异步编程
IO 操作:外部设备访问
- 文件存取
- TCP/UDP 网络访问
- More...
异步 API
- setTimeout / setInterval
- requestAnimationFrame
- requestIdleCallback
- queueMicrotask
- setImmediate
- process.nextTick
- Event
- ...
二、实现异步编程的几种方式
- 回调函数
- Promise
- async + await
- 事件监听 / 观察者模式
2.1 回调函数
const fs = require('fs');
fs.readFile('./a.txt', (err, data) => {
if(err) {
console.log('readFile.error:', err);
} else {
fs.readFile('./b.txt', (err1, data1) => {
if (err1) {
console.log('readFile.error:', err1);
} else {
return data + data1;
}
});
}
});
- 特点
- 基础语法特性、高性能
- 问题:回调地狱,调用逻辑异常复杂
2.2 Promise
// Promise:示例一
fs.promises.readFile('./a.txt', 'utf8')
.then(a => fs.promises.readFile('./b.txt', 'utf8').then(b => a + b))
.then(result => console.log(result));
.catch (err => {console.log(err));
// Promise:封装示例
function readFile(path) {
return new Promise((resolve, reject) => {
fs.readFile(path, (err, data) => err ? reject(err) : resolve(data));
});
}
readFile('./a.txt')
.then(a => readFile('./b.txt').then(b => a + b))
.then(result => console.log(result))
.catch(err => console.error(err));
- 优缺点
-
ESNext
=> 几乎是所有新语法异步 API 的基础 -
then
回调并不简洁
-
2.3 async + await
try {
const a = await fs.promises.readFile('./a.txt', 'utf8');
const b = await fs.promises.readFile('./b.txt', 'utf8');
const result = a + b;
console.log(result);
} catch (err) {
console.log(err);
}
- 特点
- 语法糖:基于
Promise
的基础支持 - 如编写同步代码般进行异步编程
- More...
- 语法糖:基于
2.4 事件监听与处理
const uid = uuid.v4();
const payload = {...};
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => console.log(result));
- 特点
- 逻辑分离,简化模块间依赖调用
- 具有不可控性
- 涉及多变量因子的复杂场景下,上层逻辑变动、订阅依赖顺序等容易逻辑混乱
3. JavaScript 异步编程实践
- 常见应用场景与方法
- 延迟处理:并发性能、避免 IO 阻塞
- Promise 化
- 使用 async + await
- 防抖与节流
- More...
3.1 延迟处理
// delay 封装:callback 模式
function delay(timeout = 0, callback: () => void) {
setTimeout(() => callback(), timeout);
}
// delay 封装:Promise 模式
function delay(timeout = 0) {
return new Promise(resolve => setTimeou(() => resolve(), timeout));
}
await delay(3_00).then(() => callback());
callback();
// sleep 封装:TS 类型、回调值支持
export const sleep = <T>(timeout = 0, value?: T | (() => T | Promise<T>)): Promise<T> =>
new Promise(resolve => setTimeout(() => resolve(), timeout))
.then(() => typeof value === 'function' ? value() : value);
await sleep(3_000, 1);
3.2 事件监听与订阅的 Promise 化
function request(payload) {
return new Promise(resolve => {
const uid = uuid.v4();
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => resolve(result));
});
}
request({...}).then(body => console.log(body));
问:如何实现超时处理?
setTimeout
与超时处理
function request(payload, timeout = 5_000) {
return new Promise(resolve => {
const uid = uuid.v4();
const timer = setTimeout(() => resolve({ errmsg: 'timeout' }), timeout);
workerChannel.once(uid, (result) => {
clearTimeout(timer);
resolve(result);
});
workerChannel.postMessage({ uid, payload });
});
}
request({...}).then(body => console.log(body));
问:如何实现超时处理的通用性封装?
超时处理的通用性封装:raceTimeout
export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => T | undefined): Promise<T | undefined> {
let timer: NodeJS.Timer;
return Promise.race([
promise.finally(() => clearTimeout(timer)),
new Promise<T | undefined>(resolve => {
timer = setTimeout(() => resolve(onTimeout?.()), timeout);
}),
]);
}
超时处理的通用性封装:raceTimeout
- 示例
// 通过 `raceTimeout` 调用 `request` 处理超时:
raceTimeout(request({...}), 3_000, () => ({ errmsg: 'timeout' }))
.then(body => console.log(body));
// 使用 `raceTimeout` 封装通用超时处理:
function request(payload, timeout = 5_000) {
const p = new Promise(resolve => {
const uid = uuid.v4();
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => resolve(result));
});
return raceTimeout(p, timeout, () => ({ errmsg: 'timeout' }));
}
request({...}).then(body => console.log(body));
超时处理的通用性封装:timeoutDeferred
interface IScheduledLater extends IDisposable {
isTriggered(): boolean;
}
function timeoutDeferred(timeout: number, fn: () => void): IScheduledLater {
let scheduled = true;
const handle = setTimeout(() => {
scheduled = false;
fn();
}, timeout);
return {
isTriggered: () => scheduled,
dispose: () => {
clearTimeout(handle);
scheduled = false;
},
};
}
使用 timeoutDeferred 封装通用超时处理 - 示例:
function request(payload, timeout = 5_000) {
const p = new Promise(resolve => {
const uid = uuid.v4();
const deferred = timeoutDeferred(timeout, resolve({ errmsg: 'timeout' }));
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => {
deferred.dispose();
resolve(result);
});
});
}
request({...}).then(body => console.log(body));
- 简单的使用并不比直接使用
setTimeout
简洁 -
timeoutDeferred
更便利的用处是在复杂逻辑流程中,基于不同的变量因子决定如何执行deferred.dispose()
3.3 函数防抖(debounce) 与节流(throttle)
以邮政员送信为例:
- 邮局接收信件 -
letters = []
; - 邮政员送信 -
function deliver(){}
3.3 函数防抖(debounce) 与节流(throttle)
const letters = [];
/** 邮局接收信件 */
function onLetterReceived(l) {
letters.push(l);
deliver(); // 派送策略?
}
/** 邮政员派送信件 */
function deliver() {
const lettersToDeliver = letters;
letters = [];
return makeTheTrip(lettersToDeliver);
}
- 收到信件即执行
makeTheTrip
。要求:- 收件频率低?
- 有非常多的邮政员?
- 送件速度快?
- More...
3.3.1 lodash
:: 函数防抖与节流
import { throttle, debounce } from 'lodash';
// 节流:100ms 内最多执行一次
const throttler = throttle(deliver, 100);
// 防抖:间隔 100ms 以上才触发
const debounced = debounce(deliver, 100);
// 防抖:高频调用 - 每隔 1s 至少会触发一次
const debounced = debounce(deliver, 100, { maxWait: 1000 });
- 存在的问题:
- 最佳取值:
100ms
与1000ms
如何确定? - 无法实现 CPU 最大化利用
- 无法较好的处理耗时幅度大的不确定任务调度
- 最佳取值:
- 为什么?
- 主要原因:无法得知何时送信结束
-
callback
:: 用回回调地狱模式? - 解决:不妨试试
Promise
?
3.3.2 Promise
式的防抖与节流
export class Throttler {
/** 正在执行的任务句柄 */
private activePromise: Promise<unknown> | null;
/** 等待执行的任务句柄 */
private queuedPromise: Promise<unknown> | null;
/** 等待执行的任务 */
private queuedPromiseFactory: ITask<Promise<unknown>> | null;
public queue<T>(promiseFactory: ITask<Promise<T>>): Promise<T>;
}
const throttler = new Throttler();
/** 邮局接收信件 */
function onLetterReceived(l) {
letters.push(l);
throttler.queue(deliver);
}
Throttler
:: 以节流方式执行async
回调任务
- 派送策略
- 来信即送启动派送任务
- 每次派送都拿走全部的邮件
- 等待队列永远只缓存最新的一个任务 -
queuedPromiseFactory
3.3.3 Sequencer
:: 顺序的执行 async 回调任务
export class Sequencer {
private current: Promise<unknown> = Promise.resolve(null);
queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> {
return (this.current = this.current.then(
() => promiseTask(),
() => promiseTask()
));
}
}
- 特点
-
One By One
- 与
Throttler
的不同点:等待队列(queue
)无限制 - 简单的封装,方便的调用
-
3.3.4 区分多类型的顺序执行 async 任务
按 key
缓存不同类型的 Sequencer
export class SequencerByKey<TKey> {
private promiseMap = new Map<TKey, Promise<unknown>>();
queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> {
const runningPromise = this.promiseMap.get(key) ?? Promise.resolve();
const newPromise = runningPromise
.catch(() => {})
.then(promiseTask)
.finally(() => {
if (this.promiseMap.get(key) === newPromise) {
this.promiseMap.delete(key);
}
});
this.promiseMap.set(key, newPromise);
return newPromise;
}
}
3.3.5 防抖式的执行异步任务
export class Delayer<T> implements IDisposable {
private deferred: IScheduledLater | null;
private completionPromise: Promise<unknown> | null;
private doResolve: ((value?: unknown | Promise<unknown>) => void) | null;
private doReject: ((err: unknown) => void) | null;
private task: ITask<T | Promise<T>> | null;
constructor(public defaultDelay: number) {}
trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T>;
isTriggered(): boolean;
cancel(): void;
}
const delayer = new Delayer(10_000);
const letters = [];
function letterReceived(l) {
letters.push(l);
delayer.trigger(() => makeTheTrip());
}
邮递员收到信时并不立即派送,只有一段时间内没有信件到达,才出去送信
优缺点:
- 延迟执行
- 有状态、可取消
- 调用方式简洁,业务逻辑清晰
- 缺点:高频调用一直被 cancel,不能及时调用
ThrottledDelayer
:: 防抖 + 节流
邮递员很聪明,在出去送信之前,他会等待一定的时间(不会一直等待)。
export class ThrottledDelayer<T> {
private delayer: Delayer<Promise<T>>;
private throttler: Throttler;
constructor(defaultDelay: number);
trigger(promiseFactory: ITask<Promise<T>>, delay?: number): Promise<T> {
return this.delayer.trigger(() => this.throttler.queue(promiseFactory), delay) as unknown as Promise<T>;
}
isTriggered(): boolean { return this.delayer.isTriggered(); }
cancel(): void { this.delayer.cancel(); }
dispose(): void { this.delayer.dispose(); }
}
- 延迟执行高频任务:送信前等待一定的时间
- 送信时新的延时任务以防抖的方式调用:
delayer.trigger
、delayer.completionPromise
- 送信完成则立即进入下一次送信旅程:
throttler.queuedPromise
3.3.7 Barrier
:初始化前的调用屏障
创建一个初始状态为关闭、最后为永久打开的一个屏障。
export class Barrier {
private _isOpen: boolean = false;
private _promise: Promise<boolean>;
private _completePromise!: (v: boolean) => void;
constructor() {
this._promise = new Promise<boolean>((c, _e) => {
this._completePromise = c;
});
}
isOpen(): boolean { return this._isOpen }
open(): void {
this._isOpen = true;
this._completePromise(true);
}
wait(): Promise<boolean> { return this._promise }
}
问:邮政员尚未上班,如何处理触发的邮件投递任务?
const barrier = new Barrier();
async function letterReceived(l) {
letters.push(l);
await barrier.wait(); // 等待就绪后调用
makeTheTrip();
}
// ...
barrier.open(); // 邮政员上班了
- 优点:
- 不使用创建缓冲区、等待回调等繁琐的方式
- 调用链简化,逻辑简洁清晰
超时自动打开的屏障:AutoOpenBarrier
邮政员一直不来上班,如何自动启用备选方案?
export class AutoOpenBarrier extends Barrier {
private readonly _timeout: NodeJS.Timer;
constructor(autoOpenTimeMs: number) {
super();
this._timeout = setTimeout(() => this.open(), autoOpenTimeMs);
}
override open(): void {
clearTimeout(this._timeout);
super.open();
}
}
3.3.9 retry
失败重试
async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number, validator?: (r: T) => boolean): Promise<T> {
let lastError: Error | undefined;
for (let i = 0; i < retries; i++) {
try {
const result = await task();
if (!validator || validator(result)) return result;
} catch (error) {
lastError = error;
await sleep(delay);
}
}
throw lastError;
}
// `retry` 应用示例:
async function doLogin(): { success: boolean } {}
const result = await retry(doLogin, 1_000, 3, r => r.success);
4. 相关参考
- CSDN:: 浅谈 Node.js 中的异步编程原理和实践
- https:://github.com/microsoft/vscode/blob/main/src/vs/base/common/async.ts
FAQ
* * *