@ahoo-wang/fetcher-openai

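The @ahoo-wang/fetcher-openai package provides a type-safe client for OpenAI's Chat Completions API. It combines the declarative API style of the decorator package with the SSE handling of the eventstream package, so a single method call covers both streaming and non-streaming requests.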

Source: packages/openai/src/

Installation

bash
pnpm add @ahoo-wang/fetcher-openai

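Peer Dependencies

This package requires all three of its peer dependencies. The command below also installs reflect-metadata, which the examples on this page import: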

bash
pnpm add @ahoo-wang/fetcher @ahoo-wang/fetcher-eventstream @ahoo-wang/fetcher-decorator reflect-metadata

Architecture

mermaid
graph TB
    subgraph sg_1 ["Application"]
        APP["Client Code"]
    end

    subgraph sg_2 ["@ahoo-wang/fetcher-openai"]
        OAI["OpenAI<br>Top-level client"]
        CC["ChatClient<br>Chat Completions API"]
        CRE["CompletionStreamResultExtractor<br>SSE result extractor"]
        DONE["DoneDetector<br>[DONE] terminate signal"]
        TYPES["ChatRequest / ChatResponse<br>Type-safe types"]
    end

    subgraph sg_3 ["@ahoo-wang/fetcher-decorator"]
        API["@api('chat')"]
        POST["@post('/completions')"]
        BODY["@body()"]
        LIFECYCLE["ExecuteLifeCycle<br>beforeExecute"]
    end

    subgraph sg_4 ["@ahoo-wang/fetcher-eventstream"]
        SSE["Response.prototype<br>.requiredJsonEventStream()"]
        TD["TerminateDetector"]
    end

    subgraph sg_5 ["@ahoo-wang/fetcher"]
        FETCHER["Fetcher<br>HTTP client"]
        IR["Interceptor Pipeline"]
    end

    APP --> OAI
    OAI --> CC
    OAI --> FETCHER
    CC --> API
    CC --> POST
    CC --> BODY
    CC --> LIFECYCLE
    LIFECYCLE --> CRE
    CRE --> DONE
    CRE --> SSE
    DONE --> TD
    CC --> FETCHER
    FETCHER --> IR

    style APP fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style OAI fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style CC fill:#161b22,stroke:#30363d,color:#e6edf3
    style CRE fill:#161b22,stroke:#30363d,color:#e6edf3
    style DONE fill:#161b22,stroke:#30363d,color:#e6edf3
    style TYPES fill:#161b22,stroke:#30363d,color:#e6edf3
    style API fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style POST fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style BODY fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style LIFECYCLE fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style SSE fill:#161b22,stroke:#30363d,color:#e6edf3
    style TD fill:#161b22,stroke:#30363d,color:#e6edf3
    style FETCHER fill:#161b22,stroke:#30363d,color:#e6edf3
    style IR fill:#161b22,stroke:#30363d,color:#e6edf3

Quick Start

Non-streaming

typescript
import 'reflect-metadata';
import { OpenAI } from '@ahoo-wang/fetcher-openai';

const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY!,
});

const response = await openai.chat.completions({
  model: 'gpt-3.5-turbo',
  messages: [
    { role: 'system', content: 'You are a helpful assistant.' },
    { role: 'user', content: 'What is TypeScript?' },
  ],
  temperature: 0.7,
  max_tokens: 150,
});

console.log(response.choices[0].message?.content);
// => "TypeScript is a programming language developed by Microsoft..."
console.log(response.usage.total_tokens);
// => 42

Streaming

typescript
import 'reflect-metadata';
import { OpenAI } from '@ahoo-wang/fetcher-openai';

const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY!,
});

const stream = await openai.chat.completions({
  model: 'gpt-4',
  messages: [{ role: 'user', content: 'Write a short story about a cat.' }],
  stream: true,
  temperature: 0.8,
});

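for await (const chunk of stream) {
  // Each chunk is a JsonServerSentEvent; the parsed ChatResponse is in chunk.data
  const content = chunk.data.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content); // print token by token
  }
}
// The stream terminates automatically when the [DONE] signal is received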
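
When the full text is needed after streaming finishes, the deltas can be accumulated as they arrive. The sketch below is self-contained and does not call the library: `collectText` and `fakeStream` are hypothetical names, and the only assumption is that the stream can be consumed with `for await`, as in the loop above.

```typescript
// Hypothetical helper, not part of @ahoo-wang/fetcher-openai.
// Concatenates the text that `pick` extracts from each streamed chunk.
export async function collectText<T>(
  chunks: AsyncIterable<T>,
  pick: (chunk: T) => string | undefined,
): Promise<string> {
  let text = '';
  for await (const chunk of chunks) {
    // Chunks without text (for example a final chunk) contribute nothing.
    text += pick(chunk) ?? '';
  }
  return text;
}

// Stand-in stream used only to exercise the helper without a network call.
export async function* fakeStream(tokens: string[]) {
  for (const token of tokens) {
    yield { choices: [{ delta: { content: token } }] };
  }
}
```

To use it with a real stream, pass a `pick` callback that reads the delta from each chunk the same way the loop above does.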

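Streaming vs. Non-streaming Flow

ChatClient uses TypeScript conditional types to provide the correct return type based on the stream parameter. The beforeExecute lifecycle hook switches the result extractor dynamically. (chatClient.ts:78)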

mermaid
sequenceDiagram
autonumber

    participant App as Application
    participant CC as ChatClient
    participant LE as beforeExecute
    participant Fetcher as Fetcher
    participant API as OpenAI API

    App->>CC: completions({stream: false})
    CC->>LE: beforeExecute(exchange)
    LE->>LE: stream=false => use default JSON extractor
    LE-->>CC: exchange unchanged
    CC->>Fetcher: POST /chat/completions
    Fetcher->>API: HTTP request
    API-->>Fetcher: 200 JSON response
    Fetcher-->>CC: ChatResponse
    CC-->>App: ChatResponse

    App->>CC: completions({stream: true})
    CC->>LE: beforeExecute(exchange)
    LE->>LE: stream=true => set CompletionStreamResultExtractor
    LE-->>CC: exchange updated
    CC->>Fetcher: POST /chat/completions
    Fetcher->>API: HTTP request
    API-->>Fetcher: 200 text/event-stream
    Fetcher-->>CC: JsonServerSentEventStream
    CC-->>App: `JsonServerSentEventStream<ChatResponse>`

    loop For each SSE chunk
        API-->>App: data: {"choices":[{"delta":{"content":"token"}}]}
    end

    API-->>App: data: [DONE]
    Note over App: Stream auto-terminates
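
The extractor switch in the diagram can be modeled without the library. In this sketch, `SketchExchange`, `newExchange`, and the extractor labels are hypothetical stand-ins, not the real `FetchExchange` or `ResultExtractor` types; only the branching on `stream` mirrors the flow above.

```typescript
// Hypothetical stand-ins for illustration; not the library's types.
type ExtractorKind = 'json' | 'event-stream';

interface SketchExchange {
  request: { body: { stream: boolean } };
  resultExtractor: ExtractorKind;
}

// Mirrors the beforeExecute step: only a truthy stream flag swaps the extractor.
function beforeExecute(exchange: SketchExchange): void {
  if (exchange.request.body.stream) {
    exchange.resultExtractor = 'event-stream';
  }
}

// Every exchange starts with the default JSON extractor.
function newExchange(stream = false): SketchExchange {
  return { request: { body: { stream } }, resultExtractor: 'json' };
}

const streaming = newExchange(true);
beforeExecute(streaming);

const plain = newExchange();
beforeExecute(plain);

console.log(streaming.resultExtractor, plain.resultExtractor);
// prints "event-stream json"
```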

OpenAI Client

The OpenAI class is the top-level entry point. It creates a Fetcher that sends the API key in the Authorization header and instantiates the sub-clients. (openai.ts:63)

typescript
const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: 'sk-...',
});

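// Access the underlying fetcher for custom configuration
// (myCustomInterceptor is a request interceptor defined elsewhere)
openai.fetcher.interceptors.request.use(myCustomInterceptor);

// Use the chat client
await openai.chat.completions({
  model: 'gpt-4',
  messages: [{ role: 'user', content: 'Hello!' }],
});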

OpenAIOptions

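| Property | Type | Required | Description |
| --- | --- | --- | --- |
| baseURL | string | Yes | OpenAI API base URL (e.g. https://api.openai.com/v1) |
| apiKey | string | Yes | OpenAI API key (sent as a Bearer token) |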
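
The examples on this page read the key with a non-null assertion (`process.env.OPENAI_API_KEY!`), which hides a missing variable until the first request fails. A minimal sketch of an explicit check follows; `SketchOptions`, `optionsFromEnv`, and the `OPENAI_BASE_URL` variable name are hypothetical and not part of the package.

```typescript
// Hypothetical helper; mirrors the two OpenAIOptions fields described above.
interface SketchOptions {
  baseURL: string;
  apiKey: string;
}

// Builds the options from an environment-like record and fails fast
// when the API key is missing or empty.
function optionsFromEnv(env: Record<string, string | undefined>): SketchOptions {
  const apiKey = env['OPENAI_API_KEY'];
  if (!apiKey) {
    throw new Error('OPENAI_API_KEY is not set');
  }
  return {
    // OPENAI_BASE_URL is an assumed variable name; the default is the URL used on this page.
    baseURL: env['OPENAI_BASE_URL'] ?? 'https://api.openai.com/v1',
    apiKey,
  };
}
```

In a Node.js application this could be passed straight to the constructor, for example `new OpenAI(optionsFromEnv(process.env))`.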

ChatClient

ChatClient is decorated with @api('chat') and implements both ApiMetadataCapable (for runtime metadata injection) and ExecuteLifeCycle (for dynamic result extractor switching). (chatClient.ts:78)

typescript
@api('chat')
export class ChatClient implements ApiMetadataCapable, ExecuteLifeCycle {
  constructor(public readonly apiMetadata?: ApiMetadata) {}

  beforeExecute(exchange: FetchExchange): void {
    const chatRequest = exchange.request.body as ChatRequest;
    if (chatRequest.stream) {
      exchange.resultExtractor = CompletionStreamResultExtractor;
    }
  }

  @post('/completions')
  completions<T extends ChatRequest>(
    @body() chatRequest: T,
  ): Promise<
    T['stream'] extends true
      ? JsonServerSentEventStream<ChatResponse>
      : ChatResponse
  > {
    throw autoGeneratedError(chatRequest);
  }
}

The conditional return type T['stream'] extends true ? JsonServerSentEventStream<ChatResponse> : ChatResponse ensures that TypeScript infers the correct return type at the call site.
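
The way this conditional type resolves can be checked in isolation. The sketch below uses simplified stand-ins (`DemoRequest`, `DemoResponse`, `DemoStream`, all hypothetical) instead of the library types, together with a type-level equality helper, so the two assignments compile only if each branch resolves as described.

```typescript
// Simplified stand-ins (assumptions), not the library's ChatRequest/ChatResponse.
interface DemoRequest {
  model: string;
  stream?: boolean;
}
interface DemoResponse {
  id: string;
}
type DemoStream = AsyncIterable<DemoResponse>;

// Same shape as the conditional return type of completions().
type CompletionResult<T extends DemoRequest> = T['stream'] extends true
  ? DemoStream
  : DemoResponse;

// Type-level equality check: resolves to `true` only when A and B are identical.
type Equal<A, B> =
  (<X>() => X extends A ? 1 : 2) extends (<X>() => X extends B ? 1 : 2) ? true : false;

// These assignments compile only if the conditional type resolves as expected.
const streamCase: Equal<CompletionResult<{ model: string; stream: true }>, DemoStream> = true;
const jsonCase: Equal<CompletionResult<{ model: string; stream: false }>, DemoResponse> = true;

console.log(streamCase, jsonCase);
// prints "true true"
```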

CompletionStreamResultExtractor

CompletionStreamResultExtractor handles SSE responses from the chat completions API. It uses DoneDetector to terminate the stream when the [DONE] signal is received. (completionStreamResultExtractor.ts:88)

mermaid
flowchart TD
    RESP["Response from OpenAI API"]
    RESP --> CHECK{"Content-Type is<br>text/event-stream?"}
    CHECK -->|No| ERROR["EventStreamConvertError"]
    CHECK -->|Yes| STREAM["requiredJsonEventStream(DoneDetector)"]
    STREAM --> PARSE["Parse each SSE event as JSON"]
    PARSE --> DETECT{"data === '[DONE]'?"}
    DETECT -->|Yes| TERM["controller.terminate()<br>Stream ends"]
    DETECT -->|No| EMIT["Emit `JsonServerSentEvent<ChatResponse>`"]

    style RESP fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style CHECK fill:#161b22,stroke:#30363d,color:#e6edf3
    style ERROR fill:#161b22,stroke:#30363d,color:#e6edf3
    style STREAM fill:#161b22,stroke:#30363d,color:#e6edf3
    style PARSE fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style DETECT fill:#161b22,stroke:#30363d,color:#e6edf3
    style TERM fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style EMIT fill:#2d333b,stroke:#6d5dfc,color:#e6edf3

typescript
export const DoneDetector: TerminateDetector = (event) => {
  return event.data === '[DONE]';
};

export const CompletionStreamResultExtractor: ResultExtractor<
  JsonServerSentEventStream<ChatResponse>
> = (exchange) => {
  return exchange.requiredResponse.requiredJsonEventStream(DoneDetector);
};
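
The role of the terminate detector can be illustrated without the eventstream package. In this sketch, `RawEvent`, `isDone`, and `parseUntilDone` are hypothetical names. It checks for the sentinel before parsing because `[DONE]` is not valid JSON and `JSON.parse` would throw on it.

```typescript
// Hypothetical minimal SSE event shape; the real event type has more fields.
interface RawEvent {
  data: string;
}

type Detector = (event: RawEvent) => boolean;

// Same predicate as DoneDetector, restated for the local event type.
const isDone: Detector = event => event.data === '[DONE]';

// Parses events as JSON until the detector reports the terminating event.
// Anything after the terminator is ignored.
function parseUntilDone<T>(events: RawEvent[], detector: Detector): T[] {
  const parsed: T[] = [];
  for (const event of events) {
    if (detector(event)) {
      break; // stop before JSON.parse sees the non-JSON sentinel
    }
    parsed.push(JSON.parse(event.data) as T);
  }
  return parsed;
}

const sampleEvents: RawEvent[] = [
  { data: '{"id":"a"}' },
  { data: '{"id":"b"}' },
  { data: '[DONE]' },
  { data: '{"id":"ignored"}' },
];

const parsedIds = parseUntilDone<{ id: string }>(sampleEvents, isDone).map(e => e.id);
console.log(parsedIds.join(','));
// prints "a,b"
```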

Type Definitions

ChatRequest

The complete request body for the chat completions endpoint. (types.ts:14)

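| Property | Type | Default | Description |
| --- | --- | --- | --- |
| model | string | - | Model ID (e.g. gpt-3.5-turbo, gpt-4) |
| messages | Message[] | - | Conversation messages, each with a role and content |
| stream | boolean | false | Enables streaming responses |
| temperature | number | 1 | Sampling temperature (0-2) |
| max_tokens | number | inf | Maximum number of tokens to generate |
| top_p | number | 1 | Nucleus sampling threshold |
| frequency_penalty | number | 0 | Repetition penalty (-2.0 to 2.0) |
| presence_penalty | number | 0 | Topic diversity penalty (-2.0 to 2.0) |
| stop | string | null | Stop sequence |
| n | number | 1 | Number of completions to generate |
| user | string | - | End-user identifier |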

Message

| Property | Type | Description |
| --- | --- | --- |
| role | string | "system", "user", or "assistant" |
| content | string? | Text content of the message |

ChatResponse

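| Property | Type | Description |
| --- | --- | --- |
| id | string | Unique response ID |
| object | string | Object type (e.g. "chat.completion") |
| created | number | Unix timestamp of creation |
| choices | Choice[] | Array of completion choices |
| usage | Usage | Token usage statistics |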

Choice

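| Property | Type | Description |
| --- | --- | --- |
| index | number? | Index of the choice |
| message | Message? | Completion message (non-streaming) |
| finish_reason | string? | "stop", "length", or "content_filter" |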

Usage

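| Property | Type | Description |
| --- | --- | --- |
| prompt_tokens | number | Number of tokens in the prompt |
| completion_tokens | number | Number of tokens in the completion |
| total_tokens | number | Total number of tokens consumed |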
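
Read together, the response tables correspond roughly to the interfaces below. This is a transcription of the tables for illustration, with `?` types rendered as optional properties; the actual declarations in chat/types.ts may differ. The helpers `firstContent` and `wasTruncated` are hypothetical.

```typescript
// Transcribed from the tables above; not copied from the package source.
interface Message {
  role: string;
  content?: string;
}
interface Choice {
  index?: number;
  message?: Message;
  finish_reason?: string;
}
interface Usage {
  prompt_tokens: number;
  completion_tokens: number;
  total_tokens: number;
}
interface ChatResponse {
  id: string;
  object: string;
  created: number;
  choices: Choice[];
  usage: Usage;
}

// Hypothetical helper: text of the first choice, if any.
function firstContent(response: ChatResponse): string | undefined {
  return response.choices[0]?.message?.content;
}

// Hypothetical helper: true when any choice stopped because of the token limit.
function wasTruncated(response: ChatResponse): boolean {
  return response.choices.some(choice => choice.finish_reason === 'length');
}
```

Because `message` and `content` are optional, callers should handle `undefined` rather than assume a string is always present.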

Advanced: Custom Interceptors

Because the OpenAI class exposes its Fetcher instance, you can add interceptors for logging, retries, or authentication refresh:

typescript
import { OpenAI } from '@ahoo-wang/fetcher-openai';

const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY!,
});

// Add a logging interceptor
openai.fetcher.interceptors.request.use({
  name: 'RequestLogger',
  order: 100,
  intercept(exchange) {
    console.log(`[OpenAI] ${exchange.request.method} ${exchange.request.url}`);
  },
});

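// Add a rate-limit retry interceptor
openai.fetcher.interceptors.error.use({
  name: 'RateLimitRetry',
  order: 100,
  async intercept(exchange) {
    if (exchange.error?.response?.status === 429) {
      // Retry-After is read as a number of seconds; fall back to 1 second
      const retryAfter =
        parseInt(exchange.error.response.headers.get('Retry-After') || '1', 10) || 1;
      await new Promise(r => setTimeout(r, retryAfter * 1000));
      // Retry the request once. fetch() expects a URL or a Request, so the URL is
      // passed explicitly; this assumes the request body has already been serialized.
      const response = await fetch(exchange.request.url, exchange.request as RequestInit);
      exchange.response = response;
      exchange.error = undefined;
    }
  },
});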
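
The Retry-After header may carry either a number of seconds or an HTTP date, and `parseInt` only handles the first form. A minimal sketch of a parser that covers both follows; `retryDelayMs` is a hypothetical helper, not part of the package, and the one-second fallback is an arbitrary choice.

```typescript
// Hypothetical helper: converts a Retry-After header value to a delay in milliseconds.
export function retryDelayMs(
  retryAfter: string | null,
  now: number = Date.now(),
  fallbackMs = 1000,
): number {
  const value = retryAfter === null ? '' : retryAfter.trim();
  if (value === '') {
    return fallbackMs; // header missing or empty
  }
  if (/^\d+$/.test(value)) {
    return Number(value) * 1000; // delay-seconds form
  }
  const date = Date.parse(value); // HTTP-date form
  if (isNaN(date)) {
    return fallbackMs; // unrecognized value
  }
  return Math.max(0, date - now); // never wait a negative amount
}
```

Inside an error interceptor, the result could replace the `retryAfter * 1000` expression, for example `retryDelayMs(response.headers.get('Retry-After'))`.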

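Exported API Summary

| Export | Type | Source |
| --- | --- | --- |
| OpenAI | Class | openai.ts |
| OpenAIOptions | Interface | openai.ts |
| ChatClient | Class | chat/chatClient.ts |
| ChatRequest | Interface | chat/types.ts |
| ChatResponse | Interface | chat/types.ts |
| Message | Interface | chat/types.ts |
| Choice | Interface | chat/types.ts |
| Usage | Interface | chat/types.ts |
| CompletionStreamResultExtractor | Function | chat/completionStreamResultExtractor.ts |
| DoneDetector | Function | chat/completionStreamResultExtractor.ts |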

Released under the Apache License 2.0.