
@ahoo-wang/fetcher-openai

The @ahoo-wang/fetcher-openai package provides a type-safe client for OpenAI's Chat Completions API. It combines the decorator package's declarative API style with the eventstream package's SSE processing to deliver a seamless streaming and non-streaming experience from a single method call.

Source: packages/openai/src/

Installation

bash
pnpm add @ahoo-wang/fetcher-openai

Peer Dependencies

This package requires its three fetcher peer dependencies, plus reflect-metadata for decorator metadata support:

bash
pnpm add @ahoo-wang/fetcher @ahoo-wang/fetcher-eventstream @ahoo-wang/fetcher-decorator reflect-metadata

Architecture

mermaid
graph TB
    subgraph sg_1 ["Application"]
        APP["Client Code"]
    end

    subgraph sg_2 ["@ahoo-wang/fetcher-openai"]
        OAI["OpenAI<br>Top-level client"]
        CC["ChatClient<br>Chat Completions API"]
        CRE["CompletionStreamResultExtractor<br>SSE result extractor"]
        DONE["DoneDetector<br>[DONE] terminate signal"]
        TYPES["ChatRequest / ChatResponse<br>Type-safe types"]
    end

    subgraph sg_3 ["@ahoo-wang/fetcher-decorator"]
        API["@api('chat')"]
        POST["@post('/completions')"]
        BODY["@body()"]
        LIFECYCLE["ExecuteLifeCycle<br>beforeExecute"]
    end

    subgraph sg_4 ["@ahoo-wang/fetcher-eventstream"]
        SSE["Response.prototype<br>.requiredJsonEventStream()"]
        TD["TerminateDetector"]
    end

    subgraph sg_5 ["@ahoo-wang/fetcher"]
        FETCHER["Fetcher<br>HTTP client"]
        IR["Interceptor Pipeline"]
    end

    APP --> OAI
    OAI --> CC
    OAI --> FETCHER
    CC --> API
    CC --> POST
    CC --> BODY
    CC --> LIFECYCLE
    LIFECYCLE --> CRE
    CRE --> DONE
    CRE --> SSE
    DONE --> TD
    CC --> FETCHER
    FETCHER --> IR

    style APP fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style OAI fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style CC fill:#161b22,stroke:#30363d,color:#e6edf3
    style CRE fill:#161b22,stroke:#30363d,color:#e6edf3
    style DONE fill:#161b22,stroke:#30363d,color:#e6edf3
    style TYPES fill:#161b22,stroke:#30363d,color:#e6edf3
    style API fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style POST fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style BODY fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style LIFECYCLE fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style SSE fill:#161b22,stroke:#30363d,color:#e6edf3
    style TD fill:#161b22,stroke:#30363d,color:#e6edf3
    style FETCHER fill:#161b22,stroke:#30363d,color:#e6edf3
    style IR fill:#161b22,stroke:#30363d,color:#e6edf3

Quick Start

Non-Streaming

typescript
import 'reflect-metadata';
import { OpenAI } from '@ahoo-wang/fetcher-openai';

const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY!,
});

const response = await openai.chat.completions({
  model: 'gpt-3.5-turbo',
  messages: [
    { role: 'system', content: 'You are a helpful assistant.' },
    { role: 'user', content: 'What is TypeScript?' },
  ],
  temperature: 0.7,
  max_tokens: 150,
});

console.log(response.choices[0].message?.content);
// => "TypeScript is a programming language developed by Microsoft..."
console.log(response.usage.total_tokens);
// => 42

Streaming

typescript
import 'reflect-metadata';
import { OpenAI } from '@ahoo-wang/fetcher-openai';

const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY!,
});

const stream = await openai.chat.completions({
  model: 'gpt-4',
  messages: [{ role: 'user', content: 'Write a short story about a cat.' }],
  stream: true,
  temperature: 0.8,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content); // Token-by-token output
  }
}
// Stream terminates automatically when [DONE] is received
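Each streamed chunk carries only a delta, so rebuilding the full reply means concatenating `delta.content` across chunks. A minimal sketch of that accumulation — the mock generator below stands in for the real `JsonServerSentEventStream` returned by `completions({ stream: true })`:

```typescript
// Simplified chunk shape (assumed from the streaming delta format above).
interface StreamChunk {
  choices: { delta?: { content?: string } }[];
}

// Concatenate delta.content across all chunks to rebuild the full message.
async function collectContent(stream: AsyncIterable<StreamChunk>): Promise<string> {
  let full = '';
  for await (const chunk of stream) {
    full += chunk.choices[0]?.delta?.content ?? '';
  }
  return full;
}

// Mock stream standing in for the real SSE stream.
async function* mockStream(): AsyncGenerator<StreamChunk> {
  yield { choices: [{ delta: { content: 'Hello' } }] };
  yield { choices: [{ delta: { content: ', ' } }] };
  yield { choices: [{ delta: { content: 'world!' } }] };
  yield { choices: [{ delta: {} }] }; // the final chunk often carries no content
}

collectContent(mockStream()).then((full) => console.log(full)); // "Hello, world!"
```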

Streaming vs Non-Streaming Flow

The ChatClient uses a TypeScript conditional type to derive the correct return type from the stream parameter, while the beforeExecute lifecycle hook switches the result extractor at runtime. (chatClient.ts:78)

mermaid
sequenceDiagram
autonumber

    participant App as Application
    participant CC as ChatClient
    participant LE as beforeExecute
    participant Fetcher as Fetcher
    participant API as OpenAI API

    App->>CC: completions({stream: false})
    CC->>LE: beforeExecute(exchange)
    LE->>LE: stream=false => use default JSON extractor
    LE-->>CC: exchange unchanged
    CC->>Fetcher: POST /chat/completions
    Fetcher->>API: HTTP request
    API-->>Fetcher: 200 JSON response
    Fetcher-->>CC: ChatResponse
    CC-->>App: ChatResponse

    App->>CC: completions({stream: true})
    CC->>LE: beforeExecute(exchange)
    LE->>LE: stream=true => set CompletionStreamResultExtractor
    LE-->>CC: exchange updated
    CC->>Fetcher: POST /chat/completions
    Fetcher->>API: HTTP request
    API-->>Fetcher: 200 text/event-stream
    Fetcher-->>CC: JsonServerSentEventStream
    CC-->>App: `JsonServerSentEventStream<ChatResponse>`

    loop For each SSE chunk
        API-->>App: data: {"choices":[{"delta":{"content":"token"}}]}
    end

    API-->>App: data: [DONE]
    Note over App: Stream auto-terminates

OpenAI Client

The OpenAI class is the top-level entry point. It creates a Fetcher with the API key in the Authorization header and instantiates sub-clients. (openai.ts:63)

typescript
const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: 'sk-...',
});

// Access the underlying fetcher for custom configuration
openai.fetcher.interceptors.request.use(myCustomInterceptor);

// Use the chat client
await openai.chat.completions({ model: 'gpt-4', messages: [...] });

OpenAIOptions

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| baseURL | string | Yes | OpenAI API base URL (e.g., https://api.openai.com/v1) |
| apiKey | string | Yes | OpenAI API key (sent as a Bearer token) |

ChatClient

The ChatClient is decorated with @api('chat') and implements both ApiMetadataCapable (for runtime metadata injection) and ExecuteLifeCycle (for dynamic result extractor switching). (chatClient.ts:78)

typescript
@api('chat')
export class ChatClient implements ApiMetadataCapable, ExecuteLifeCycle {
  constructor(public readonly apiMetadata?: ApiMetadata) {}

  beforeExecute(exchange: FetchExchange): void {
    const chatRequest = exchange.request.body as ChatRequest;
    if (chatRequest.stream) {
      exchange.resultExtractor = CompletionStreamResultExtractor;
    }
  }

  @post('/completions')
  completions<T extends ChatRequest>(
    @body() chatRequest: T,
  ): Promise<
    T['stream'] extends true
      ? JsonServerSentEventStream<ChatResponse>
      : ChatResponse
  > {
    throw autoGeneratedError(chatRequest);
  }
}

The conditional return type T['stream'] extends true ? JsonServerSentEventStream<ChatResponse> : ChatResponse ensures TypeScript correctly infers the return type at call sites.

CompletionStreamResultExtractor

The CompletionStreamResultExtractor processes SSE responses from the chat completions API. It uses the DoneDetector to terminate the stream when the [DONE] signal is received. (completionStreamResultExtractor.ts:88)

mermaid
flowchart TD
    RESP["Response from OpenAI API"]
    RESP --> CHECK{"Content-Type is<br>text/event-stream?"}
    CHECK -->|No| ERROR["EventStreamConvertError"]
    CHECK -->|Yes| STREAM["requiredJsonEventStream(DoneDetector)"]
    STREAM --> PARSE["Parse each SSE event as JSON"]
    PARSE --> DETECT{"data === '[DONE]'?"}
    DETECT -->|Yes| TERM["controller.terminate()<br>Stream ends"]
    DETECT -->|No| EMIT["Emit `JsonServerSentEvent<ChatResponse>`"]

    style RESP fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style CHECK fill:#161b22,stroke:#30363d,color:#e6edf3
    style ERROR fill:#161b22,stroke:#30363d,color:#e6edf3
    style STREAM fill:#161b22,stroke:#30363d,color:#e6edf3
    style PARSE fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style DETECT fill:#161b22,stroke:#30363d,color:#e6edf3
    style TERM fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style EMIT fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
typescript
export const DoneDetector: TerminateDetector = (event) => {
  return event.data === '[DONE]';
};

export const CompletionStreamResultExtractor: ResultExtractor<
  JsonServerSentEventStream<ChatResponse>
> = (exchange) => {
  return exchange.requiredResponse.requiredJsonEventStream(DoneDetector);
};
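The detector is a plain predicate over raw SSE events. A self-contained sketch of its effect, using local stand-ins for the eventstream package's types (the real stream terminates via controller.terminate() rather than a loop break):

```typescript
// Local stand-ins for the eventstream package's types (assumed shapes).
interface ServerSentEvent {
  data: string;
}
type TerminateDetector = (event: ServerSentEvent) => boolean;

// Terminate when OpenAI sends its sentinel payload; everything else is a JSON chunk.
const DoneDetector: TerminateDetector = (event) => event.data === '[DONE]';

// What the stream wrapper effectively does: pass events through until the detector fires.
function takeUntilDone(events: ServerSentEvent[]): ServerSentEvent[] {
  const emitted: ServerSentEvent[] = [];
  for (const event of events) {
    if (DoneDetector(event)) break; // stream ends here; [DONE] itself is never emitted
    emitted.push(event);
  }
  return emitted;
}

const events = [
  { data: '{"choices":[{"delta":{"content":"Hi"}}]}' },
  { data: '{"choices":[{"delta":{"content":"!"}}]}' },
  { data: '[DONE]' },
];
console.log(takeUntilDone(events).length); // 2
```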

Type Definitions

ChatRequest

Full request body for the chat completions endpoint. (types.ts:14)

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| model | string | - | Model ID (e.g., gpt-3.5-turbo, gpt-4) |
| messages | Message[] | - | Conversation messages with role and content |
| stream | boolean | false | Enable streaming responses |
| temperature | number | 1 | Sampling temperature (0-2) |
| max_tokens | number | inf | Maximum tokens to generate |
| top_p | number | 1 | Nucleus sampling threshold |
| frequency_penalty | number | 0 | Repetition penalty (-2.0 to 2.0) |
| presence_penalty | number | 0 | Topic diversity penalty (-2.0 to 2.0) |
| stop | string | null | Stop sequences |
| n | number | 1 | Number of completions to generate |
| user | string | - | End-user identifier |

Message

| Property | Type | Description |
| --- | --- | --- |
| role | string | "system", "user", or "assistant" |
| content | string? | Message text content |

ChatResponse

| Property | Type | Description |
| --- | --- | --- |
| id | string | Unique response ID |
| object | string | Object type (e.g., "chat.completion") |
| created | number | Unix timestamp of creation |
| choices | Choice[] | Array of completion choices |
| usage | Usage | Token usage statistics |

Choice

| Property | Type | Description |
| --- | --- | --- |
| index | number? | Choice index |
| message | Message? | The completion message (non-streaming) |
| finish_reason | string? | "stop", "length", "content_filter", etc. |

Usage

| Property | Type | Description |
| --- | --- | --- |
| prompt_tokens | number | Tokens in the prompt |
| completion_tokens | number | Tokens in the completion |
| total_tokens | number | Total tokens consumed |
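The counters are additive: total_tokens is the sum of the other two. A small sketch that checks this invariant and estimates spend from usage — the per-1K rates here are placeholders, not real OpenAI pricing:

```typescript
// Usage shape from the table above.
interface Usage {
  prompt_tokens: number;
  completion_tokens: number;
  total_tokens: number;
}

// Sanity check: prompt + completion should equal total.
function isConsistent(usage: Usage): boolean {
  return usage.prompt_tokens + usage.completion_tokens === usage.total_tokens;
}

// Placeholder per-1K-token rates -- NOT real OpenAI pricing.
function estimateCost(
  usage: Usage,
  promptRatePer1K = 0.001,
  completionRatePer1K = 0.002,
): number {
  return (
    (usage.prompt_tokens / 1000) * promptRatePer1K +
    (usage.completion_tokens / 1000) * completionRatePer1K
  );
}

const usage: Usage = { prompt_tokens: 30, completion_tokens: 12, total_tokens: 42 };
console.log(isConsistent(usage)); // true
console.log(estimateCost(usage));
```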

Advanced: Custom Interceptors

Since the OpenAI class exposes its Fetcher instance, you can add interceptors for logging, retrying, or authentication refresh:

typescript
import { OpenAI } from '@ahoo-wang/fetcher-openai';

const openai = new OpenAI({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY!,
});

// Add a logging interceptor
openai.fetcher.interceptors.request.use({
  name: 'RequestLogger',
  order: 100,
  intercept(exchange) {
    console.log(`[OpenAI] ${exchange.request.method} ${exchange.request.url}`);
  },
});

// Add a rate-limit retry interceptor
openai.fetcher.interceptors.error.use({
  name: 'RateLimitRetry',
  order: 100,
  async intercept(exchange) {
    if (exchange.error?.response?.status === 429) {
      const retryAfter = parseInt(
        exchange.error.response.headers.get('Retry-After') || '1',
        10,
      );
      await new Promise(r => setTimeout(r, retryAfter * 1000));
      // Retry the request
      const response = await fetch(exchange.request);
      exchange.response = response;
      exchange.error = undefined;
    }
  },
});

Exported API Summary

| Export | Type | Source |
| --- | --- | --- |
| OpenAI | Class | openai.ts |
| OpenAIOptions | Interface | openai.ts |
| ChatClient | Class | chat/chatClient.ts |
| ChatRequest | Interface | chat/types.ts |
| ChatResponse | Interface | chat/types.ts |
| Message | Interface | chat/types.ts |
| Choice | Interface | chat/types.ts |
| Usage | Interface | chat/types.ts |
| CompletionStreamResultExtractor | Function | chat/completionStreamResultExtractor.ts |
| DoneDetector | Function | chat/completionStreamResultExtractor.ts |

Released under the Apache License 2.0.