This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 58a5a2d134 [Feature][Connector-V2][Milvus] Sink writer flush by interval (#9961)
58a5a2d134 is described below
commit 58a5a2d134d26f448ea53f59da89f8b0116c78b2
Author: loupipalien <[email protected]>
AuthorDate: Sat Nov 8 21:34:47 2025 +0800
[Feature][Connector-V2][Milvus] Sink writer flush by interval (#9961)
---
docs/en/connector-v2/sink/Milvus.md | 28 ++---
docs/zh/connector-v2/sink/Milvus.md | 45 +++++---
.../seatunnel/milvus/config/MilvusSinkOptions.java | 1 +
.../seatunnel/milvus/sink/MilvusSinkWriter.java | 19 ++--
.../e2e/connector/v2/milvus/MilvusIT.java | 114 +++++++++++++++++++++
.../test/resources/streaming-fake-to-milvus.conf | 67 ++++++++++++
6 files changed, 241 insertions(+), 33 deletions(-)
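
A minimal sketch of the behaviour this commit introduces, assuming a simplified in-memory writer: records are flushed when the buffer reaches `batch_size`, and any remainder is flushed when `prepareCommit()` runs at a checkpoint. `BufferedSinkSketch` and its `String` rows are hypothetical stand-ins for illustration, not the connector's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink writer; it only counts flushed rows.
final class BufferedSinkSketch {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private long flushedCount = 0;

    BufferedSinkSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Buffer one row; flush as soon as the buffer holds batchSize rows.
    void write(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called once per checkpoint: flush whatever is still buffered.
    void prepareCommit() {
        flush();
    }

    // A real writer would send the rows to the target system here.
    private void flush() {
        flushedCount += buffer.size();
        buffer.clear();
    }

    long flushedCount() {
        return flushedCount;
    }

    public static void main(String[] args) {
        BufferedSinkSketch writer = new BufferedSinkSketch(3);
        for (int i = 0; i < 10; i++) {
            writer.write("row-" + i);
        }
        System.out.println(writer.flushedCount()); // 9: three full batches
        writer.prepareCommit();
        System.out.println(writer.flushedCount()); // 10: remainder flushed
    }
}
```

With `batch_size = 3` and 10 rows, the sketch mirrors the counts asserted by the new e2e test: 9 rows are visible before the checkpoint and 10 after it.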
diff --git a/docs/en/connector-v2/sink/Milvus.md b/docs/en/connector-v2/sink/Milvus.md
index 0f1232ed0c..59f8920950 100644
--- a/docs/en/connector-v2/sink/Milvus.md
+++ b/docs/en/connector-v2/sink/Milvus.md
@@ -39,20 +39,20 @@ This Milvus sink connector write data to Milvus or Zilliz Cloud, it has the foll
## Sink Options
-| Name | Type | Required | Default | Description |
-|----------------------|---------|----------|------------------------------|------------------------------------------------------------------------------------|
-| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
-| token | String | Yes | - | User:password |
-| database | String | No | - | Write data to which database, default is source database. |
-| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. |
-| enable_auto_id | boolean | No | false | Primary key column enable autoId. |
-| enable_upsert | boolean | No | false | Upsert data not insert. |
-| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. |
-| batch_size | int | No | 1000 | Write batch size. |
-| partition_key | String | No | | Milvus partition key field |
-| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. |
-| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. |
-| collection_description | Map<String, String> | No | {} | Collection descriptions map where key is collection name and value is description. |
+| Name | Type | Required | Default | Description |
+|------------------------|---------------------|----------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
+| token | String | Yes | - | User:password |
+| database | String | No | - | Write data to which database, default is source database. |
+| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Auto create table when table not exist. |
+| enable_auto_id | boolean | No | false | Primary key column enable autoId. |
+| enable_upsert | boolean | No | false | Upsert data not insert. |
+| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. |
+| batch_size | int | No | 1000 | Write batch size. When the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`, it will trigger a write flush |
+| partition_key | String | No | | Milvus partition key field |
+| create_index | boolean | No | false | Automatically create vector indexes for collection to improve query performance. |
+| load_collection | boolean | No | false | Load collection into Milvus memory for immediate query availability. |
+| collection_description | Map<String, String> | No | {} | Collection descriptions map where key is collection name and value is description. |
## Task Example
diff --git a/docs/zh/connector-v2/sink/Milvus.md b/docs/zh/connector-v2/sink/Milvus.md
index 37c1d467ee..b992c907fd 100644
--- a/docs/zh/connector-v2/sink/Milvus.md
+++ b/docs/zh/connector-v2/sink/Milvus.md
@@ -19,7 +19,7 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能
##数据类型映射
-| Milvus数据类型 | SeaTunnel 数据类型 |
+| Milvus数据类型 | SeaTunnel 数据类型 |
|---------------------|---------------------|
| INT8 | TINYINT |
| INT16 | SMALLINT |
@@ -39,20 +39,24 @@ Milvus sink连接器将数据写入Milvus或Zilliz Cloud,它具有以下功能
## Sink 选项
-| 名字 | 类型 | 是否必传 | 默认值 | 描述 |
-|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
-| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 |
-| token | String | 是 | - | 用户:密码 |
-| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 |
-| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 |
-| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 |
-| enable_upsert | boolean | 否 | false | 是否启用upsert。 |
-| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 |
-| batch_size | int | 否 | 1000 | 写入批大小。 |
-| partition_key | String | 否 | | Milvus分区键字段 |
+| 名字 | 类型 | 是否必传 | 默认值 | 描述 |
+|------------------------|---------------------|------|------------------------------|---------------------------------------------------------------------|
+| url | String | 是 | - | 连接到Milvus或Zilliz Cloud的URL。 |
+| token | String | 是 | - | 用户:密码 |
+| database | String | 否 | - | 将数据写入哪个数据库,默认为源数据库。 |
+| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 当表不存在时自动创建表。 |
+| enable_auto_id | boolean | 否 | false | 主键列启用autoId。 |
+| enable_upsert | boolean | 否 | false | 是否启用upsert。 |
+| enable_dynamic_field | boolean | 否 | true | 是否启用带动态字段的创建表。 |
+| batch_size | int | 否 | 1000 | 写入批大小。当缓冲记录数达到 `batch_size` 或时间达到 `checkpoint.interval` 时,将触发一次写入刷新 |
+| partition_key | String | 否 | | Milvus分区键字段 |
+| create_index | boolean | No | false | 自动为集合创建向量索引以提高查询性能 |
+| load_collection | boolean | No | false | 将集合加载到 Milvus 内存中以便立即进行查询 |
+| collection_description | Map<String, String> | No | {} | 集合描述映射,其中键是集合名称,值是描述 |
## 任务示例
+### 基础配置
```bash
sink {
Milvus {
@@ -63,6 +67,23 @@ sink {
}
```
+### 带 Index 和 Loading 的高级配置
+```bash
+sink {
+ Milvus {
+ url = "http://127.0.0.1:19530"
+ token = "username:password"
+ batch_size = 1000
+ create_index = true
+ load_collection = true
+ collection_description = {
+ "user_vectors" = "User embedding vectors for recommendation"
+ "product_vectors" = "Product feature vectors for search"
+ }
+ }
+}
+```
+
## 变更日志
<ChangeLog />
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
index 1c7d00f3a9..4113be01b8 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/config/MilvusSinkOptions.java
@@ -82,6 +82,7 @@ public class MilvusSinkOptions extends MilvusBaseOptions {
.intType()
.defaultValue(1000)
.withDescription("writer batch size");
+
public static final Option<Integer> RATE_LIMIT =
Options.key("rate_limit")
.intType()
diff --git a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
index 98b2b46c3b..55402896ac 100644
--- a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSinkWriter.java
@@ -61,13 +61,7 @@ public class MilvusSinkWriter
public void write(SeaTunnelRow element) {
batchWriter.addToBatch(element);
if (batchWriter.needFlush()) {
- try {
- // Flush the batch writer
- batchWriter.flush();
- } catch (Exception e) {
- log.error("flush Milvus sink writer failed", e);
- throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e);
- }
+ flush();
}
}
@@ -81,6 +75,7 @@ public class MilvusSinkWriter
*/
@Override
public Optional<MilvusCommitInfo> prepareCommit() throws IOException {
+ flush();
return Optional.empty();
}
@@ -110,4 +105,14 @@ public class MilvusSinkWriter
 throw new MilvusConnectorException(MilvusConnectionErrorCode.CLOSE_CLIENT_ERROR, e);
}
}
+
+ private void flush() {
+ try {
+ // Flush the batch writer
+ batchWriter.flush();
+ } catch (Exception e) {
+ log.error("flush Milvus sink writer failed", e);
+ throw new MilvusConnectorException(MilvusConnectionErrorCode.WRITE_DATA_FAIL, e);
+ }
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
index 3aed4f1455..1ad1fc9882 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/milvus/MilvusIT.java
@@ -58,6 +58,7 @@ import io.milvus.grpc.FieldSchema;
import io.milvus.grpc.IndexDescription;
import io.milvus.grpc.KeyValuePair;
import io.milvus.grpc.MutationResult;
+import io.milvus.grpc.QueryResults;
import io.milvus.param.ConnectParam;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
@@ -69,6 +70,7 @@ import io.milvus.param.collection.FieldType;
import io.milvus.param.collection.HasCollectionParam;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.dml.InsertParam;
+import io.milvus.param.dml.QueryParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.param.index.DescribeIndexParam;
import lombok.extern.slf4j.Slf4j;
@@ -82,6 +84,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -715,4 +719,114 @@ public class MilvusIT extends TestSuiteBase implements TestResource {
 log.info("Index verification passed for collection: {}.{}", database, collection);
}
+
+ @TestTemplate
+ public void testStreamingFakeToMilvus(TestContainer container)
+ throws IOException, InterruptedException {
+ // flush by checkpoint interval
+ String jobId = "1";
+ String database = "streaming_test";
+ String collection = "streaming_simple_example";
+ String vectorField = "book_intro";
+ int checkpointInterval = 30000;
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/streaming-fake-to-milvus.conf",
+ jobId,
+ "database=" + database,
+ "collection=" + collection,
+ "batch_size=3");
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // count write records
+ waitCollectionReady(database, collection, vectorField);
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .until(() -> countCollectionEntities(database, collection) >= 9);
+ Assertions.assertEquals(9, countCollectionEntities(database, collection));
+ TimeUnit.MILLISECONDS.sleep(checkpointInterval);
+ Assertions.assertEquals(10, countCollectionEntities(database, collection));
+
+ // cancel jobs
+ container.cancelJob(jobId);
+ }
+
+ private void waitCollectionReady(
+ String databaseName, String collectionName, String vectorFieldName) {
+ // assert table exist
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .pollInterval(2, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ R<Boolean> hasCollectionResponse =
+ this.milvusClient.hasCollection(
+ HasCollectionParam.newBuilder()
+ .withDatabaseName(databaseName)
+ .withCollectionName(collectionName)
+ .build());
+ Assertions.assertEquals(
+ R.Status.Success.getCode(),
+ hasCollectionResponse.getStatus(),
+ Optional.ofNullable(hasCollectionResponse.getException())
+ .map(Exception::getMessage)
+ .orElse(""));
+ return hasCollectionResponse.getData();
+ });
+
+ // create index
+ R<RpcStatus> createIndexResponse =
+ milvusClient.createIndex(
+ CreateIndexParam.newBuilder()
+ .withDatabaseName(databaseName)
+ .withCollectionName(collectionName)
+ .withFieldName(vectorFieldName)
+ .withIndexType(IndexType.FLAT)
+ .withMetricType(MetricType.L2)
+ .build());
+ Assertions.assertEquals(
+ R.Status.Success.getCode(),
+ createIndexResponse.getStatus(),
+ Optional.ofNullable(createIndexResponse.getException())
+ .map(Exception::getMessage)
+ .orElse(""));
+
+ // load collection
+ R<RpcStatus> loadCollectionResponse =
+ milvusClient.loadCollection(
+ LoadCollectionParam.newBuilder()
+ .withDatabaseName(databaseName)
+ .withCollectionName(collectionName)
+ .build());
+ Assertions.assertEquals(
+ R.Status.Success.getCode(),
+ loadCollectionResponse.getStatus(),
+ Optional.ofNullable(loadCollectionResponse.getException())
+ .map(Exception::getMessage)
+ .orElse(""));
+ }
+
+ private long countCollectionEntities(String databaseName, String collectionName) {
+ R<QueryResults> queryResults =
+ milvusClient.query(
+ QueryParam.newBuilder()
+ .withDatabaseName(databaseName)
+ .withCollectionName(collectionName)
+ .withOutFields(Collections.singletonList("count(*)"))
+ .build());
+ Assertions.assertEquals(R.Status.Success.getCode(), queryResults.getStatus());
+ return queryResults
+ .getData()
+ .getFieldsData(0)
+ .getScalars()
+ .getLongData()
+ .getDataList()
+ .get(0);
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf
new file mode 100644
index 0000000000..4f7a8b9ab2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-milvus-e2e/src/test/resources/streaming-fake-to-milvus.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 30000
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ vector.dimension= 4
+ schema = {
+ table = ${collection}
+ columns = [
+ {
+ name = book_id
+ type = bigint
+ nullable = false
+ defaultValue = 0
+ comment = "primary key id"
+ },
+ {
+ name = book_intro
+ type = float_vector
+ columnScale =4
+ comment = "vector"
+ },
+ {
+ name = book_title
+ type = string
+ nullable = true
+ comment = "topic"
+ }
+ ]
+ primaryKey {
+ name = book_id
+ columnNames = [book_id]
+ }
+ }
+ }
+}
+
+sink {
+ Milvus {
+ url = "http://milvus-e2e:19530"
+ token = "root:Milvus"
+ database = ${database}
+ enable_upsert = false
+ batch_size = ${batch_size}
+ }
+}
\ No newline at end of file