This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e6fcdd72ed6 [Java][Debezium] Fix NPE in debeziumRecordInstant for 
DELETE events (#37795)
e6fcdd72ed6 is described below

commit e6fcdd72ed621a01570775d4d5acd3b7845e2c60
Author: liferoad <[email protected]>
AuthorDate: Tue Mar 10 20:41:14 2026 -0400

    [Java][Debezium] Fix NPE in debeziumRecordInstant for DELETE events (#37795)
    
    * Fix #37738: handle Debezium DELETE records without valueSchema
    
    * refactor: replace fully qualified class names with imports in 
KafkaConnectSchemaTest.
---
 .../apache/beam/io/debezium/KafkaConnectUtils.java | 27 +++++++++-----
 .../beam/io/debezium/KafkaConnectSchemaTest.java   | 41 ++++++++++++++++++++--
 2 files changed, 58 insertions(+), 10 deletions(-)

diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
index 7b97b11f0ec..12f43443782 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
@@ -80,15 +80,26 @@ public class KafkaConnectUtils {
   }
 
   public static Instant debeziumRecordInstant(SourceRecord record) {
-    if 
(!record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
-        || record.valueSchema().field("ts_ms") == null) {
-      throw new IllegalArgumentException(
-          "Debezium record received is not of the right kind. "
-              + String.format(
-                  "Should be STRUCT with ts_ms field. Instead it is: %s", 
record.valueSchema()));
+    if (record.valueSchema() != null
+        && 
record.valueSchema().type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT)
+        && record.valueSchema().field("ts_ms") != null
+        && record.value() != null) {
+      Struct recordValue = (Struct) record.value();
+      return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
     }
-    Struct recordValue = (Struct) record.value();
-    return Instant.ofEpochMilli(recordValue.getInt64("ts_ms"));
+
+    if (record.sourceOffset() != null && 
record.sourceOffset().containsKey("ts_usec")) {
+      Object tsUsecValue = record.sourceOffset().get("ts_usec");
+      if (tsUsecValue instanceof Number) {
+        return Instant.ofEpochMilli(((Number) tsUsecValue).longValue() / 1000);
+      }
+    }
+
+    throw new IllegalArgumentException(
+        "Debezium record received is not of the right kind. "
+            + String.format(
+                "Should be STRUCT with ts_ms field or sourceOffset with 
ts_usec. Instead it is: %s, %s",
+                record.valueSchema(), record.sourceOffset()));
   }
 
   public static SourceRecordMapper<Row> beamRowFromSourceRecordFn(final Schema 
recordSchema) {
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
index 95d9aeb9819..aed4e5caf16 100644
--- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
@@ -20,8 +20,11 @@ package org.apache.beam.io.debezium;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 
+import java.util.Collections;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.hamcrest.Matchers;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -59,11 +62,45 @@ public class KafkaConnectSchemaTest {
 
   @Test
   public void testTimestampRequired() {
-    org.apache.kafka.connect.source.SourceRecord record = 
SourceRecordJsonTest.buildSourceRecord();
+    SourceRecord record = SourceRecordJsonTest.buildSourceRecord();
 
     IllegalArgumentException e =
         assertThrows(
             IllegalArgumentException.class, () -> 
KafkaConnectUtils.debeziumRecordInstant(record));
-    assertThat(e.getMessage(), Matchers.containsString("Should be STRUCT with 
ts_ms field"));
+    assertThat(
+        e.getMessage(),
+        Matchers.containsString("Should be STRUCT with ts_ms field or 
sourceOffset with ts_usec"));
+  }
+
+  @Test
+  public void testDebeziumRecordInstantNullValueSchema() {
+    SourceRecord record =
+        new SourceRecord(
+            Collections.singletonMap("server", "test"),
+            Collections.singletonMap("ts_usec", 1614854400000000L),
+            "test-topic",
+            null,
+            null);
+
+    Instant instant = KafkaConnectUtils.debeziumRecordInstant(record);
+    assertThat(instant.getMillis(), Matchers.is(1614854400000L));
+  }
+
+  @Test
+  public void testDebeziumRecordInstantMissingTimestamp() {
+    SourceRecord record =
+        new SourceRecord(
+            Collections.singletonMap("server", "test"),
+            Collections.emptyMap(),
+            "test-topic",
+            null,
+            null);
+
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class, () -> 
KafkaConnectUtils.debeziumRecordInstant(record));
+    assertThat(
+        e.getMessage(),
+        Matchers.containsString("Should be STRUCT with ts_ms field or 
sourceOffset with ts_usec"));
   }
 }

Reply via email to