@ahoo-wang/fetcher-wow
@ahoo-wang/fetcher-wow 包为 Wow DDD + 事件溯源 + CQRS 框架提供客户端集成层。它提供了用于发送领域命令的类型化命令客户端、用于读取聚合状态的快照查询客户端、用于重放领域事件的事件流查询客户端,以及支持条件查询、排序和分页的丰富查询 DSL。
安装
bash
pnpm add @ahoo-wang/fetcher-wow架构概览
mermaid
graph TB
subgraph sg_1 ["Command Side (Write)"]
CC["CommandClient<br>send commands"]
CSC["StreamCommandClient<br>send + SSE stream"]
end
subgraph sg_2 ["Query Side (Read)"]
SQC["SnapshotQueryClient<S><br>query snapshots"]
ESQC["EventStreamQueryClient<br>query domain events"]
LSAC["LoadStateAggregateClient<br>load by ID"]
LOSAC["LoadOwnerStateAggregateClient<br>load owner state"]
end
subgraph sg_3 ["Query DSL"]
COND["Condition<br>where / and / or"]
SORT["FieldSort<br>sort by field"]
PAGE["PagedQuery / ListQuery<br>pagination"]
OP["Operator<br>EQ, NE, IN, BETWEEN..."]
end
subgraph sg_4 ["Factories"]
QCF["QueryClientFactory<S, FIELDS><br>creates all query clients"]
end
QCF --> SQC
QCF --> ESQC
QCF --> LSAC
QCF --> LOSAC
CC --> |"POST command"| API["Wow Server API"]
SQC --> |"POST query"| API
ESQC --> |"POST + SSE"| API
LSAC --> |"GET by ID"| API
SQC --> COND
SQC --> SORT
SQC --> PAGE
COND --> OP
style CC fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style CSC fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style SQC fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style ESQC fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style LSAC fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style LOSAC fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style QCF fill:#161b22,stroke:#30363d,color:#e6edf3
style COND fill:#161b22,stroke:#30363d,color:#e6edf3
style SORT fill:#161b22,stroke:#30363d,color:#e6edf3
style PAGE fill:#161b22,stroke:#30363d,color:#e6edf3
style OP fill:#161b22,stroke:#30363d,color:#e6edf3
style API fill:#2d333b,stroke:#6d5dfc,color:#e6edf3命令端(写模型)
CommandClient
CommandClient 使用基于装饰器的 API 方法向 Wow 聚合根发送命令。支持标准命令执行和长时间运行命令的 SSE 流式传输。
typescript
import { CommandClient } from '@ahoo-wang/fetcher-wow';
import { ApiMetadata } from '@ahoo-wang/fetcher-decorator';
import { Fetcher } from '@ahoo-wang/fetcher';
const commandClient = new CommandClient({
fetcher: new Fetcher({ baseURL: 'http://localhost:8080/' }),
basePath: 'owner/{ownerId}/cart',
});
// Send a command and wait for result
const result = await commandClient.send({
body: {
productId: 'product-1',
quantity: 2,
},
headers: {
'Wow-Wait-Stage': 'SNAPSHOT',
},
});
console.log('Aggregate ID:', result.aggregateId);
console.log('Command ID:', result.commandId);来源: packages/wow/src/command/commandClient.ts:77-148
CommandRequest
命令包装在 CommandRequest 中,支持:
body-- 使用CommandBody<C>包装的命令载荷headers-- 类型化命令头,用于等待策略、租户/所有者/聚合标识urlParams-- 聚合路由的路径参数
命令头
| 请求头 | 常量 | 描述 |
|---|---|---|
Wow-Tenant-Id | CommandHeaders.TENANT_ID | 租户标识符 |
Wow-Owner-Id | CommandHeaders.OWNER_ID | 所有者标识符 |
Wow-Aggregate-Id | CommandHeaders.AGGREGATE_ID | 聚合实例 ID |
Wow-Aggregate-Version | CommandHeaders.AGGREGATE_VERSION | 预期聚合版本 |
Wow-Wait-Stage | CommandHeaders.WAIT_STAGE | 等待处理阶段 |
Wow-Wait-Time-Out | CommandHeaders.WAIT_TIME_OUT | 等待超时时长 |
Wow-Request-Id | CommandHeaders.REQUEST_ID | 请求关联 ID |
来源: packages/wow/src/command/commandHeaders.ts
CommandResult
命令执行后返回的结果:
mermaid
classDiagram
class CommandResult {
+id: string
+waitCommandId: string
+stage: CommandStage
+contextAlias: string
+contextName: string
+aggregateName: string
+aggregateId: string
+aggregateVersion: number
+commandId: string
+requestId: string
+errorCode: string
+errorMsg: string
+signalTime: number
+result: any
}
class CommandResultEventStream {
<<type>>
ReadableStream~JsonServerSentEvent~CommandResult~~
}
CommandResult --> CommandResultEventStream
style CommandResult fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style CommandResultEventStream fill:#161b22,stroke:#30363d,color:#e6edf3来源: packages/wow/src/command/commandResult.ts:74-110
命令流程
mermaid
sequenceDiagram
autonumber
participant C as Client
participant CC as CommandClient
participant S as Wow Server
participant AGG as Aggregate Root
C->>CC: send('add_cart_item', { body: {...} })
CC->>S: POST /owner/{ownerId}/cart/add_cart_item
S->>AGG: Handle command
AGG-->>S: Domain events produced
S-->>CC: CommandResult (with wait stage)
CC-->>C: CommandResult
Note over C,AGG: For streaming:<br>sendAndWaitStream returns<br>ReadableStream of CommandResult events查询端(读模型)
SnapshotQueryClient
读取聚合状态的主要客户端。支持计数、列表、分页和流式快照查询。
typescript
import { SnapshotQueryClient, all, listQuery, pagedQuery, singleQuery } from '@ahoo-wang/fetcher-wow';
const client = new SnapshotQueryClient<CartState>(apiMetadata);
// Count
const count = await client.count(all());
// List
const items = await client.list(listQuery({
condition: all(),
limit: 100,
}));
// Paged
const page = await client.paged(pagedQuery({
condition: all(),
limit: 10,
offset: 0,
}));
// Single by ID
const cart = await client.getStateById('cart-123');
// Multiple by IDs
const carts = await client.getStateByIds(['cart-1', 'cart-2']);SnapshotQueryClient 方法
| 方法 | 端点 | 返回类型 | 描述 |
|---|---|---|---|
count(condition) | /snapshot/count | Promise<number> | 统计匹配的聚合数量 |
list(listQuery) | /snapshot/list | Promise<MaterializedSnapshot<S>[]> | 列表查询快照 |
listStream(listQuery) | /snapshot/list | Promise<ReadableStream<SSE>> | 以 SSE 流形式列出快照 |
listState(listQuery) | /snapshot/list_state | Promise<S[]> | 仅列出状态 |
listStateStream(listQuery) | /snapshot/list_state | Promise<ReadableStream<SSE>> | 以 SSE 流形式列出状态 |
paged(pagedQuery) | /snapshot/paged | Promise<PagedList<S>> | 分页查询快照 |
pagedState(pagedQuery) | /snapshot/paged_state | Promise<PagedList<S>> | 分页查询状态 |
single(singleQuery) | /snapshot/single | Promise<MaterializedSnapshot<S>> | 单个快照查询 |
singleState(singleQuery) | /snapshot/single_state | Promise<S> | 单个状态查询 |
getById(id) | -- | Promise<MaterializedSnapshot<S>> | 通过聚合 ID 获取 |
getStateById(id) | -- | Promise<S> | 通过 ID 获取状态 |
getByIds(ids) | -- | Promise<MaterializedSnapshot<S>[]> | 通过多个 ID 获取 |
getStateByIds(ids) | -- | Promise<S[]> | 通过多个 ID 获取状态 |
来源: packages/wow/src/query/snapshot/snapshotQueryClient.ts:119-516
QueryClientFactory
为给定聚合创建所有查询客户端的工厂,预配置了正确的基本路径:
typescript
import { QueryClientFactory, ResourceAttributionPathSpec } from '@ahoo-wang/fetcher-wow';
const factory = new QueryClientFactory<CartState, CartFields, CartDomainEvent>({
contextAlias: 'example',
aggregateName: 'cart',
resourceAttribution: ResourceAttributionPathSpec.OWNER,
});
// Create individual clients
const snapshotClient = factory.createSnapshotQueryClient();
const stateClient = factory.createLoadStateAggregateClient();
const ownerStateClient = factory.createOwnerLoadStateAggregateClient();
const eventClient = factory.createEventStreamQueryClient();| 工厂方法 | 创建的客户端 | 描述 |
|---|---|---|
createSnapshotQueryClient() | SnapshotQueryClient<S, FIELDS> | 带条件的快照查询 |
createLoadStateAggregateClient() | LoadStateAggregateClient<S> | 通过 ID、版本或时间加载 |
createOwnerLoadStateAggregateClient() | LoadOwnerStateAggregateClient<S> | 加载所有者的聚合状态 |
createEventStreamQueryClient() | EventStreamQueryClient | 领域事件流查询 |
来源: packages/wow/src/query/queryClients.ts:62-214
查询 DSL
条件查询
条件系统支持构建复杂的查询谓词:
typescript
import { all, condition, aggregateId, aggregateIds } from '@ahoo-wang/fetcher-wow';
// All records
const allCondition = all();
// By aggregate ID
const byId = aggregateId('cart-123');
// By multiple IDs
const byIds = aggregateIds(['cart-1', 'cart-2', 'cart-3']);
// Complex conditions with operators
const complex = condition({
field: 'status',
operator: 'IN',
value: ['ACTIVE', 'PENDING'],
}).and({
field: 'createdAt',
operator: 'BETWEEN',
value: ['2024-01-01', '2024-12-31'],
});运算符
| 运算符 | 描述 | 示例 |
|---|---|---|
EQ | 等于 | { field: 'name', operator: 'EQ', value: 'Alice' } |
NE | 不等于 | { field: 'status', operator: 'NE', value: 'DELETED' } |
IN | 在集合中 | { field: 'type', operator: 'IN', value: ['A', 'B'] } |
NOT_IN | 不在集合中 | { field: 'type', operator: 'NOT_IN', value: ['C'] } |
BETWEEN | 范围 | { field: 'age', operator: 'BETWEEN', value: [18, 65] } |
LIKE | 模式匹配 | { field: 'name', operator: 'LIKE', value: '%john%' } |
GT | 大于 | { field: 'price', operator: 'GT', value: 100 } |
LT | 小于 | { field: 'price', operator: 'LT', value: 50 } |
ALL | 匹配全部 | 无需字段/值 |
来源: packages/wow/src/query/operator.ts
排序和分页
typescript
import { pagedQuery, listQuery } from '@ahoo-wang/fetcher-wow';
// Paged query with sorting
const query = pagedQuery({
condition: all(),
limit: 20,
offset: 0,
sort: [{ field: 'createdAt', order: 'DESC' }],
});模块结构
mermaid
graph TB
subgraph sg_1 ["@ahoo-wang/fetcher-wow"]
direction TB
CMD["command/<br>CommandClient, headers, types"]
QRY["query/<br>Query DSL, conditions, operators"]
SNAP["query/snapshot/<br>SnapshotQueryClient"]
EVNT["query/event/<br>EventStreamQueryClient"]
STATE["query/state/<br>LoadStateAggregateClient"]
CFG["configuration/<br>wowMetadata"]
TYPES["types/<br>DDD modeling types"]
end
CMD --> QRY
QRY --> SNAP
QRY --> EVNT
QRY --> STATE
TYPES --> CMD
TYPES --> QRY
CFG --> CMD
style CMD fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style QRY fill:#161b22,stroke:#30363d,color:#e6edf3
style SNAP fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style EVNT fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style STATE fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style CFG fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
style TYPES fill:#2d333b,stroke:#6d5dfc,color:#e6edf3主要导出
| 导出 | 模块 | 描述 |
|---|---|---|
CommandClient | command/ | 基于装饰器的命令发送客户端 |
CommandRequest | command/ | 带请求头的类型化命令请求 |
CommandResult | command/ | 命令执行结果 |
CommandResultEventStream | command/ | 命令结果的 SSE 流 |
CommandBody<C> | command/ | 命令体包装类型 |
CommandHeaders | command/ | 请求头名称常量 |
QueryClientFactory | query/ | 创建所有查询客户端的工厂 |
QueryClientOptions | query/ | 查询客户端配置 |
SnapshotQueryClient | query/snapshot/ | 快照查询操作 |
EventStreamQueryClient | query/event/ | 领域事件流查询 |
LoadStateAggregateClient | query/state/ | 通过 ID/版本/时间加载聚合状态 |
LoadOwnerStateAggregateClient | query/state/ | 加载所有者的聚合状态 |
Condition | query/ | 查询条件构建器 |
all() | query/ | 匹配所有记录的条件 |
aggregateId(id) | query/ | 匹配单个聚合 ID 的条件 |
aggregateIds(ids) | query/ | 匹配多个聚合 ID 的条件 |
condition(field, operator, value) | query/ | 条件构建器 |
listQuery() | query/ | 创建列表查询 |
pagedQuery() | query/ | 创建分页查询 |
singleQuery() | query/ | 创建单条查询 |
FieldSort | query/ | 排序规范 |
Operator | query/ | 查询运算符枚举 |
ResourceAttributionPathSpec | types/ | 租户/所有者范围的路径规范 |
生成的客户端
Generator 包会自动为 OpenAPI 规范中发现的每个聚合生成类型化的命令和查询客户端。例如,对于限界上下文 example 中的 Cart 聚合:
typescript
// Generated command client
const commandClient = new CartCommandClient();
const result = await commandClient.addCartItem({
body: { productId: 'p1', quantity: 1 },
});
// Generated query client factory
const factory = cartQueryClientFactory;
const snapshotClient = factory.createSnapshotQueryClient();
const cartState = await snapshotClient.singleState(singleQuery({
condition: aggregateId('cart-1'),
}));交叉引用
- Fetcher -- 核心 HTTP 客户端;所有 Wow 客户端使用 Fetcher 进行 HTTP 传输
- Decorator --
CommandClient和SnapshotQueryClient使用@api、@post、@body装饰器 - EventStream -- 流式查询(
listStream、sendAndWaitStream)使用JsonEventStreamResultExtractor - Generator -- 生成器读取 OpenAPI 规范并生成类型化的 Wow 客户端
- React --
useSingleQuery、useListQuery、usePagedQueryHook 面向 Wow 查询客户端 - Viewer -- FetcherViewer 组件使用 Wow 查询客户端进行数据展示