JavaScript 中的异步编程方式与实践

1,256次阅读
没有评论

共计 11753 个字符,预计需要花费 30 分钟才能阅读完成。

提醒:本文最后更新于2025-07-07 14:38,文中所关联的信息可能已发生改变,请知悉!

1. 为什么要异步编程

JavaScript 是一种单线程运行的的编程语言,同一时刻只能执行一个任务。为了处理不同的任务调度逻辑,异步编程在 JavaScript 编程开发中是无法避免的。

在以下列举的场景中,均必然涉及异步编程方法:

  • IO 操作:外部设备访问
    • 文件存取
    • TCP / UDP 网络访问
  • 异步 API
    • setTimeout / setInterval
    • setImmediate
    • process.nextTick
    • queueMicrotask
    • Event

2. 实现异步编程的几种方式

  • 回调函数
  • 事件监听 / 观察者模式
  • Promise
  • Async / Await

函数可以作为函数的参数传递,这是 Javascript 支持异步编程的最基本的形式。故回调函数模式是最为广泛也几乎是早期(ES5-)唯一可选的异步编程手段。
随着 Promise 概念的引入,JavaScript 现代语法(ES6+)面向异步编程开始越来简洁友好。
自从 ES7 将基于 PromiseAsync / Await 被引入语法标准后,异步编程可以也变得如同写同步代码一样简洁清晰。

2.1 回调函数

函数可以作为函数的参数传递,这是 Javascript 支持异步编程的最基本的形式,这种特性使得 JavaScript 异步编程非常灵活而简单。回调函数模式是最为广泛也几乎是早期(ES5-)唯一可选的异步编程手段。但其让人最为诟病的是,在复杂业务逻辑中,不得不使用大量的异步回调,从而带来深度嵌套的回调地狱模式,使得逻辑异常复杂难读而不易维护。

const fs = require('fs');
fs.readFile('./a.txt', (err, data) => {
if(err) {
console.log('readFile.error:', err);
} else {
fs.readFile('./b.txt', (err1, data1) => {
if (err1) {
console.log('readFile.error:', err1);
} else {
return data + data1;
}
});
}
});

2.2 事件的监听与处理

事件的监听与处理是一种订阅者模式,本质上也都是基于回调函数的方式实现。它在基于 JavaScript 的开发中十分普遍。订阅者模式可以很容易的将关联度不高的逻辑实现分离,简化模块间依赖调用。基于事件的订阅者模式在 jQuery 时代得到了大量的应用。

但在重前端架构的大型项目中,数据依赖关系往往具有极高的复杂性,涉及多变量因子的复杂场景下,订阅者模式的数据状态具有很大不可控性,容易因为不同变量因子的逻辑微调而导致订阅依赖逻辑的混乱。因此也出现了许多面向中大型前端项目的数据流管理工具库。数据流管理本质上也仍然是基于回调函数的方式实现。

const uid = uuid.v4();
const payload = {...};
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => console.log(result));

2.3 Promise

随着 Promise 概念的引入,JavaScript 现代语法(ES6+)面向异步编程开始越来简洁友好。Promise 比回调函数好了许多,可以很好的避免回调地狱模式,不过其 then 回调的模式并不简洁。但是 Promise 为后续 ES6+ 标准中的几乎所有异步相关 API 提供了基础能力支持。

fs.promises.readFile('./a.txt', 'utf8')
.then(a => fs.promises.readFile('./b.txt', 'utf8').then(b => a + b))
.then(result => console.log(result));
.catch (err => {console.log(err));
function readFile(path) {
return new Promise((resolve, reject) => {
fs.readFile(path, (err, data) => err ? reject(err) : resolve(data));
});
}
readFile('./a.txt')
.then(a => readFile('./b.txt').then(b => a + b))
.then(result => console.log(result))
.catch(err => console.error(err));

2.4 Async / Await

自从 ES7(ES2016) 将基于 PromiseAsync / Await 被引入语法标准后,异步编程可以也变得如同写同步代码一样简洁清晰。ES13(ES2022)Top-level Await 也纳入了规范,自此 Await 可以脱离 Async 函数的局限在 ESM 模块任意位置使用。 其主要特点可概述为:

  • 语法糖:基于 Promise 的基础支持
  • 如编写同步代码般进行异步编程
try {
const a = await fs.promises.readFile('./a.txt', 'utf8');
const b = await fs.promises.readFile('./b.txt', 'utf8');
const result = a + b;
console.log(result);
} catch (err) {
console.log(err);
}

3. JavaScript 异步编程实践

  • Promise
  • 使用 Async / Await
  • 并发性能:避免 IO 阻塞
  • 防抖与节流

3.1 延迟处理

// delay 封装:callback 模式
function delay(timeout = 0, callback: () => void) {
setTimeout(() => callback(), timeout);
}
// delay 封装:Promise 模式
function delay(timeout = 0) {
return new Promise(resolve => setTimeou(() => resolve(), timeout));
}
await delay(3_00).then(() => callback());
callback();
// sleep 封装:TS 类型、回调值支持
export const sleep = <T>(timeout = 0, value?: T | (() => T | Promise<T>)): Promise<T> =>
new Promise(resolve => setTimeout(() => resolve(), timeout))
.then(() => typeof value === 'function' ? value() : value);
await sleep(3_000, 1);

3.2 事件监听与订阅的 Promise 化

function request(payload) {
return new Promise(resolve => {
const uid = uuid.v4();
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => resolve(result));
});
}
request({...}).then(body => console.log(body));

