共计 11753 个字符,预计需要花费 30 分钟才能阅读完成。
提醒:本文最后更新于2025-07-07 14:38,文中所关联的信息可能已发生改变,请知悉!
1. 为什么要异步编程
JavaScript 是一种单线程运行的的编程语言,同一时刻只能执行一个任务。为了处理不同的任务调度逻辑,异步编程在 JavaScript 编程开发中是无法避免的。
在以下列举的场景中,均必然涉及异步编程方法:
- IO 操作:外部设备访问
- 文件存取
- TCP / UDP 网络访问
- 异步 API
- setTimeout / setInterval
- setImmediate
- process.nextTick
- queueMicrotask
- Event
2. 实现异步编程的几种方式
- 回调函数
- 事件监听 / 观察者模式
- Promise
- Async / Await
函数可以作为函数的参数传递,这是 Javascript 支持异步编程的最基本的形式。故回调函数模式是最为广泛也几乎是早期(ES5-)唯一可选的异步编程手段。
随着 Promise
概念的引入,JavaScript 现代语法(ES6+)面向异步编程开始越来简洁友好。
自从 ES7 将基于 Promise
的 Async / Await
被引入语法标准后,异步编程可以也变得如同写同步代码一样简洁清晰。
2.1 回调函数
函数可以作为函数的参数传递,这是 Javascript 支持异步编程的最基本的形式,这种特性使得 JavaScript 异步编程非常灵活而简单。回调函数模式是最为广泛也几乎是早期(ES5-)唯一可选的异步编程手段。但其让人最为诟病的是,在复杂业务逻辑中,不得不使用大量的异步回调,从而带来深度嵌套的回调地狱模式,使得逻辑异常复杂难读而不易维护。
const fs = require('fs'); | |
fs.readFile('./a.txt', (err, data) => { | |
if(err) { | |
console.log('readFile.error:', err); | |
} else { | |
fs.readFile('./b.txt', (err1, data1) => { | |
if (err1) { | |
console.log('readFile.error:', err1); | |
} else { | |
return data + data1; | |
} | |
}); | |
} | |
}); |
2.2 事件的监听与处理
事件的监听与处理是一种订阅者模式,本质上也都是基于回调函数的方式实现。它在基于 JavaScript 的开发中十分普遍。订阅者模式可以很容易的将关联度不高的逻辑实现分离,简化模块间依赖调用。基于事件的订阅者模式在 jQuery
时代得到了大量的应用。
但在重前端架构的大型项目中,数据依赖关系往往具有极高的复杂性,涉及多变量因子的复杂场景下,订阅者模式的数据状态具有很大不可控性,容易因为不同变量因子的逻辑微调而导致订阅依赖逻辑的混乱。因此也出现了许多面向中大型前端项目的数据流管理工具库。数据流管理本质上也仍然是基于回调函数的方式实现。
const uid = uuid.v4(); | |
const payload = {...}; | |
workerChannel.postMessage({ uid, payload }); | |
workerChannel.once(uid, (result) => console.log(result)); |
2.3 Promise
随着 Promise
概念的引入,JavaScript 现代语法(ES6+)面向异步编程开始越来简洁友好。Promise
比回调函数好了许多,可以很好的避免回调地狱模式,不过其 then
回调的模式并不简洁。但是 Promise
为后续 ES6+ 标准中的几乎所有异步相关 API 提供了基础能力支持。
fs.promises.readFile('./a.txt', 'utf8') | |
.then(a => fs.promises.readFile('./b.txt', 'utf8').then(b => a + b)) | |
.then(result => console.log(result)); | |
.catch (err => {console.log(err)); |
function readFile(path) { | |
return new Promise((resolve, reject) => { | |
fs.readFile(path, (err, data) => err ? reject(err) : resolve(data)); | |
}); | |
} | |
readFile('./a.txt') | |
.then(a => readFile('./b.txt').then(b => a + b)) | |
.then(result => console.log(result)) | |
.catch(err => console.error(err)); |
2.4 Async / Await
自从 ES7(ES2016)
将基于 Promise
的 Async / Await
被引入语法标准后,异步编程可以也变得如同写同步代码一样简洁清晰。ES13(ES2022)
将 Top-level Await
也纳入了规范,自此 Await
可以脱离 Async
函数的局限在 ESM 模块任意位置使用。 其主要特点可概述为:
- 语法糖:基于
Promise
的基础支持 - 如编写同步代码般进行异步编程
try { | |
const a = await fs.promises.readFile('./a.txt', 'utf8'); | |
const b = await fs.promises.readFile('./b.txt', 'utf8'); | |
const result = a + b; | |
console.log(result); | |
} catch (err) { | |
console.log(err); | |
} |
3. JavaScript 异步编程实践
Promise
化- 使用
Async / Await
- 并发性能:避免 IO 阻塞
- 防抖与节流
3.1 延迟处理
// delay 封装:callback 模式 | |
function delay(timeout = 0, callback: () => void) { | |
setTimeout(() => callback(), timeout); | |
} |
// delay 封装:Promise 模式 | |
function delay(timeout = 0) { | |
return new Promise(resolve => setTimeou(() => resolve(), timeout)); | |
} | |
await delay(3_00).then(() => callback()); | |
callback(); |
// sleep 封装:TS 类型、回调值支持 | |
export const sleep = <T>(timeout = 0, value?: T | (() => T | Promise<T>)): Promise<T> => | |
new Promise(resolve => setTimeout(() => resolve(), timeout)) | |
.then(() => typeof value === 'function' ? value() : value); | |
await sleep(3_000, 1); |
3.2 事件监听与订阅的 Promise 化
function request(payload) { | |
return new Promise(resolve => { | |
const uid = uuid.v4(); | |
workerChannel.postMessage({ uid, payload }); | |
workerChannel.once(uid, (result) => resolve(result)); | |
}); | |
} | |
request({...}).then(body => console.log(body)); |
问:如何实现超时处理?
3.2.1 setTimeout
与超时处理
function request(payload, timeout = 5_000) { | |
return new Promise(resolve => { | |
const uid = uuid.v4(); | |
const timer = setTimeout(() => resolve({ errmsg: 'timeout' }), timeout); | |
workerChannel.once(uid, (result) => { | |
clearTimeout(timer); | |
resolve(result); | |
}); | |
workerChannel.postMessage({ uid, payload }); | |
}); | |
} | |
request({...}).then(body => console.log(body)); |
问:如何实现超时处理的通用性封装?
3.2.2 超时处理的通用性封装:raceTimeout
export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => T | undefined): Promise<T | undefined> { | |
let promiseResolve: ((value: T | undefined) => void) | undefined = undefined; | |
const timer = setTimeout(() => promiseResolve?.(onTimeout?.()), timeout); | |
return Promise.race([promise.finally(() => clearTimeout(timer)), new Promise<T | undefined>(resolve => (promiseResolve = resolve))]); | |
} |
通过 raceTimeout
调用 request
处理超时:
raceTimeout(request({...}), 3_000, () => ({ errmsg: 'timeout' })) | |
.then(body => console.log(body)); |
使用 raceTimeout
封装通用超时处理:
function request(payload, timeout = 5_000) { | |
const p = new Promise(resolve => { | |
const uid = uuid.v4(); | |
workerChannel.postMessage({ uid, payload }); | |
workerChannel.once(uid, (result) => resolve(result)); | |
}); | |
return raceTimeout(p, timeout, () => ({ errmsg: 'timeout' })); | |
} | |
request({...}).then(body => console.log(body)); |
3.2.3 超时处理的通用性封装:timeoutDeferred
interface IScheduledLater extends IDisposable { | |
isTriggered(): boolean; | |
} | |
function timeoutDeferred(timeout: number, fn: () => void): IScheduledLater { | |
let scheduled = true; | |
const handle = setTimeout(() => { | |
scheduled = false; | |
fn(); | |
}, timeout); | |
return { | |
isTriggered: () => scheduled, | |
dispose: () => { | |
clearTimeout(handle); | |
scheduled = false; | |
}, | |
}; | |
} |
使用 timeoutDeferred
封装通用超时处理示例:
function request(payload, timeout = 5_000) { | |
const p = new Promise(resolve => { | |
const uid = uuid.v4(); | |
const deferred = timeoutDeferred(timeout, resolve({ errmsg: 'timeout' })); | |
workerChannel.postMessage({ uid, payload }); | |
workerChannel.once(uid, (result) => { | |
deferred.dispose(); | |
resolve(result); | |
}); | |
}); | |
} | |
request({...}).then(body => console.log(body)); |
- 简单的使用,并不比直接使用
setTimeout
简洁 timeoutDeferred
更便利的用处是在复杂逻辑流程中,基于不同的变量因子决定如何执行deferred.dispose()
3.2.4 超时处理的通用性封装:microtaskDeferred
function microtaskDeferred(fn: () => void): IScheduledLater { | |
let scheduled = true; | |
queueMicrotask(() => { | |
if (scheduled) { | |
scheduled = false; | |
fn(); | |
} | |
}); | |
return { | |
isTriggered: () => scheduled, | |
dispose: () => { scheduled = false; }, | |
}; | |
}; | |
3.3 函数防抖(debounce) 与 函数节流(throttle)
以邮政员送信为例:
- 邮局接收信件 –
letters = []
; - 邮政员送信 –
function deliver(){}
const letters = []; | |
/** 邮局接收信件 */ | |
function onLetterReceived(l) { | |
letters.push(l); | |
deliver(); // 派送策略? | |
} | |
/** 邮政员派送信件 */ | |
function deliver() { | |
const lettersToDeliver = letters; | |
letters = []; | |
return makeTheTrip(lettersToDeliver); | |
} |
- 收到信件即执行
makeTheTrip
。要求:- 收件频率低?
- 有非常多的邮政员?
- 送件速度很快?
- More…
3.3.1 lodash
: 函数防抖与节流
import { throttle, debounce } from 'lodash'; | |
// 节流:100ms 内最多执行一次 | |
const throttler = throttle(deliver, 100); | |
// 防抖:间隔 100ms 以上才触发 | |
const debounced = debounce(deliver, 100); | |
// 防抖:高频调用 - 每隔 1s 至少会触发一次 | |
const debounced = debounce(deliver, 100, { maxWait: 1000 }); |
存在的问题:
- 最佳取值:
100ms
与1000ms
如何确定? - 无法实现 CPU 最大化利用
- 无法较好的处理耗时幅度大的不确定任务调用
为什么?
- 主要原因:无法得知何时送信结束
- 解决:
callback
=> 用回回调地狱模式? - 解决:不妨试试
Promise
?
3.3.2 Promise
式的防抖与节流
Throttler
: 以节流方式执行async
回调任务
export class Throttler { | |
/** 正在执行的任务句柄 */ | |
private activePromise: Promise<unknown> | null; | |
/** 等待执行的任务句柄 */ | |
private queuedPromise: Promise<unknown> | null; | |
/** 等待执行的任务 */ | |
private queuedPromiseFactory: ITask<Promise<unknown>> | null; | |
public queue<T>(promiseFactory: ITask<Promise<T>>): Promise<T>; | |
} |
应用示例:
const throttler = new Throttler(); | |
/** 邮局接收信件 */ | |
function onLetterReceived(l) { | |
letters.push(l); | |
throttler.queue(deliver); | |
} |
- 派送策略
- 来信即送启动派送任务
- 每次派送都拿走全部的邮件
- 等待队列永远只缓存最新的一个任务 –
queuedPromiseFactory
3.3.3 Sequencer
: 顺序的执行 async 回调任务
export class Sequencer { | |
private current: Promise<unknown> = Promise.resolve(null); | |
queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> { | |
return (this.current = this.current.then( | |
() => promiseTask(), | |
() => promiseTask() | |
)); | |
} | |
} |
特点:
One By One
- 与
Throttler
的不同点:等待队列(queue
)无限制 - 简单的封装,方便的调用
3.3.4 区分多类型的顺序执行 async 任务
按 key
缓存不同类型的 Sequencer
。
export class SequencerByKey<TKey> { | |
private promiseMap = new Map<TKey, Promise<unknown>>(); | |
queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> { | |
const runningPromise = this.promiseMap.get(key) ?? Promise.resolve(); | |
const newPromise = runningPromise | |
.catch(() => {}) | |
.then(promiseTask) | |
.finally(() => { | |
if (this.promiseMap.get(key) === newPromise) { | |
this.promiseMap.delete(key); | |
} | |
}); | |
this.promiseMap.set(key, newPromise); | |
return newPromise; | |
} | |
} |
3.3.5 防抖式的执行异步任务
export class Delayer<T> implements IDisposable { | |
private deferred: IScheduledLater | null; | |
private completionPromise: Promise<unknown> | null; | |
private doResolve: ((value?: unknown | Promise<unknown>) => void) | null; | |
private doReject: ((err: unknown) => void) | null; | |
private task: ITask<T | Promise<T>> | null; | |
constructor(public defaultDelay: number) {} | |
trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T>; | |
isTriggered(): boolean; | |
cancel(): void; | |
} |
const delayer = new Delayer(10_000); | |
const letters = []; | |
function letterReceived(l) { | |
letters.push(l); | |
delayer.trigger(() => makeTheTrip()); | |
} |
特点:
- 延迟执行
- 有状态、可取消
- 调用方式简洁,业务逻辑清晰
- 缺点:高频调用一直被 cancel,不能及时调用
3.3.6 ThrottledDelayer
: 防抖 + 节流
邮递员很聪明,在出去送信之前,他会等待一定的时间(不会一直等待)。
export class ThrottledDelayer<T> { | |
private delayer: Delayer<Promise<T>>; | |
private throttler: Throttler; | |
constructor(defaultDelay: number); | |
trigger(promiseFactory: ITask<Promise<T>>, delay?: number): Promise<T> { | |
return this.delayer.trigger(() => this.throttler.queue(promiseFactory), delay) as unknown as Promise<T>; | |
} | |
isTriggered(): boolean { return this.delayer.isTriggered(); } | |
cancel(): void { this.delayer.cancel(); } | |
dispose(): void { this.delayer.dispose(); } | |
} |
- 延迟执行高频任务:送信前等待一定的时间
- 送信时新的延时任务以防抖的方式调用:
delayer.trigger
、delayer.completionPromise
- 送信完成则立即进入下一次送信旅程:
throttler.queuedPromise
3.3.7 Barrier
:初始化前的调用屏障
创建一个初始状态为关闭、最后为永久打开的一个屏障。
export class Barrier { | |
private _isOpen: boolean = false; | |
private _promise: Promise<boolean>; | |
private _completePromise!: (v: boolean) => void; | |
constructor() { | |
this._promise = new Promise<boolean>((c, _e) => { | |
this._completePromise = c; | |
}); | |
} | |
isOpen(): boolean { return this._isOpen } | |
open(): void { | |
this._isOpen = true; | |
this._completePromise(true); | |
} | |
wait(): Promise<boolean> { return this._promise } | |
} |
问:邮政员尚未上班,如何处理触发的邮件投递任务?
示例:
const barrier = new Barrier(); | |
async function letterReceived(l) { | |
letters.push(l); | |
await barrier.wait(); // 等待就绪后调用 | |
makeTheTrip(); | |
} | |
// ... | |
barrier.open(); // 邮政员上班了 |
优点:
- 不使用创建缓冲区、等待回调等繁琐的方式
- 调用链简化,逻辑简洁清晰
3.3.8 超时自动打开屏障:AutoOpenBarrier
邮政员一直不来上班,如何自动启用备选方案(如机器人派送模式)?
export class AutoOpenBarrier extends Barrier { | |
private readonly _timeout: NodeJS.Timer; | |
constructor(autoOpenTimeMs: number) { | |
super(); | |
this._timeout = setTimeout(() => this.open(), autoOpenTimeMs); | |
} | |
override open(): void { | |
clearTimeout(this._timeout); | |
super.open(); | |
} | |
} |
3.3.9 retry
失败重试
async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number, validator?: (r: T) => boolean): Promise<T> { | |
let lastError: Error | undefined; | |
for (let i = 0; i < retries; i++) { | |
try { | |
const result = await task(); | |
if (!validator || validator(result)) return result; | |
} catch (error) { | |
lastError = error; | |
await sleep(delay); | |
} | |
} | |
throw lastError; | |
} |
retry
应用示例:
async function doLogin(): { success: boolean } {} | |
const result = await retry(doLogin, 1_000, 3, r => r.success); |
3.4. 并发执行
3.4.1 concurrency
: 多任务并发
当已知需要执行的所有异步任务,而且他们之间的执行过程和结果互不影响时,则可以设计并发执行策略。以下为一个通用封装示例:
export function concurrency<T>(taskList: ITask<Promise<T>>[], maxDegreeOfParalellism = 5): Promise<T[]> { | |
const total = taskList.length; | |
let idx = 0; | |
const resut: T[] = []; | |
const onFinish = (r: T) => { | |
resut.push(r); | |
return next(); | |
}; | |
const next = (): Promise<void> => { | |
if (idx >= total) return null; | |
return taskList[idx++]() | |
.then(r => onFinish(r)) | |
.catch(error => onFinish(error)); | |
}; | |
const size = Math.min(maxDegreeOfParalellism, total); | |
// const queue = Array.from<Promise<void>>({ length: size }).fill(next()); | |
const queue: Promise<void>[] = []; | |
for (let i = 0; i < size; i++) queue.push(next()); | |
return Promise.allSettled(queue).then(() => resut); | |
} |
可以用如下示例测试以下效果:
function testForConcurrency(total = 30) { | |
let idx = 0; | |
const taskList = new Array(total).fill(1).map(() => (() => sleep(1000, () => console.log(`[${++idx}]done!`)))); | |
return concurrency(taskList, 6); | |
} | |
testForConcurrency(); |
3.4.2 Limiter
:可持续调用的并发队列
如果在一开始并不能确认全部的异步任务,则上面的 concurrency
并不能满足要求。通过使用下面示例中封装的 Limiter
类,可以指定最大并发数,并在任意时刻向执行队列中添加新的任务。示例:
export class Limiter<T> { | |
private _size = 0; | |
private runningPromises = 0; | |
private maxDegreeOfParalellism: number; | |
private outstandingPromises: ILimitedTaskFactory<T>[] = []; | |
private onFinishCallbackFns: ((...args: unknown[]) => unknown)[] = []; | |
constructor(maxDegreeOfParalellism: number) { | |
this.maxDegreeOfParalellism = maxDegreeOfParalellism; | |
} | |
onFinished(callback: (...args: unknown[]) => unknown) { | |
this.onFinishCallbackFns.push(callback); | |
} | |
get size(): number { | |
return this._size; | |
} | |
queue(factory: ITask<Promise<T>>): Promise<T> { | |
this._size++; | |
return new Promise<T>((c, e) => { | |
this.outstandingPromises.push({ factory, c, e }); | |
this.consume(); | |
}); | |
} | |
private consume(): void { | |
while (this.outstandingPromises.length > 0 && this.runningPromises < this.maxDegreeOfParalellism) { | |
const iLimitedTask = this.outstandingPromises.shift(); | |
this.runningPromises++; | |
const promise = iLimitedTask.factory(); | |
promise.then(iLimitedTask.c, iLimitedTask.e); | |
promise.then( | |
() => this.consumed(), | |
() => this.consumed() | |
); | |
} | |
} | |
private consumed(): void { | |
this._size--; | |
this.runningPromises--; | |
if (this.outstandingPromises.length > 0) { | |
this.consume(); | |
} else { | |
for (const d of this.onFinishCallbackFns) d(); | |
} | |
} | |
dispose(): void { | |
this.onFinishCallbackFns = []; | |
} | |
} |
4. 相关参考