This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e65e583 [Fix] Fix some bugs in commit offset to kafka, intercepting
non-schema changes, and intercepting truncate table (#16)
e65e583 is described below
commit e65e583e6329e6be6ffe75c35e06ec22f7cc2c6e
Author: wudongliang <[email protected]>
AuthorDate: Sat May 11 11:12:35 2024 +0800
[Fix] Fix some bugs in commit offset to kafka, intercepting non-schema
changes, and intercepting truncate table (#16)
---
.../writer/schema/DebeziumSchemaChange.java | 83 ++++++++++++++++------
1 file changed, 60 insertions(+), 23 deletions(-)
diff --git
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
index fc0823b..1eeab0e 100644
---
a/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
+++
b/src/main/java/org/apache/doris/kafka/connector/writer/schema/DebeziumSchemaChange.java
@@ -21,6 +21,7 @@ package org.apache.doris.kafka.connector.writer.schema;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.data.Envelope;
+import io.debezium.util.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -45,6 +46,9 @@ import org.slf4j.LoggerFactory;
public class DebeziumSchemaChange extends DorisWriter {
private static final Logger LOG =
LoggerFactory.getLogger(DebeziumSchemaChange.class);
+ public static final String SCHEMA_CHANGE_VALUE = "SchemaChangeValue";
+ public static final String TABLE_CHANGES = "tableChanges";
+ public static final String TABLE_CHANGES_TYPE = "type";
private final Map<String, String> topic2TableMap;
private SchemaChangeManager schemaChangeManager;
private DorisSystemService dorisSystemService;
@@ -80,36 +84,67 @@ public class DebeziumSchemaChange extends DorisWriter {
@Override
public void insert(SinkRecord record) {
+ if (!validate(record)) {
+ processedOffset.set(record.kafkaOffset());
+ return;
+ }
schemaChange(record);
}
- @Override
- public void commit(int partition) {
- // do nothing
- }
+ private boolean validate(final SinkRecord record) {
+ if (!isSchemaChange(record)) {
+ LOG.warn(
+ "Current topic={}, the message does not contain schema
change change information, please check schema.topic",
+ dorisOptions.getSchemaTopic());
+ throw new SchemaChangeException(
+ "The message does not contain schema change change
information, please check schema.topic");
+ }
- private void schemaChange(final SinkRecord record) {
- String tableName = resolveTableName(record);
+ tableName = resolveTableName(record);
if (tableName == null) {
LOG.warn(
"Ignored to write record from topic '{}' partition '{}'
offset '{}'. No resolvable table name",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset());
- processedOffset.set(record.kafkaOffset());
- return;
+ return false;
}
+
+ if (!sinkTableSet.contains(tableName)) {
+ LOG.warn(
+ "The "
+ + tableName
+ + " is not defined and requires synchronized data.
If you need to synchronize the table data, please configure it in
'doris.topic2table.map'");
+ return false;
+ }
+
Struct recordStruct = (Struct) (record.value());
- List<Object> tableChanges = recordStruct.getArray("tableChanges");
+ if (isTruncate(recordStruct)) {
+ LOG.warn("Truncate {} table is not supported", tableName);
+ return false;
+ }
+
+ List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
Struct tableChange = (Struct) tableChanges.get(0);
- if ("DROP".equalsIgnoreCase(tableChange.getString("type"))
- || "CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
+ if ("DROP".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))
+ ||
"CREATE".equalsIgnoreCase(tableChange.getString(TABLE_CHANGES_TYPE))) {
LOG.warn(
"CREATE and DROP {} tables are currently not supported.
Please create or drop them manually.",
tableName);
- processedOffset.set(record.kafkaOffset());
- return;
+ return false;
}
+ return true;
+ }
+
+ @Override
+ public void commit(int partition) {
+ // do nothing
+ }
+
+ private void schemaChange(final SinkRecord record) {
+ Struct recordStruct = (Struct) (record.value());
+ List<Object> tableChanges = recordStruct.getArray(TABLE_CHANGES);
+ Struct tableChange = (Struct) tableChanges.get(0);
RecordDescriptor recordDescriptor =
RecordDescriptor.builder()
.withSinkRecord(record)
@@ -118,6 +153,17 @@ public class DebeziumSchemaChange extends DorisWriter {
tableChange(tableName, recordDescriptor);
}
+ private boolean isTruncate(final Struct record) {
+ // A truncate event generally arrives with an empty tableChanges array
+ return record.getArray(TABLE_CHANGES).isEmpty();
+ }
+
+ private static boolean isSchemaChange(SinkRecord record) {
+ return record.valueSchema() != null
+ && !Strings.isNullOrEmpty(record.valueSchema().name())
+ && record.valueSchema().name().contains(SCHEMA_CHANGE_VALUE);
+ }
+
private String resolveTableName(SinkRecord record) {
if (isTombstone(record)) {
LOG.warn(
@@ -170,15 +216,6 @@ public class DebeziumSchemaChange extends DorisWriter {
}
private void tableChange(String tableName, RecordDescriptor
recordDescriptor) {
- if (!sinkTableSet.contains(tableName)) {
- processedOffset.set(recordDescriptor.getOffset());
- LOG.warn(
- "The "
- + tableName
- + " is not defined and requires synchronized data.
If you need to synchronize the table data, please configure it in
'doris.topic2table.map'");
- return;
- }
-
if (!hasTable(tableName)) {
// TODO Table does not exist, automatically created it.
LOG.error("{} Table does not exist, please create manually.",
tableName);
@@ -223,7 +260,7 @@ public class DebeziumSchemaChange extends DorisWriter {
public long getOffset() {
committedOffset.set(processedOffset.get());
- return committedOffset.get();
+ return committedOffset.get() + 1;
}
private boolean isTombstone(SinkRecord record) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]