Repository: kafka Updated Branches: refs/heads/trunk a1900ada7 -> 6c839395b
KAFKA-4709: Error message from Struct.validate() should include the name of the offending field. https://issues.apache.org/jira/browse/KAFKA-4709 Author: Aegeaner <[email protected]> Reviewers: Dong Lin, Guozhang Wang Closes #2521 from Aegeaner/KAFKA-4709 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6c839395 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6c839395 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6c839395 Branch: refs/heads/trunk Commit: 6c839395b7b43d8e9c0dfbb3c12470fc9284a94a Parents: a1900ad Author: Aegeaner <[email protected]> Authored: Thu Feb 16 13:44:08 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Feb 16 13:44:08 2017 -0800 ---------------------------------------------------------------------- .../kafka/connect/data/ConnectSchema.java | 15 +++- .../org/apache/kafka/connect/data/Struct.java | 2 +- .../apache/kafka/connect/data/FakeSchema.java | 83 ++++++++++++++++++++ .../apache/kafka/connect/data/StructTest.java | 33 ++++++++ 4 files changed, 129 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6c839395/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index d1fd9cd..e052534 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -207,9 +207,14 @@ public class ConnectSchema implements Schema { * @param value value to test */ public static void validateValue(Schema schema, Object value) { + validateValue(null, schema, value); + } + + public static void 
validateValue(String name, Schema schema, Object value) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Invalid value: null used for required field"); + throw new DataException("Invalid value: null used for required field: \"" + name + + "\", schema type: " + schema.type()); else return; } @@ -220,7 +225,9 @@ public class ConnectSchema implements Schema { expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); if (expectedClasses == null) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + throw new DataException("Invalid Java object for schema type " + schema.type() + + ": " + value.getClass() + + " for field: \"" + name + "\""); boolean foundMatch = false; for (Class<?> expectedClass : expectedClasses) { @@ -230,7 +237,9 @@ public class ConnectSchema implements Schema { } } if (!foundMatch) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + throw new DataException("Invalid Java object for schema type " + schema.type() + + ": " + value.getClass() + + " for field: \"" + name + "\""); switch (schema.type()) { case STRUCT: http://git-wip-us.apache.org/repos/asf/kafka/blob/6c839395/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 698c6ee..200a1c0 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -229,7 +229,7 @@ public class Struct { Object value = values[field.index()]; if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) continue; - ConnectSchema.validateValue(fieldSchema, value); + ConnectSchema.validateValue(field.name(), fieldSchema, 
value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6c839395/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java new file mode 100644 index 0000000..ff2e24f --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.data; + +import java.util.List; +import java.util.Map; + +public class FakeSchema implements Schema { + @Override + public Type type() { + return null; + } + + @Override + public boolean isOptional() { + return false; + } + + @Override + public Object defaultValue() { + return null; + } + + @Override + public String name() { + return "fake"; + } + + @Override + public Integer version() { + return null; + } + + @Override + public String doc() { + return null; + } + + @Override + public Map<String, String> parameters() { + return null; + } + + @Override + public Schema keySchema() { + return null; + } + + @Override + public Schema valueSchema() { + return null; + } + + @Override + public List<Field> fields() { + return null; + } + + @Override + public Field field(String fieldName) { + return null; + } + + @Override + public Schema schema() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6c839395/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 11c9fb0..82f6d89 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -18,7 +18,9 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -234,4 +236,35 @@ public class StructTest { assertEquals(struct1, struct2); assertNotEquals(struct1, struct3); } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testValidateStructWithNullValue() { + Schema schema = 
SchemaBuilder.struct() + .field("one", Schema.STRING_SCHEMA) + .field("two", Schema.STRING_SCHEMA) + .field("three", Schema.STRING_SCHEMA) + .build(); + + Struct struct = new Struct(schema); + thrown.expect(DataException.class); + thrown.expectMessage("Invalid value: null used for required field: \"one\", schema type: STRING"); + struct.validate(); + } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + FakeSchema fakeSchema = new FakeSchema(); + + thrown.expect(DataException.class); + thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for field: \"field\""); + ConnectSchema.validateValue(fieldName, fakeSchema, new Object()); + + thrown.expect(DataException.class); + thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\""); + ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object()); + } }
