This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e6fcdd72ed6 [Java][Debezium] Fix NPE in debeziumRecordInstant for DELETE events (#37795)
e6fcdd72ed6 is described below
commit e6fcdd72ed621a01570775d4d5acd3b7845e2c60
Author: liferoad <[email protected]>
AuthorDate: Tue Mar 10 20:41:14 2026 -0400
[Java][Debezium] Fix NPE in debeziumRecordInstant for DELETE events (#37795)
* Fix #37738: handle Debezium DELETE records without valueSchema
* refactor: replace fully qualified class names with imports in KafkaConnectSchemaTest.
---
.../apache/beam/io/debezium/KafkaConnectUtils.java | 27 +++++++++-----
.../beam/io/debezium/KafkaConnectSchemaTest.java | 41 ++++++++++++++++++++--
2 files changed, 58 insertions(+), 10 deletions(-)
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
index 7b97b11f0ec..12f43443782 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
@@ -80,15 +80,26 @@ public class KafkaConnectUtils {
}
public static Instant debeziumRecordInstant(SourceRecord record) {
- if
(!record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
- || record.valueSchema().field("ts_ms") == null) {
- throw new IllegalArgumentException(
- "Debezium record received is not of the right kind. "
- + String.format(
- "Should be STRUCT with ts_ms field. Instead it is: %s",
record.valueSchema()));
+ if (record.valueSchema() != null
+ &&
record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
+ && record.valueSchema().field("ts_ms") != null
+ && record.value() != null) {
+ Struct recordValue = (Struct) record.value();
+ return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
}
- Struct recordValue = (Struct) record.value();
- return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
+
+ if (record.sourceOffset() != null &&
record.sourceOffset().containsKey("ts_usec")) {
+ Object tsUsecValue = record.sourceOffset().get("ts_usec");
+ if (tsUsecValue instanceof Number) {
+ return Instant.ofEpochMilli(((Number) tsUsecValue).longValue() / 1000);
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "Debezium record received is not of the right kind. "
+ + String.format(
+ "Should be STRUCT with ts_ms field or sourceOffset with
ts_usec. Instead it is: %s, %s",
+ record.valueSchema(), record.sourceOffset()));
}
public static SourceRecordMapper<Row> beamRowFromSourceRecordFn(final Schema
recordSchema) {
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
index 95d9aeb9819..aed4e5caf16 100644
---
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
@@ -20,8 +20,11 @@ package org.apache.beam.io.debezium;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
+import java.util.Collections;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
import org.hamcrest.Matchers;
+import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -59,11 +62,45 @@ public class KafkaConnectSchemaTest {
@Test
public void testTimestampRequired() {
- org.apache.kafka.connect.source.SourceRecord record =
SourceRecordJsonTest.buildSourceRecord();
+ SourceRecord record = SourceRecordJsonTest.buildSourceRecord();
IllegalArgumentException e =
assertThrows(
IllegalArgumentException.class, () ->
KafkaConnectUtils.debeziumRecordInstant(record));
- assertThat(e.getMessage(), Matchers.containsString("Should be STRUCT with
ts_ms field"));
+ assertThat(
+ e.getMessage(),
+ Matchers.containsString("Should be STRUCT with ts_ms field or
sourceOffset with ts_usec"));
+ }
+
+ @Test
+ public void testDebeziumRecordInstantNullValueSchema() {
+ SourceRecord record =
+ new SourceRecord(
+ Collections.singletonMap("server", "test"),
+ Collections.singletonMap("ts_usec", 1614854400000000L),
+ "test-topic",
+ null,
+ null);
+
+ Instant instant = KafkaConnectUtils.debeziumRecordInstant(record);
+ assertThat(instant.getMillis(), Matchers.is(1614854400000L));
+ }
+
+ @Test
+ public void testDebeziumRecordInstantMissingTimestamp() {
+ SourceRecord record =
+ new SourceRecord(
+ Collections.singletonMap("server", "test"),
+ Collections.emptyMap(),
+ "test-topic",
+ null,
+ null);
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class, () ->
KafkaConnectUtils.debeziumRecordInstant(record));
+ assertThat(
+ e.getMessage(),
+ Matchers.containsString("Should be STRUCT with ts_ms field or
sourceOffset with ts_usec"));
}
}