问:如何实现超时处理?

3.2.1 setTimeout 与超时处理

function request(payload, timeout = 5_000) {
return new Promise(resolve => {
const uid = uuid.v4();
const timer = setTimeout(() => resolve({ errmsg: 'timeout' }), timeout);
workerChannel.once(uid, (result) => {
clearTimeout(timer);
resolve(result);
});
workerChannel.postMessage({ uid, payload });
});
}
request({...}).then(body => console.log(body));

问:如何实现超时处理的通用性封装?

3.2.2 超时处理的通用性封装:raceTimeout

export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => T | undefined): Promise<T | undefined> {
let promiseResolve: ((value: T | undefined) => void) | undefined = undefined;
const timer = setTimeout(() => promiseResolve?.(onTimeout?.()), timeout);
return Promise.race([promise.finally(() => clearTimeout(timer)), new Promise<T | undefined>(resolve => (promiseResolve = resolve))]);
}

通过 raceTimeout 调用 request 处理超时:

raceTimeout(request({...}), 3_000, () => ({ errmsg: 'timeout' }))
.then(body => console.log(body));

使用 raceTimeout 封装通用超时处理:

function request(payload, timeout = 5_000) {
const p = new Promise(resolve => {
const uid = uuid.v4();
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => resolve(result));
});
return raceTimeout(p, timeout, () => ({ errmsg: 'timeout' }));
}
request({...}).then(body => console.log(body));

3.2.3 超时处理的通用性封装:timeoutDeferred

interface IScheduledLater extends IDisposable {
isTriggered(): boolean;
}
function timeoutDeferred(timeout: number, fn: () => void): IScheduledLater {
let scheduled = true;
const handle = setTimeout(() => {
scheduled = false;
fn();
}, timeout);
return {
isTriggered: () => scheduled,
dispose: () => {
clearTimeout(handle);
scheduled = false;
},
};
}

使用 timeoutDeferred 封装通用超时处理示例:

function request(payload, timeout = 5_000) {
const p = new Promise(resolve => {
const uid = uuid.v4();
const deferred = timeoutDeferred(timeout, resolve({ errmsg: 'timeout' }));
workerChannel.postMessage({ uid, payload });
workerChannel.once(uid, (result) => {
deferred.dispose();
resolve(result);
});
});
}
request({...}).then(body => console.log(body));
  • 简单的使用,并不比直接使用 setTimeout 简洁
  • timeoutDeferred 更便利的用处是在复杂逻辑流程中,基于不同的变量因子决定如何执行 deferred.dispose()

3.2.4 超时处理的通用性封装:microtaskDeferred

function microtaskDeferred(fn: () => void): IScheduledLater {
let scheduled = true;
queueMicrotask(() => {
if (scheduled) {
scheduled = false;
fn();
}
});
return {
isTriggered: () => scheduled,
dispose: () => { scheduled = false; },
};
};

3.3 函数防抖(debounce) 与 函数节流(throttle)

以邮政员送信为例:

  • 邮局接收信件 – letters = [];
  • 邮政员送信 – function deliver(){}
const letters = [];
/** 邮局接收信件 */
function onLetterReceived(l) {
letters.push(l);
deliver(); // 派送策略?
}
/** 邮政员派送信件 */
function deliver() {
const lettersToDeliver = letters;
letters = [];
return makeTheTrip(lettersToDeliver);
}
  • 收到信件即执行 makeTheTrip。要求:
    • 收件频率低?
    • 有非常多的邮政员?
    • 送件速度很快?
    • More…

3.3.1 lodash: 函数防抖与节流

