lihaosky opened a new pull request, #28223: URL: https://github.com/apache/flink/pull/28223
## What is the purpose of the change

Adds a synchronous OpenAI predict provider backed by `com.openai.client.OpenAIClient`, alongside the existing async one. The factory now returns a `Provider` that implements both `AsyncPredictRuntimeProvider` and `PredictRuntimeProvider`, so the planner picks the runtime mode via [`MLPredictRuntimeConfigOptions.ASYNC`](https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MLPredictRuntimeConfigOptions.java) (e.g. `MAP['async', 'false']` in `ML_PREDICT`).

Closes [FLINK-38705](https://issues.apache.org/jira/browse/FLINK-38705). Sub-task of FLIP-525 (FLINK-37777).

## Brief change log

- `OpenAISyncEmbeddingModelFunction` / `OpenAISyncChatModelFunction` (new): `PredictFunction`s that drive the sync `OpenAIClient`.
- `AbstractOpenAISyncModelFunction` (new): mirrors `AbstractOpenAIAsyncModelFunction` (sync client lifecycle, `predict()`, error routing).
- Per-task helpers extracted so that the async and sync functions share the SDK request/response code:
  - `OpenAIModelCommons`: shared config, output schema, and error handling.
  - `OpenAIEmbeddingTask`: embedding-specific config, request building, response conversion, `ENDPOINT_SUFFIX`.
  - `OpenAIChatTask`: chat-specific config, request building, response conversion, `ENDPOINT_SUFFIX`, and the `ChatModelResponseFormat` enum that previously lived on the chat function.
- `ErrorHandlingStrategy` / `RetryFallbackStrategy` / `ErrorMessageMetadata` extracted to top-level files.
- `OpenAIUtils` gains `createSyncClient` / `releaseSyncClient`, using the same refcounted-cache pattern as the async client (with a separate cache).
- `OpenAIModelProviderFactory.Provider` implements both runtime-provider interfaces and instantiates both an async and a sync function for the resolved endpoint.
- Renames for naming consistency with the new sync hierarchy:
  - `AbstractOpenAIModelFunction` → `AbstractOpenAIAsyncModelFunction`
  - `OpenAIEmbeddingModelFunction` → `OpenAIAsyncEmbeddingModelFunction`
  - `OpenAIChatModelFunction` → `OpenAIAsyncChatModelFunction`

## Verifying this change

This change is covered by a new unit test, `OpenAISyncModelFunctionTest`, which exercises both sync embedding and sync chat end-to-end via `MAP['async', 'false']` on `ML_PREDICT` against a `MockWebServer`. All pre-existing tests in `flink-model-openai` continue to pass.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no; the OpenAI model classes are internal)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes; sync OpenAI predict provider)
- If yes, how is the feature documented? (JavaDocs on the new classes; no user-facing docs change, since the OpenAI provider is already documented and sync/async selection goes through the existing `async` runtime config)
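The `Provider` change in the change log means a single provider object serves both runtime modes, and the caller chooses one based on the `async` option. The code below is a minimal, self-contained sketch of that dual-mode idea. Several pieces are hypothetical stand-ins: the `SyncPredictProvider` / `AsyncPredictProvider` interfaces, the `run` helper, and the fallback to async when `async` is absent. They are not Flink's actual `PredictRuntimeProvider` / `AsyncPredictRuntimeProvider` signatures, and they do not reflect the real default of `MLPredictRuntimeConfigOptions.ASYNC`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class DualModeProviderSketch {

    // Hypothetical stand-in for a synchronous predict runtime provider.
    interface SyncPredictProvider {
        List<String> predict(String input);
    }

    // Hypothetical stand-in for an asynchronous predict runtime provider.
    interface AsyncPredictProvider {
        CompletableFuture<List<String>> asyncPredict(String input);
    }

    // One object implements both modes, mirroring the idea behind
    // OpenAIModelProviderFactory.Provider (not its real code).
    static final class Provider implements SyncPredictProvider, AsyncPredictProvider {
        @Override
        public List<String> predict(String input) {
            return List.of("sync:" + input);
        }

        @Override
        public CompletableFuture<List<String>> asyncPredict(String input) {
            return CompletableFuture.completedFuture(List.of("async:" + input));
        }
    }

    // Hypothetical planner-side selection: read the 'async' option and pick
    // the matching path. Defaulting to "true" here is an assumption.
    static List<String> run(Provider provider, Map<String, String> options, String input) {
        boolean async = Boolean.parseBoolean(options.getOrDefault("async", "true"));
        if (async) {
            return provider.asyncPredict(input).join();
        }
        return provider.predict(input);
    }

    public static void main(String[] args) {
        Provider provider = new Provider();
        System.out.println(run(provider, Map.of("async", "false"), "hello")); // [sync:hello]
        System.out.println(run(provider, Map.of(), "hello"));                // [async:hello]
    }
}
```

With this design, the factory does not need to know the runtime mode. It hands back one object, and the option supplied in the query decides which interface is exercised.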

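The `OpenAIUtils` entry (`createSyncClient` / `releaseSyncClient`) relies on a refcounted client cache. The code below is a generic sketch of that pattern, built on a few assumptions. Clients are keyed by an endpoint string, and `RefCountedClientCache` and `FakeClient` are hypothetical names. The real `OpenAIUtils` code may key, synchronize, or close clients differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public final class RefCountedClientCache<K, C extends AutoCloseable> {

    // One cached client plus the number of callers currently holding it.
    private static final class Entry<T> {
        final T client;
        int refCount;

        Entry(T client) {
            this.client = client;
        }
    }

    private final Map<K, Entry<C>> entries = new HashMap<>();
    private final Function<K, C> factory;

    public RefCountedClientCache(Function<K, C> factory) {
        this.factory = factory;
    }

    /** Returns the shared client for {@code key}, creating it on first use. */
    public synchronized C acquire(K key) {
        Entry<C> entry = entries.computeIfAbsent(key, k -> new Entry<>(factory.apply(k)));
        entry.refCount++;
        return entry.client;
    }

    /** Drops one reference; the last release closes and evicts the client. */
    public synchronized void release(K key) {
        Entry<C> entry = entries.get(key);
        if (entry == null) {
            return; // unknown key: nothing to release
        }
        entry.refCount--;
        if (entry.refCount == 0) {
            entries.remove(key);
            try {
                entry.client.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close client", e);
            }
        }
    }

    public synchronized int size() {
        return entries.size();
    }

    /** Hypothetical stand-in client that records whether it was closed. */
    static final class FakeClient implements AutoCloseable {
        final String endpoint;
        boolean closed;

        FakeClient(String endpoint) {
            this.endpoint = endpoint;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        // Separate caches for sync and async clients.
        RefCountedClientCache<String, FakeClient> syncCache = new RefCountedClientCache<>(FakeClient::new);
        RefCountedClientCache<String, FakeClient> asyncCache = new RefCountedClientCache<>(FakeClient::new);

        FakeClient a = syncCache.acquire("https://api.example.com/v1");
        FakeClient b = syncCache.acquire("https://api.example.com/v1");
        System.out.println(a == b);            // true: same key shares one client
        syncCache.release("https://api.example.com/v1");
        System.out.println(a.closed);          // false: one reference still held
        syncCache.release("https://api.example.com/v1");
        System.out.println(a.closed);          // true: last release closes it
        System.out.println(asyncCache.size()); // 0: async cache untouched
    }
}
```

The PR keeps the sync and async clients in separate caches. Under this pattern, that separation means releasing the last sync reference never evicts or closes an async client that shares the same key.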