This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0c3044e3f6 [Hotfix][CDC] Fix split schema change stream (#7003)
0c3044e3f6 is described below
commit 0c3044e3f6d5d1a52df1896e069676b509419e4b
Author: hailin0 <[email protected]>
AuthorDate: Tue Jun 18 14:29:33 2024 +0800
[Hotfix][CDC] Fix split schema change stream (#7003)
---
.../external/IncrementalSourceStreamFetcher.java | 157 ++++++++++++---------
.../IncrementalSourceStreamFetcherTest.java | 23 ++-
2 files changed, 115 insertions(+), 65 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 4cad739ac6..338cb657b3 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -171,69 +171,7 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
* checkpoint-after] [a, b, c, d, e]
*/
Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent>
batchEvents) {
- List<SourceRecords> sourceRecordsSet = new ArrayList<>();
-
- List<SourceRecord> sourceRecordList = new ArrayList<>();
- SourceRecord previousRecord = null;
- for (int i = 0; i < batchEvents.size(); i++) {
- DataChangeEvent event = batchEvents.get(i);
- SourceRecord currentRecord = event.getRecord();
- if (!shouldEmit(currentRecord)) {
- continue;
- }
-
- if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
- if (!schemaChangeResolver.support(currentRecord)) {
- continue;
- }
-
- if (previousRecord == null) {
- // add schema-change-before to first
- sourceRecordList.add(
-
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
- sourceRecordsSet.add(new SourceRecords(sourceRecordList));
- sourceRecordList = new ArrayList<>();
- sourceRecordList.add(currentRecord);
- } else if
(SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
- sourceRecordList.add(currentRecord);
- } else {
- sourceRecordList.add(
-
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
- sourceRecordsSet.add(new SourceRecords(sourceRecordList));
- sourceRecordList = new ArrayList<>();
- sourceRecordList.add(currentRecord);
- }
- } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
- || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
- if (previousRecord == null
- || SourceRecordUtils.isDataChangeRecord(previousRecord)
- ||
SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
- sourceRecordList.add(currentRecord);
- } else {
- sourceRecordList.add(
-
WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
- sourceRecordsSet.add(new SourceRecords(sourceRecordList));
- sourceRecordList = new ArrayList<>();
- sourceRecordList.add(currentRecord);
- }
- }
- previousRecord = currentRecord;
- if (i == batchEvents.size() - 1) {
- if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
- sourceRecordList.add(
-
WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
- }
- sourceRecordsSet.add(new SourceRecords(sourceRecordList));
- }
- }
-
- if (sourceRecordsSet.size() > 1) {
- log.debug(
- "Split events stream into {} batches and mark schema
checkpoint before/after",
- sourceRecordsSet.size());
- }
-
- return sourceRecordsSet.iterator();
+ return new SchemaChangeStreamSplitter().split(batchEvents);
}
private void checkReadException() {
@@ -349,4 +287,97 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
this.pureBinlogPhaseTables.clear();
}
+
+ class SchemaChangeStreamSplitter {
+ private List<SourceRecords> blockSet;
+ private List<SourceRecord> currentBlock;
+ private SourceRecord previousRecord;
+
+ public SchemaChangeStreamSplitter() {
+ blockSet = new ArrayList<>();
+ currentBlock = new ArrayList<>();
+ previousRecord = null;
+ }
+
+ public Iterator<SourceRecords> split(List<DataChangeEvent>
batchEvents) {
+ for (int i = 0; i < batchEvents.size(); i++) {
+ DataChangeEvent event = batchEvents.get(i);
+ SourceRecord currentRecord = event.getRecord();
+ if (!shouldEmit(currentRecord)) {
+ continue;
+ }
+
+ if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
+ if (!schemaChangeResolver.support(currentRecord)) {
+ continue;
+ }
+
+ if (previousRecord == null) {
+ // add schema-change-before to first
+ currentBlock.add(
+
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
+ flipBlock();
+
+ currentBlock.add(currentRecord);
+ } else if
(SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
+ currentBlock.add(currentRecord);
+ } else {
+ currentBlock.add(
+
WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
+ flipBlock();
+
+ currentBlock.add(currentRecord);
+ }
+ } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
+ || SourceRecordUtils.isHeartbeatRecord(currentRecord))
{
+ if (previousRecord == null
+ ||
SourceRecordUtils.isDataChangeRecord(previousRecord)
+ ||
SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
+ currentBlock.add(currentRecord);
+ } else {
+ endBlock(previousRecord);
+ flipBlock();
+
+ currentBlock.add(currentRecord);
+ }
+ }
+
+ previousRecord = currentRecord;
+ if (i == batchEvents.size() - 1) {
+ endBlock(currentRecord);
+ flipBlock();
+ }
+ }
+
+ endLastBlock(previousRecord);
+
+ if (blockSet.size() > 1) {
+ log.debug(
+ "Split events stream into {} batches and mark schema
change checkpoint",
+ blockSet.size());
+ }
+
+ return blockSet.iterator();
+ }
+
+ void flipBlock() {
+ if (!currentBlock.isEmpty()) {
+ blockSet.add(new SourceRecords(currentBlock));
+ currentBlock = new ArrayList<>();
+ }
+ }
+
+ void endBlock(SourceRecord lastRecord) {
+ if (!currentBlock.isEmpty()) {
+ if (SourceRecordUtils.isSchemaChangeEvent(lastRecord)) {
+
currentBlock.add(WatermarkEvent.createSchemaChangeAfterWatermark(lastRecord));
+ }
+ }
+ }
+
+ void endLastBlock(SourceRecord lastRecord) {
+ endBlock(lastRecord);
+ flipBlock();
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
index a17f7f8673..ee8d4d7e5d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
@@ -61,6 +62,7 @@ public class IncrementalSourceStreamFetcherTest {
.with(Heartbeat.HEARTBEAT_INTERVAL, 1)
.with(TRANSACTION_TOPIC, "test")
.build();
+ private static final String UNKNOWN_SCHEMA_KEY = "UNKNOWN";
@Test
public void testSplitSchemaChangeStream() throws Exception {
@@ -107,6 +109,7 @@ public class IncrementalSourceStreamFetcherTest {
inputEvents.add(new DataChangeEvent(createDataEvent()));
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+ inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
outputEvents.forEachRemaining(records::add);
@@ -134,6 +137,7 @@ public class IncrementalSourceStreamFetcherTest {
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
inputEvents.add(new DataChangeEvent(createDataEvent()));
inputEvents.add(new DataChangeEvent(createDataEvent()));
+ inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
outputEvents.forEachRemaining(records::add);
@@ -323,13 +327,21 @@ public class IncrementalSourceStreamFetcherTest {
}
    // Schema change event on the default topic; the mocked resolver reports it as supported.
    static SourceRecord createSchemaChangeEvent() {
        return createSchemaChangeEvent("SCHEMA_CHANGE_TOPIC");
    }
+
    // Schema change event on the UNKNOWN topic; the mocked resolver's support() answer
    // returns false for this topic, so the fetcher must drop it when splitting the stream.
    static SourceRecord createSchemaChangeUnknownEvent() {
        return createSchemaChangeEvent(UNKNOWN_SCHEMA_KEY);
    }
+
+ static SourceRecord createSchemaChangeEvent(String topic) {
Schema keySchema =
SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
SourceRecord record =
new SourceRecord(
Collections.emptyMap(),
Collections.emptyMap(),
- null,
+ topic,
keySchema,
null,
null,
@@ -377,7 +389,14 @@ public class IncrementalSourceStreamFetcherTest {
static IncrementalSourceStreamFetcher createFetcher() {
SchemaChangeResolver schemaChangeResolver =
mock(SchemaChangeResolver.class);
- when(schemaChangeResolver.support(any())).thenReturn(true);
+ when(schemaChangeResolver.support(any()))
+ .thenAnswer(
+ (Answer<Boolean>)
+ invocationOnMock -> {
+ SourceRecord record =
invocationOnMock.getArgument(0);
+ return record.topic() == null
+ ||
!record.topic().equalsIgnoreCase(UNKNOWN_SCHEMA_KEY);
+ });
IncrementalSourceStreamFetcher fetcher =
new IncrementalSourceStreamFetcher(null, 0,
schemaChangeResolver);
IncrementalSourceStreamFetcher spy = spy(fetcher);