import { throttle, debounce } from 'lodash';
// 节流:100ms 内最多执行一次
const throttler = throttle(deliver, 100);
// 防抖:间隔 100ms 以上才触发
const debounced = debounce(deliver, 100);
// 防抖:高频调用 - 每隔 1s 至少会触发一次
const debounced = debounce(deliver, 100, { maxWait: 1000 });

存在的问题:

  • 最佳取值:100ms1000ms 如何确定?
  • 无法实现 CPU 最大化利用
  • 无法较好的处理耗时幅度大的不确定任务调用

为什么?

  • 主要原因:无法得知何时送信结束
  • 解决:callback => 用回回调地狱模式?
  • 解决:不妨试试 Promise

3.3.2 Promise 式的防抖与节流

  • Throttler: 以节流方式执行 async 回调任务
export class Throttler {
/** 正在执行的任务句柄 */
private activePromise: Promise<unknown> | null;
/** 等待执行的任务句柄 */
private queuedPromise: Promise<unknown> | null;
/** 等待执行的任务 */
private queuedPromiseFactory: ITask<Promise<unknown>> | null;
public queue<T>(promiseFactory: ITask<Promise<T>>): Promise<T>;
}

应用示例:

const throttler = new Throttler();
/** 邮局接收信件 */
function onLetterReceived(l) {
letters.push(l);
throttler.queue(deliver);
}
  • 派送策略
    • 来信即送启动派送任务
    • 每次派送都拿走全部的邮件
    • 等待队列永远只缓存最新的一个任务 – queuedPromiseFactory

3.3.3 Sequencer: 顺序的执行 async 回调任务

export class Sequencer {
private current: Promise<unknown> = Promise.resolve(null);
queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> {
return (this.current = this.current.then(
() => promiseTask(),
() => promiseTask()
));
}
}

特点:

  • One By One
  • Throttler 的不同点:等待队列(queue)无限制
  • 简单的封装,方便的调用

3.3.4 区分多类型的顺序执行 async 任务

key 缓存不同类型的 Sequencer

export class SequencerByKey<TKey> {
private promiseMap = new Map<TKey, Promise<unknown>>();
queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> {
const runningPromise = this.promiseMap.get(key) ?? Promise.resolve();
const newPromise = runningPromise
.catch(() => {})
.then(promiseTask)
.finally(() => {
if (this.promiseMap.get(key) === newPromise) {
this.promiseMap.delete(key);
}
});
this.promiseMap.set(key, newPromise);
return newPromise;
}
}

3.3.5 防抖式的执行异步任务

export class Delayer<T> implements IDisposable {
private deferred: IScheduledLater | null;
private completionPromise: Promise<unknown> | null;
private doResolve: ((value?: unknown | Promise<unknown>) => void) | null;
private doReject: ((err: unknown) => void) | null;
private task: ITask<T | Promise<T>> | null;
constructor(public defaultDelay: number) {}
trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T>;
isTriggered(): boolean;
cancel(): void;
}
const delayer = new Delayer(10_000);
const letters = [];
function letterReceived(l) {
letters.push(l);
delayer.trigger(() => makeTheTrip());
}

特点:

  • 延迟执行
  • 有状态、可取消
  • 调用方式简洁,业务逻辑清晰
  • 缺点:高频调用一直被 cancel,不能及时调用

3.3.6 ThrottledDelayer: 防抖 + 节流

邮递员很聪明,在出去送信之前,他会等待一定的时间(不会一直等待)。

export class ThrottledDelayer<T> {
private delayer: Delayer<Promise<T>>;
private throttler: Throttler;
constructor(defaultDelay: number);
trigger(promiseFactory: ITask<Promise<T>>, delay?: number): Promise<T> {
return this.delayer.trigger(() => this.throttler.queue(promiseFactory), delay) as unknown as Promise<T>;
}
isTriggered(): boolean { return this.delayer.isTriggered(); }
cancel(): void { this.delayer.cancel(); }
dispose(): void { this.delayer.dispose(); }
}
  • 延迟执行高频任务:送信前等待一定的时间
  • 送信时新的延时任务以防抖的方式调用:delayer.triggerdelayer.completionPromise
  • 送信完成则立即进入下一次送信旅程:throttler.queuedPromise

3.3.7 Barrier:初始化前的调用屏障

创建一个初始状态为关闭、最后为永久打开的一个屏障。

export class Barrier {
private _isOpen: boolean = false;
private _promise: Promise<boolean>;
private _completePromise!: (v: boolean) => void;
constructor() {
this._promise = new Promise<boolean>((c, _e) => {
this._completePromise = c;
});
}
isOpen(): boolean { return this._isOpen }
open(): void {
this._isOpen = true;
this._completePromise(true);
}
wait(): Promise<boolean> { return this._promise }
}

