This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1784c01a35 [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135)
1784c01a35 is described below
commit 1784c01a35e6fa23cad9a9f514a6b840835ea51a
Author: Jia Fan <[email protected]>
AuthorDate: Tue Jul 9 21:50:39 2024 +0800
[Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135)
---
.github/workflows/backend.yml | 6 +-
docs/en/connector-v2/formats/avro.md | 2 +-
docs/zh/connector-v2/formats/avro.md | 2 +-
.../seatunnel/kafka/source/KafkaSourceReader.java | 31 ++++++----
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 70 +++++++++++++++-------
.../test/resources/avro/kafka_avro_to_assert.conf | 2 +-
.../jsonFormatIT/kafka_source_json_to_console.conf | 2 +-
..._source_to_assert_with_max_poll_records_1.conf} | 34 +++++------
...ce_format_error_handle_way_fail_to_console.conf | 1 -
...ce_format_error_handle_way_skip_to_console.conf | 1 -
.../textFormatIT/kafka_source_text_to_console.conf | 2 +-
...ource_text_to_console_assert_catalog_table.conf | 2 +-
12 files changed, 91 insertions(+), 64 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 9975d477da..b6094aff4d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1052,7 +1052,7 @@ jobs:
   kafka-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1068,7 +1068,6 @@ jobs:
           distribution: 'temurin'
           cache: 'maven'
       - name: run kafka connector integration test
-        if: needs.changes.outputs.api == 'true'
         run: |
           ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
         env:
@@ -1076,7 +1075,7 @@ jobs:
   rocketmq-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1092,7 +1091,6 @@ jobs:
           distribution: 'temurin'
           cache: 'maven'
       - name: run rocket connector integration test
-        if: needs.changes.outputs.api == 'true'
         run: |
           ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
         env:
diff --git a/docs/en/connector-v2/formats/avro.md b/docs/en/connector-v2/formats/avro.md
index 638657b345..8fef411fb5 100644
--- a/docs/en/connector-v2/formats/avro.md
+++ b/docs/en/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
diff --git a/docs/zh/connector-v2/formats/avro.md b/docs/zh/connector-v2/formats/avro.md
index 4e19ea4b98..7176f4e507 100644
--- a/docs/zh/connector-v2/formats/avro.md
+++ b/docs/zh/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index d136fabc40..02c2a9007e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -45,6 +45,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -103,7 +104,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
             return;
         }
-        while (pendingPartitionsQueue.size() != 0) {
+        while (!pendingPartitionsQueue.isEmpty()) {
             sourceSplits.add(pendingPartitionsQueue.poll());
         }
         sourceSplits.forEach(
@@ -120,9 +121,10 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                             executorService.submit(thread);
                                             return thread;
                                         }));
+        List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>();
         sourceSplits.forEach(
                 sourceSplit -> {
-                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
                     TablePath tablePath = sourceSplit.getTablePath();
                     DeserializationSchema<SeaTunnelRow> deserializationSchema =
                             tablePathMetadataMap.get(tablePath).getDeserializationSchema();
@@ -148,9 +150,14 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                     for (TopicPartition partition : partitions) {
                                         List<ConsumerRecord<byte[], byte[]>> recordList =
                                                 records.records(partition);
+                                        if (Boundedness.BOUNDED.equals(
+                                                        context.getBoundedness())
+                                                && recordList.isEmpty()) {
+                                            completableFuture.complete(true);
+                                            return;
+                                        }
                                         for (ConsumerRecord<byte[], byte[]> record :
                                                 recordList) {
-
                                             try {
                                                 if (deserializationSchema
                                                         instanceof
@@ -180,7 +187,8 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                                     && record.offset()
                                                             >= sourceSplit
                                                                     .getEndOffset()) {
-                                                break;
+                                                completableFuture.complete(true);
+                                                return;
                                             }
                                         }
                                         long lastOffset = -1;
@@ -199,18 +207,21 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                                 } catch (Exception e) {
                                     completableFuture.completeExceptionally(e);
                                 }
-                                completableFuture.complete(null);
+                                completableFuture.complete(false);
                             });
-                    } catch (InterruptedException e) {
+                        if (completableFuture.get()) {
+                            finishedSplits.add(sourceSplit);
+                        }
+                    } catch (Exception e) {
                         throw new KafkaConnectorException(
                                 KafkaConnectorErrorCode.CONSUME_DATA_FAILED,
                                 e);
                     }
-                    completableFuture.join();
                 });
-
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            context.signalNoMoreElement();
+            finishedSplits.forEach(sourceSplits::remove);
+            if (sourceSplits.isEmpty()) {
+                context.signalNoMoreElement();
+            }
         }
     }
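
Note on the reader change above: each bounded split now reports its own completion. The per-partition consume task completes the future with true when a poll returns no records for that partition, or when the configured end offset has been passed; finished splits are dropped from sourceSplits, and signalNoMoreElement() is only sent once every split has finished, so a single drained partition no longer ends the whole batch read early. Below is a minimal standalone sketch of the same idea against the plain kafka-clients API, not SeaTunnel code; the broker address, topic name, and partition are placeholders, and treating an empty poll as end-of-data in bounded mode is the simplifying assumption the patch relies on.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class BoundedPartitionDrain {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("max.poll.records", "1");                // tiny poll batches, as in the new e2e case
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("test_topic", 0); // placeholder topic/partition
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToBeginning(Collections.singletonList(tp));
                long count = 0;
                boolean finished = false;
                while (!finished) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                    if (records.records(tp).isEmpty()) {
                        // bounded read: an empty poll marks this partition (split) as finished
                        finished = true;
                    } else {
                        for (ConsumerRecord<byte[], byte[]> record : records.records(tp)) {
                            count++; // deserialize / emit the record here
                        }
                    }
                }
                System.out.println("drained " + count + " records from " + tp);
            }
        }
    }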
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 2f1c92048e..d4629851e7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -212,6 +212,27 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
+    @TestTemplate
+    public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container)
+            throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                "test_topic_text_max_poll_records_1",
+                                null,
+                                serializer.serialize(row)),
+                0,
+                100);
+        Container.ExecResult execResult =
+                container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container)
             throws IOException, InterruptedException {
@@ -538,29 +559,34 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
     }
     private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-        for (int i = start; i < end; i++) {
-            SeaTunnelRow row =
-                    new SeaTunnelRow(
-                            new Object[] {
-                                Long.valueOf(i),
-                                Collections.singletonMap("key", Short.parseShort("1")),
-                                new Byte[] {Byte.parseByte("1")},
-                                "string",
-                                Boolean.FALSE,
-                                Byte.parseByte("1"),
-                                Short.parseShort("1"),
-                                Integer.parseInt("1"),
-                                Long.parseLong("1"),
-                                Float.parseFloat("1.1"),
-                                Double.parseDouble("1.1"),
-                                BigDecimal.valueOf(11, 1),
-                                "test".getBytes(),
-                                LocalDate.of(2024, 1, 1),
-                                LocalDateTime.of(2024, 1, 1, 12, 59, 23)
-                            });
-            ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
-            producer.send(producerRecord);
+        try {
+            for (int i = start; i < end; i++) {
+                SeaTunnelRow row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Long.valueOf(i),
+                                    Collections.singletonMap("key", Short.parseShort("1")),
+                                    new Byte[] {Byte.parseByte("1")},
+                                    "string",
+                                    Boolean.FALSE,
+                                    Byte.parseByte("1"),
+                                    Short.parseShort("1"),
+                                    Integer.parseInt("1"),
+                                    Long.parseLong("1"),
+                                    Float.parseFloat("1.1"),
+                                    Double.parseDouble("1.1"),
+                                    BigDecimal.valueOf(11, 1),
+                                    "test".getBytes(),
+                                    LocalDate.of(2024, 1, 1),
+                                    LocalDateTime.of(2024, 1, 1, 12, 59, 23)
+                                });
+                ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
+        producer.flush();
     }
private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
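
The test-data helper above switches from fire-and-forget producer.send(record) to producer.send(record).get() plus a final flush(), so every generated row is acknowledged by the broker before container.executeJob(...) launches the bounded job; otherwise the Assert sink's MIN_ROW = 100 rule could race against records still sitting in the client buffer. A small hedged sketch of that synchronous-produce pattern with plain kafka-clients follows; the broker address and topic name are placeholders, not the test's real values.

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class SyncTestDataProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 100; i++) {
                    byte[] value = ("row-" + i).getBytes(StandardCharsets.UTF_8);
                    // get() blocks until the broker acknowledges this record,
                    // so the data is durable before the test job starts reading
                    producer.send(new ProducerRecord<>("test_topic", null, value)).get();
                }
                producer.flush(); // drain anything still buffered in the client
            }
        }
    }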
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index 31fe77a3e2..755a9a2b8d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -32,7 +32,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index f9a41e7987..3657390602 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -32,7 +32,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_json"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format_error_handle_way = skip
schema = {
fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
similarity index 85%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
index d7f875272b..787858e229 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
@@ -30,10 +30,13 @@ env {
source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
- topic = "test_topic_text"
+ topic = "test_topic_text_max_poll_records_1"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format_error_handle_way = fail
+ kafka.config = {
+ max.poll.records = 1
+ }
schema = {
columns = [
{
@@ -120,6 +123,9 @@ source {
}
sink {
+ console {
+ source_table_name = "kafka_table"
+ }
Assert {
source_table_name = "kafka_table"
rules =
@@ -143,24 +149,12 @@ sink {
]
}
]
- catalog_table_rule = {
- primary_key_rule = {
- primary_key_name = "primary key"
- primary_key_columns = ["id"]
- }
- constraint_key_rule = [
- {
- constraint_key_name = "unique_c_string"
- constraint_key_type = UNIQUE_KEY
- constraint_key_columns = [
- {
- constraint_key_column_name = "c_string"
- constraint_key_sort_type = ASC
- }
- ]
- }
- ]
- }
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
}
}
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index b6db50989a..d2a0f05354 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -37,7 +37,6 @@ source {
result_table_name = "kafka_table"
start_mode = "earliest"
format_error_handle_way = fail
- # kafka.auto.offset.reset = "earliest"
schema = {
fields {
id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 45b29d1915..88b6098b5e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -37,7 +37,6 @@ source {
result_table_name = "kafka_table"
start_mode = "earliest"
format_error_handle_way = skip
- # kafka.auto.offset.reset = "earliest"
schema = {
fields {
id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index 36f01c0337..3ce077bd58 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -32,7 +32,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_text"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format_error_handle_way = fail
schema = {
fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
index d7f875272b..132829e324 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -32,7 +32,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_text"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ start_mode = "earliest"
format_error_handle_way = fail
schema = {
columns = [