This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new 75ba599 KAFKA-7052 Avoiding NPE in ExtractField SMT in case of
non-existent fields (#8059)
75ba599 is described below
commit 75ba599de50192c3a1d11ae9f1814fb2798ae6fe
Author: Gunnar Morling <[email protected]>
AuthorDate: Tue Feb 11 20:48:22 2020 +0100
KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields
(#8059)
Author: Gunnar Morling <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../kafka/connect/transforms/ExtractField.java | 9 +++++++-
.../kafka/connect/transforms/ExtractFieldTest.java | 27 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
index eb8c357..bd3cbd9 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -58,7 +59,13 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
return newRecord(record, null, value == null ? null : value.get(fieldName));
} else {
final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
- return newRecord(record, schema.field(fieldName).schema(), value == null ? null : value.get(fieldName));
+ Field field = schema.field(fieldName);
+
+ if (field == null) {
+ throw new IllegalArgumentException("Unknown field: " + fieldName);
+ }
+
+ return newRecord(record, field.schema(), value == null ? null : value.get(fieldName));
}
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index acb0beb..a78c6d5 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public class ExtractFieldTest {
private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
@@ -86,4 +87,30 @@ public class ExtractFieldTest {
assertNull(transformedRecord.key());
}
+ @Test
+ public void nonExistentFieldSchemalessShouldReturnNull() {
+ xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+ final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+ final SinkRecord transformedRecord = xform.apply(record);
+
+ assertNull(transformedRecord.keySchema());
+ assertNull(transformedRecord.key());
+ }
+
+ @Test
+ public void nonExistentFieldWithSchemaShouldFail() {
+ xform.configure(Collections.singletonMap("field", "nonexistent"));
+
+ final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
+ final Struct key = new Struct(keySchema).put("magic", 42);
+ final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
+
+ try {
+ xform.apply(record);
+ fail("Expected exception wasn't raised");
+ } catch (IllegalArgumentException iae) {
+ assertEquals("Unknown field: nonexistent", iae.getMessage());
+ }
+ }
}