insist-youself opened a new pull request, #11987:
URL: https://github.com/apache/inlong/pull/11987

   Fixes #11924
   
   ### Motivation
   
   We introduce a new Sort connector for Weaviate to extend Apache InLong’s connector ecosystem to vector databases on Flink 1.15. This enables:
   
   - Unified, configurable reads and writes between heterogeneous sources and Weaviate using standard Flink SQL and InLong node abstractions.
   - Support for AI/RAG scenarios by moving structured and semi-structured data, together with vectors, into Weaviate and syncing it out to downstream systems.
   - Better observability and reliability through InLong Audit metrics, configurable retry/backoff, batch ingestion, and memory/backpressure handling.
   - A consistent developer and operator experience aligned with the existing InLong Sort framework.
   
   ### Modifications
   
   The following changes are included:
   
   Core module and configuration
   
   - Added a new weaviate module under the sort-flink v1.15 sort-connectors directory, with the standard package layout (table, source, sink, utils).
   - Added parent and child Maven POM entries and wired in the dependencies (Weaviate Java Client, InLong Sort base modules).
   
   Protocol nodes and registration 
   
   - Implemented WeaviateExtractNode and WeaviateLoadNode with validation, the InLongMetric/Metadata interfaces, genTableName/tableOptions, and a @JsonCreator-annotated constructor.
   - Registered the new node types in the JsonSubTypes of ExtractNode, LoadNode, and Node.
   
   Options and factory 
   
   - Implemented WeaviateOptions with the required options URL and CLASS_NAME and optional options such as API_KEY, BATCH_SIZE, FLUSH_INTERVAL, and VECTOR_FIELD, each with a default value and description.
   - Implemented WeaviateDynamicTableFactory (factoryIdentifier = weaviate-inlong) with requiredOptions/optionalOptions, validation, MetricOption construction, and creation of the runtime sources and sinks.
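
The required/optional split above can be illustrated with a plain-Java sketch. It is not the code in WeaviateOptions or WeaviateDynamicTableFactory. The option keys ("url", "class-name", "api-key", "batch-size", "flush-interval", "vector-field") and the default values are assumptions chosen for illustration. In a real Flink factory this checking is normally delegated to `FactoryUtil.createTableFactoryHelper(...).validate()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: option keys and defaults are illustrative assumptions,
// not the exact keys defined by WeaviateOptions.
final class WeaviateOptionsSketch {

    // Assumed keys for the required options URL and CLASS_NAME.
    static final Set<String> REQUIRED = Set.of("url", "class-name");

    // Assumed defaults for the optional options that have one.
    static final Map<String, String> DEFAULTS = Map.of(
            "batch-size", "100",
            "flush-interval", "1000");

    // Assumed optional keys without a default value.
    static final Set<String> OPTIONAL_NO_DEFAULT = Set.of("api-key", "vector-field");

    /** Checks required keys, rejects unknown keys, validates ranges, and fills in defaults. */
    static Map<String, String> validate(Map<String, String> userOptions) {
        for (String key : REQUIRED) {
            String value = userOptions.get(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        Map<String, String> resolved = new HashMap<>(DEFAULTS);
        for (Map.Entry<String, String> e : userOptions.entrySet()) {
            String key = e.getKey();
            boolean known = REQUIRED.contains(key) || DEFAULTS.containsKey(key)
                    || OPTIONAL_NO_DEFAULT.contains(key);
            if (!known) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
            resolved.put(key, e.getValue());
        }
        // Numeric options must parse (NumberFormatException is an IllegalArgumentException)
        // and must be positive.
        for (String numericKey : new String[] {"batch-size", "flush-interval"}) {
            long n = Long.parseLong(resolved.get(numericKey));
            if (n <= 0) {
                throw new IllegalArgumentException(numericKey + " must be positive, got " + n);
            }
        }
        return resolved;
    }
}
```

Rejecting missing or unknown keys up front means a misspelled option in the table DDL fails when the table is created. Without this check, it would only surface later, at runtime.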
   
   Runtime and utilities
   
   - WeaviateClientUtils: client creation, health checks, and retry helpers (extensible for connection pooling).
   - WeaviateTypeConverter: type mapping from Flink data types to Weaviate, vector conversion (String/List to float[]), RowData <-> Weaviate object conversion, and guardrails/validation.
   - WeaviateSourceFunction: initializes the client and SourceExactlyMetric, performs GraphQL queries, converts the results, handles retry/cancel, and reports metrics.
   - WeaviateDynamicTableSource: returns a SourceFunction runtime provider, implements copy/asSummaryString, and integrates Audit.
   - WeaviateSinkFunction: initializes the client and SinkExactlyMetric, and provides a batching buffer with flush routines, memory/backpressure controls, retries and failure handling, and metrics reporting.
   - WeaviateDynamicTableSink: returns a SinkFunction runtime provider, implements copy/asSummaryString, supports ChangelogMode (INSERT/UPDATE/DELETE), and integrates Audit.
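
The "String/List to float[]" vector conversion attributed to WeaviateTypeConverter can be sketched in plain Java. The class and method names are hypothetical. The accepted input shapes are also assumptions: a string such as "[0.1, 0.2]" or "0.1,0.2", or a List of Numbers. The actual converter may accept other shapes or report errors differently.

```java
import java.util.List;

// Hypothetical helper illustrating String/List -> float[] vector conversion.
// Accepted input shapes are assumptions, not the converter's actual contract.
final class VectorConversionSketch {

    static float[] toVector(Object value) {
        if (value == null) {
            return null; // no explicit vector supplied for this row
        }
        if (value instanceof float[]) {
            return (float[]) value;
        }
        if (value instanceof List) {
            List<?> list = (List<?>) value;
            float[] out = new float[list.size()];
            for (int i = 0; i < list.size(); i++) {
                Object item = list.get(i);
                if (!(item instanceof Number)) {
                    throw new IllegalArgumentException("Non-numeric vector element at index " + i);
                }
                out[i] = ((Number) item).floatValue();
            }
            return out;
        }
        if (value instanceof String) {
            String s = ((String) value).trim();
            // Strip optional JSON-style brackets: "[0.1, 0.2]" -> "0.1, 0.2".
            if (s.startsWith("[") && s.endsWith("]")) {
                s = s.substring(1, s.length() - 1);
            }
            if (s.trim().isEmpty()) {
                return new float[0];
            }
            String[] parts = s.split(",");
            float[] out = new float[parts.length];
            for (int i = 0; i < parts.length; i++) {
                // Throws NumberFormatException on malformed elements.
                out[i] = Float.parseFloat(parts[i].trim());
            }
            return out;
        }
        throw new IllegalArgumentException("Unsupported vector type: " + value.getClass().getName());
    }
}
```

Failing loudly on a malformed element, rather than silently dropping it, keeps vector dimensions consistent. A vector with a missing component would not match the class's other vectors.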
   
   SPI registration
   
   - Added a META-INF/services entry for org.apache.flink.table.factories.Factory to register WeaviateDynamicTableFactory.
   
   Integration tests
   
   - Added WeaviateExtractToMySqlLoadTest and MySqlExtractToWeaviateLoadTest to verify end-to-end ETL flows. Each test builds the ExtractNode/LoadNode/StreamInfo/GroupInfo graph, runs it via FlinkSqlParser, and verifies data correctness.
   
   Performance optimizations
   
   - Tuned batching in WeaviateSinkFunction and added memory usage monitoring with auto-flush safeguards.
   - Prepared interfaces for connection reuse and backpressure handling.
   - Optimized common GraphQL query patterns.
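
The batching and auto-flush behaviour above can be sketched as a small buffer. It flushes when the record count reaches a batch size or when the estimated buffered bytes cross a memory threshold. It also flushes when a flush interval has elapsed. This is a hypothetical illustration, not WeaviateSinkFunction itself: the class name and parameters (batchSize, maxBufferedBytes, flushIntervalMs) are assumptions. The `writer` callback stands in for a batch write to Weaviate. Time is passed in explicitly so the logic is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching buffer: flush on count, on estimated memory, or on interval.
final class BatchBufferSketch<T> {
    private final int batchSize;
    private final long maxBufferedBytes;
    private final long flushIntervalMs;
    private final Consumer<List<T>> writer; // stands in for a batch write to Weaviate
    private final List<T> buffer = new ArrayList<>();
    private long bufferedBytes;
    private long lastFlushMs;

    BatchBufferSketch(int batchSize, long maxBufferedBytes, long flushIntervalMs,
                      Consumer<List<T>> writer, long nowMs) {
        this.batchSize = batchSize;
        this.maxBufferedBytes = maxBufferedBytes;
        this.flushIntervalMs = flushIntervalMs;
        this.writer = writer;
        this.lastFlushMs = nowMs;
    }

    /** Buffers a record with its estimated size; flushes if a count or memory limit is hit. */
    synchronized void add(T record, long estimatedBytes, long nowMs) {
        buffer.add(record);
        bufferedBytes += estimatedBytes;
        if (buffer.size() >= batchSize || bufferedBytes >= maxBufferedBytes) {
            flush(nowMs);
        }
    }

    /** Intended to be called periodically (e.g. from a timer) to enforce the flush interval. */
    synchronized void maybeFlushOnTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    /** Hands a copy of the buffer to the writer; if the writer throws, records stay buffered. */
    synchronized void flush(long nowMs) {
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer));
            buffer.clear();
            bufferedBytes = 0;
        }
        lastFlushMs = nowMs;
    }

    synchronized int size() {
        return buffer.size();
    }
}
```

In a Flink sink, a common pattern is to also call `flush` from `CheckpointedFunction.snapshotState`. That way, records still sitting in the buffer are not lost when a checkpoint completes.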
   
   Error handling and monitoring 
   
   - Improved structured logging and error messages, and introduced retry with backoff for connection and write failures.
   - Strengthened handling of write errors, including partial failures.
   - Integrated InLong Audit metrics across components (source/sink paths, TableFactory).
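
Retry with backoff, as described above, can be sketched as a small generic helper. This is an illustrative assumption, not the WeaviateClientUtils implementation. The names (`RetrySketch`, `callWithRetry`, `Sleeper`) and the doubling-with-cap policy are hypothetical. The sleep is injected so tests do not actually wait, and in production `Thread::sleep` fits the `Sleeper` interface. A real connector would typically retry only errors it classifies as transient.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper with capped exponential backoff.
final class RetrySketch {

    /** Injectable sleep so the backoff can be observed in tests; Thread::sleep fits this shape. */
    interface Sleeper {
        void sleep(long ms) throws InterruptedException;
    }

    /**
     * Runs the action, retrying up to maxRetries times after the first attempt.
     * The wait doubles after each failure, capped at maxBackoffMs.
     */
    static <T> T callWithRetry(Callable<T> action, int maxRetries, long initialBackoffMs,
                               long maxBackoffMs, Sleeper sleeper) throws Exception {
        long backoff = initialBackoffMs;
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Never swallow interruption (e.g. job cancellation): restore the flag and stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                sleeper.sleep(backoff);
                backoff = Math.min(backoff * 2, maxBackoffMs);
            }
        }
    }
}
```

For example, `RetrySketch.callWithRetry(() -> writeBatch(batch), 3, 200, 5000, Thread::sleep)` would try a hypothetical `writeBatch` call up to four times. It waits 200, 400, and then 800 ms between attempts.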
   
   Backward compatibility and scope
   
   - Additive change; existing connectors and pipelines are unaffected.
   - New functionality is gated behind the table factory identifier weaviate-inlong and the new node types.
   
   ### Verifying this change
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
   
   - [x] This change added tests and can be verified as follows:
     - Added integration test WeaviateExtractToMySqlLoadTest, validating the Weaviate -> MySQL flow using FlinkSqlParser and node graph construction.
     - Added integration test MySqlExtractToWeaviateLoadTest, validating MySQL -> Weaviate ingestion with vector conversion and batch write behavior.
     - Manual checks: verified factory SPI discovery via the weaviate-inlong identifier, Audit metrics emission on the source/sink paths, basic retry behavior on transient failures, and configuration validation for URL/CLASS_NAME/API_KEY/BATCH_SIZE/FLUSH_INTERVAL/VECTOR_FIELD.
   
   ### Documentation
   
   - Does this pull request introduce a new feature? yes
   - If yes, how is the feature documented? JavaDocs for public classes, option descriptions, and user-facing docs are included.
   - User-facing docs: [Weaviate Sort Connector.md](https://github.com/user-attachments/files/22228157/Weaviate.Sort.Connector.md)
   
   


-- 
This is an automated message from the Apache Git Service.