问:邮政员尚未上班,如何处理触发的邮件投递任务?

示例:

const barrier = new Barrier();
async function letterReceived(l) {
letters.push(l);
await barrier.wait(); // 等待就绪后调用
makeTheTrip();
}
// ...
barrier.open(); // 邮政员上班了

优点:

  • 不使用创建缓冲区、等待回调等繁琐的方式
  • 调用链简化,逻辑简洁清晰

3.3.8 超时自动打开屏障:AutoOpenBarrier

邮政员一直不来上班,如何自动启用备选方案(如机器人派送模式)?

export class AutoOpenBarrier extends Barrier {
private readonly _timeout: NodeJS.Timer;
constructor(autoOpenTimeMs: number) {
super();
this._timeout = setTimeout(() => this.open(), autoOpenTimeMs);
}
override open(): void {
clearTimeout(this._timeout);
super.open();
}
}

3.3.9 retry 失败重试

async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number, validator?: (r: T) => boolean): Promise<T> {
let lastError: Error | undefined;
for (let i = 0; i < retries; i++) {
try {
const result = await task();
if (!validator || validator(result)) return result;
} catch (error) {
lastError = error;
await sleep(delay);
}
}
throw lastError;
}

retry 应用示例:

async function doLogin(): { success: boolean } {}
const result = await retry(doLogin, 1_000, 3, r => r.success);

3.4. 并发执行

3.4.1 concurrency: 多任务并发

当已知需要执行的所有异步任务,而且他们之间的执行过程和结果互不影响时,则可以设计并发执行策略。以下为一个通用封装示例:

export function concurrency<T>(taskList: ITask<Promise<T>>[], maxDegreeOfParalellism = 5): Promise<T[]> {
const total = taskList.length;
let idx = 0;
const resut: T[] = [];
const onFinish = (r: T) => {
resut.push(r);
return next();
};
const next = (): Promise<void> => {
if (idx >= total) return null;
return taskList[idx++]()
.then(r => onFinish(r))
.catch(error => onFinish(error));
};
const size = Math.min(maxDegreeOfParalellism, total);
// const queue = Array.from<Promise<void>>({ length: size }).fill(next());
const queue: Promise<void>[] = [];
for (let i = 0; i < size; i++) queue.push(next());
return Promise.allSettled(queue).then(() => resut);
}

可以用如下示例测试以下效果:

function testForConcurrency(total = 30) {
let idx = 0;
const taskList = new Array(total).fill(1).map(() => (() => sleep(1000, () => console.log(`[${++idx}]done!`))));
return concurrency(taskList, 6);
}
testForConcurrency();

3.4.2 Limiter:可持续调用的并发队列

如果在一开始并不能确认全部的异步任务,则上面的 concurrency 并不能满足要求。通过使用下面示例中封装的 Limiter 类,可以指定最大并发数,并在任意时刻向执行队列中添加新的任务。示例:

export class Limiter<T> {
private _size = 0;
private runningPromises = 0;
private maxDegreeOfParalellism: number;
private outstandingPromises: ILimitedTaskFactory<T>[] = [];
private onFinishCallbackFns: ((...args: unknown[]) => unknown)[] = [];
constructor(maxDegreeOfParalellism: number) {
this.maxDegreeOfParalellism = maxDegreeOfParalellism;
}
onFinished(callback: (...args: unknown[]) => unknown) {
this.onFinishCallbackFns.push(callback);
}
get size(): number {
return this._size;
}
queue(factory: ITask<Promise<T>>): Promise<T> {
this._size++;
return new Promise<T>((c, e) => {
this.outstandingPromises.push({ factory, c, e });
this.consume();
});
}
private consume(): void {
while (this.outstandingPromises.length > 0 && this.runningPromises < this.maxDegreeOfParalellism) {
const iLimitedTask = this.outstandingPromises.shift();
this.runningPromises++;
const promise = iLimitedTask.factory();
promise.then(iLimitedTask.c, iLimitedTask.e);
promise.then(
() => this.consumed(),
() => this.consumed()
);
}
}
private consumed(): void {
this._size--;
this.runningPromises--;
if (this.outstandingPromises.length > 0) {
this.consume();
} else {
for (const d of this.onFinishCallbackFns) d();
}
}
dispose(): void {
this.onFinishCallbackFns = [];
}
}

4. 相关参考

正文完
 0
任侠
版权声明:本站原创文章,由 任侠 于2022-03-24发表,共计11753字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码