This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 75ba599  KAFKA-7052 Avoiding NPE in ExtractField SMT in case of 
non-existent fields (#8059)
75ba599 is described below

commit 75ba599de50192c3a1d11ae9f1814fb2798ae6fe
Author: Gunnar Morling <[email protected]>
AuthorDate: Tue Feb 11 20:48:22 2020 +0100

    KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields 
(#8059)
    
    Author: Gunnar Morling <[email protected]>
    Reviewer: Randall Hauch <[email protected]>
---
 .../kafka/connect/transforms/ExtractField.java     |  9 +++++++-
 .../kafka/connect/transforms/ExtractFieldTest.java | 27 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
index eb8c357..bd3cbd9 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -58,7 +59,13 @@ public abstract class ExtractField<R extends 
ConnectRecord<R>> implements Transf
             return newRecord(record, null, value == null ? null : 
value.get(fieldName));
         } else {
             final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
-            return newRecord(record, schema.field(fieldName).schema(), value 
== null ? null : value.get(fieldName));
+            Field field = schema.field(fieldName);
+
+            if (field == null) {
+                throw new IllegalArgumentException("Unknown field: " + 
fieldName);
+            }
+
+            return newRecord(record, field.schema(), value == null ? null : 
value.get(fieldName));
         }
     }
 
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index acb0beb..a78c6d5 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class ExtractFieldTest {
     private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
@@ -86,4 +87,30 @@ public class ExtractFieldTest {
         assertNull(transformedRecord.key());
     }
 
+    @Test
+    public void nonExistentFieldSchemalessShouldReturnNull() {
+        xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+        final SinkRecord record = new SinkRecord("test", 0, null, 
Collections.singletonMap("magic", 42), null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.keySchema());
+        assertNull(transformedRecord.key());
+    }
+
+    @Test
+    public void nonExistentFieldWithSchemaShouldFail() {
+        xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+        final Schema keySchema = SchemaBuilder.struct().field("magic", 
Schema.INT32_SCHEMA).build();
+        final Struct key = new Struct(keySchema).put("magic", 42);
+        final SinkRecord record = new SinkRecord("test", 0, keySchema, key, 
null, null, 0);
+
+        try {
+            xform.apply(record);
+            fail("Expected exception wasn't raised");
+        } catch (IllegalArgumentException iae) {
+            assertEquals("Unknown field: nonexistent", iae.getMessage());
+        }
+    }
 }

Reply via email to