insist-youself opened a new pull request, #11987:
URL: https://github.com/apache/inlong/pull/11987
Fixes #11924
### Motivation
We introduce a new Sort connector for Weaviate to extend Apache InLong’s
connector ecosystem to vector databases on Flink 1.15. This enables:
- Unified and configurable read/write between heterogeneous sources and
Weaviate using standard Flink SQL and InLong node abstractions.
- Support for AI/RAG scenarios by loading structured and semi-structured data,
together with its vectors, into Weaviate and syncing Weaviate data out to
downstream systems.
- Better observability and reliability through InLong Audit metrics,
configurable retry/backoff, batch ingestion, and memory/backpressure handling.
- A consistent developer and operator experience aligned with the existing
InLong Sort framework.
### Modifications
The following changes are included:
Core module and configuration
- Added a new weaviate module under the sort-flink v1.15 sort-connectors with
the standard package layout (table, source, sink, utils).
- Created the child and parent Maven POM entries and wired up the dependencies
(Weaviate Java Client, InLong Sort base modules).
Protocol nodes and registration
- Implemented WeaviateExtractNode and WeaviateLoadNode with validation, the
InLongMetric/Metadata interfaces, genTableName/tableOptions, and a
@JsonCreator constructor.
- Registered the new node types in the JsonSubTypes of ExtractNode, LoadNode,
and Node.
Options and factory
- Implemented WeaviateOptions with the required options URL and CLASS_NAME
and optional options such as API_KEY, BATCH_SIZE, FLUSH_INTERVAL, and
VECTOR_FIELD, with defaults and descriptions.
- Implemented WeaviateDynamicTableFactory (factoryIdentifier =
weaviate-inlong) with requiredOptions/optionalOptions, option validation,
MetricOption construction, and creation of the runtime sources and sinks.
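As a rough, dependency-free illustration of the required/optional option contract described above, the sketch below rejects missing required keys and unknown keys, then fills in defaults. The key strings (`url`, `class-name`, `api-key`, `batch-size`, `flush-interval`, `vector-field`) and the default values are hypothetical placeholders, not the values defined in WeaviateOptions. In the actual factory this check would normally be delegated to Flink's `ConfigOption` definitions and `FactoryUtil.createTableFactoryHelper(...).validate()`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical illustration of required/optional option validation.
 * Key names and defaults are placeholders, not the connector's real values.
 */
final class WeaviateOptionsSketch {

    // Options the table definition must provide.
    static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("url", "class-name"));

    // Optional options mapped to assumed defaults; null means "no default".
    static final Map<String, String> OPTIONAL_DEFAULTS = new HashMap<>();

    static {
        OPTIONAL_DEFAULTS.put("api-key", null);
        OPTIONAL_DEFAULTS.put("batch-size", "100");
        OPTIONAL_DEFAULTS.put("flush-interval", "1s");
        OPTIONAL_DEFAULTS.put("vector-field", null);
    }

    private WeaviateOptionsSketch() {
    }

    /** Fails fast on missing or unknown keys, then merges user values over defaults. */
    static Map<String, String> validate(Map<String, String> userOptions) {
        Set<String> missing = new TreeSet<>(REQUIRED);
        missing.removeAll(userOptions.keySet());
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }

        Set<String> unknown = new TreeSet<>(userOptions.keySet());
        unknown.removeAll(REQUIRED);
        unknown.removeAll(OPTIONAL_DEFAULTS.keySet());
        if (!unknown.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unknown);
        }

        Map<String, String> resolved = new HashMap<>();
        OPTIONAL_DEFAULTS.forEach((key, value) -> {
            if (value != null) {
                resolved.put(key, value);
            }
        });
        resolved.putAll(userOptions);

        // Range-check a numeric option; parseInt throws NumberFormatException
        // (a subclass of IllegalArgumentException) for non-numeric input.
        int batchSize = Integer.parseInt(resolved.get("batch-size"));
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch-size must be positive, got " + batchSize);
        }
        return resolved;
    }
}
```

Failing at table-creation time keeps misconfigured jobs from starting at all, which is cheaper to diagnose than a write failure at runtime.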
Runtime and utilities
- WeaviateClientUtils: client creation, health checks, and retry helpers
(extensible for connection pooling).
- WeaviateTypeConverter: maps Flink data types to Weaviate data types,
converts vectors (String/List to float[]), converts between RowData and
Weaviate objects, and applies guardrails/validation.
- WeaviateSourceFunction: initializes the client and SourceExactlyMetric,
performs GraphQL queries, converts the results, handles retries and
cancellation, and reports metrics.
- WeaviateDynamicTableSource: returns a SourceFunction runtime provider,
implements copy/asSummaryString, and integrates InLong Audit.
- WeaviateSinkFunction: initializes the client and SinkExactlyMetric, and
implements the batching buffer and flush routines, memory/backpressure
controls, retries and failure handling, and metrics reporting.
- WeaviateDynamicTableSink: returns a SinkFunction runtime provider,
implements copy/asSummaryString, supports a ChangelogMode with INSERT, UPDATE,
and DELETE, and integrates InLong Audit.
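The vector conversion handled by WeaviateTypeConverter (String/List to float[]) could look roughly like the following sketch. The class name, the accepted string formats (`[0.1, 0.2]` or `0.1,0.2`), and the rule that a null input means "no explicit vector" are assumptions made for illustration, not the PR's actual behavior.

```java
import java.util.List;

/**
 * Hypothetical sketch of String/List -> float[] vector conversion with basic guardrails.
 */
final class VectorConversionSketch {

    private VectorConversionSketch() {
    }

    /** Parses "[0.1, 0.2]" or "0.1,0.2"; a null input is treated as "no explicit vector". */
    static float[] fromString(String text) {
        if (text == null) {
            return null;
        }
        String body = text.trim();
        if (body.startsWith("[") && body.endsWith("]")) {
            body = body.substring(1, body.length() - 1).trim();
        }
        if (body.isEmpty()) {
            return new float[0];
        }
        String[] parts = body.split(",");
        float[] vector = new float[parts.length];
        for (int i = 0; i < parts.length; i++) {
            vector[i] = parseComponent(parts[i].trim(), i);
        }
        return vector;
    }

    /** Converts a list of numbers or numeric strings; null or other elements are rejected. */
    static float[] fromList(List<?> values) {
        if (values == null) {
            return null;
        }
        float[] vector = new float[values.size()];
        for (int i = 0; i < vector.length; i++) {
            Object value = values.get(i);
            if (value instanceof Number) {
                vector[i] = checkFinite(((Number) value).floatValue(), i);
            } else if (value instanceof String) {
                vector[i] = parseComponent(((String) value).trim(), i);
            } else {
                throw new IllegalArgumentException(
                        "Unsupported vector element at index " + i + ": " + value);
            }
        }
        return vector;
    }

    private static float parseComponent(String token, int index) {
        try {
            return checkFinite(Float.parseFloat(token), index);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid vector element at index " + index + ": '" + token + "'", e);
        }
    }

    // Rejects NaN and Infinity (e.g. a value too large for float overflows to Infinity).
    private static float checkFinite(float value, int index) {
        if (Float.isNaN(value) || Float.isInfinite(value)) {
            throw new IllegalArgumentException("Non-finite vector element at index " + index);
        }
        return value;
    }
}
```

Rejecting malformed components with the offending index in the message makes a bad source row easy to locate, rather than silently writing a truncated or zero-filled vector.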
SPI registration
- Added META-INF/services entry for org.apache.flink.table.factories.Factory
to register WeaviateDynamicTableFactory.
Integration tests
- Added WeaviateExtractToMySqlLoadTest and MySqlExtractToWeaviateLoadTest to
verify end-to-end ETL flows: each test builds the
ExtractNode/LoadNode/StreamInfo/GroupInfo, runs the flow via FlinkSqlParser,
and verifies data correctness.
Performance optimizations
- Tuned batching in WeaviateSinkFunction, added memory usage monitoring with
auto-flush safeguards.
- Prepared connection reuse strategies and backpressure handling interfaces.
- Optimized common GraphQL query patterns.
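The batching and memory-based auto-flush described above can be sketched as a buffer with three flush triggers: record count, estimated buffered bytes, and time since the first buffered record. This is a hypothetical, framework-free sketch; the class name, the caller-supplied byte estimate, and the `writer` callback (standing in for a Weaviate batch write) are assumptions, not the PR's WeaviateSinkFunction code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical batching buffer with count, size, and time-based flush triggers.
 * Timestamps are passed in explicitly to keep the sketch deterministic.
 */
final class BatchBufferSketch<T> {

    private final int maxRecords;
    private final long maxBytes;
    private final long flushIntervalMillis;
    private final Consumer<List<T>> writer; // stands in for a Weaviate batch write

    private final List<T> buffer = new ArrayList<>();
    private long bufferedBytes;
    private long firstRecordAtMillis;

    BatchBufferSketch(int maxRecords, long maxBytes, long flushIntervalMillis,
            Consumer<List<T>> writer) {
        if (maxRecords <= 0 || maxBytes <= 0 || flushIntervalMillis <= 0) {
            throw new IllegalArgumentException("thresholds must be positive");
        }
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.writer = writer;
    }

    /** Buffers a record with a caller-supplied size estimate; flushes if a limit is hit. */
    synchronized void add(T record, long estimatedBytes, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRecordAtMillis = nowMillis;
        }
        buffer.add(record);
        bufferedBytes += estimatedBytes;
        if (buffer.size() >= maxRecords || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    /** Meant to be called periodically, e.g. from a scheduled executor. */
    synchronized void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstRecordAtMillis >= flushIntervalMillis) {
            flush();
        }
    }

    /** Writes the buffered batch; the buffer is cleared only if the write succeeds. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }

    synchronized int size() {
        return buffer.size();
    }
}
```

Clearing the buffer only after `writer` returns keeps records buffered if the write throws, so the failure surfaces to Flink instead of silently dropping data. In a real sink, calling `flush()` from `CheckpointedFunction#snapshotState` is a common way to get at-least-once delivery.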
Error handling and monitoring
- Enhanced structured logging and error messages; introduced retry with
backoff for connection and write failures.
- Strengthened failure handling for write errors (including partial
failures).
- Integrated InLong Audit metrics across components (source/sink paths,
TableFactory).
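Retry with backoff for connection and write failures is commonly implemented as capped exponential backoff. The sketch below is a generic version under that assumption; the method names, the `retryable` predicate, and the delay policy (base * 2^(attempt - 1), capped at a maximum) are illustrative and not taken from the PR.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical retry helper using capped exponential backoff. */
final class RetrySketch {

    private RetrySketch() {
    }

    /**
     * Calls {@code action} up to {@code maxAttempts} times. Only exceptions accepted by
     * {@code retryable} are retried; the last failure is rethrown unchanged.
     */
    static <T> T callWithRetry(Callable<T> action, int maxAttempts, long baseBackoffMillis,
            long maxBackoffMillis, Predicate<Exception> retryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1;; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Never retry after an interrupt (e.g. task cancellation); restore the flag.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMillis(attempt, baseBackoffMillis, maxBackoffMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw ie;
                }
            }
        }
    }

    /** Delay before retry number {@code attempt} (1-based): base * 2^(attempt - 1), capped. */
    static long backoffMillis(int attempt, long baseBackoffMillis, long maxBackoffMillis) {
        int shift = Math.min(Math.max(attempt - 1, 0), 30); // bound the shift to avoid overflow
        return Math.min(baseBackoffMillis << shift, maxBackoffMillis);
    }
}
```

Separating "retryable" from "fatal" errors matters here: transient connection errors are worth retrying, while schema or validation errors from a write will fail the same way on every attempt.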
Backward compatibility and scope
- Additive change; existing connectors and pipelines are unaffected.
- The new functionality is only activated through the weaviate-inlong table
factory identifier and the new node types.
### Verifying this change
- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as:
- [x] This change added tests and can be verified as follows:
- Added integration tests: WeaviateExtractToMySqlLoadTest validating the
Weaviate -> MySQL flow using FlinkSqlParser and node graph construction.
- Added integration tests: MySqlExtractToWeaviateLoadTest validating MySQL
-> Weaviate ingestion with vector conversion and batch write behavior.
- Manual checks: verified factory SPI discovery with the weaviate-inlong
identifier, InLong Audit metric emission on the source/sink paths, basic retry
behavior on transient failures, and configuration validation for
URL/CLASS_NAME/API_KEY/BATCH_SIZE/FLUSH_INTERVAL/VECTOR_FIELD.
### Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? Javadoc for public classes, option
descriptions, and user-facing docs are included.
- user-facing docs:
[Weaviate Sort
Connector.md](https://github.com/user-attachments/files/22228157/Weaviate.Sort.Connector.md)