Repository: kafka Updated Branches: refs/heads/trunk 75e213e55 -> 54bf2fb5f
KAFKA-4810: Make Kafka Connect SchemaBuilder more lax about checking that fields are unset https://issues.apache.org/jira/browse/KAFKA-4810 > Currently SchemaBuilder is strict when checking that certain fields have not > been set yet (e.g. version, name, doc). It just checks that the field is > null. This is intended to protect the user from buggy code that overwrites a > field with different values, but it's a bit too strict currently. In generic > code for converting schemas (e.g. Converters) you will sometimes initialize a > builder with these values (e.g. because you get a SchemaBuilder for a logical > type, which sets name & version), but then have generic code for setting name > & version from the source schema. Changed the validation method to not only check if a field is null but also to check if the new value that is being set is the same as the current value of the field. ewencp Author: Vitaly Pushkar <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #2806 from vitaly-pushkar/KAFKA-4810-schema-builder-default-fields-validation Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54bf2fb5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54bf2fb5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54bf2fb5 Branch: refs/heads/trunk Commit: 54bf2fb5ff8f1841aea2df2e6fa76f8a05fddfba Parents: 75e213e Author: Vitaly Pushkar <[email protected]> Authored: Tue Apr 4 14:58:45 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Apr 4 14:58:45 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/connect/data/SchemaBuilder.java | 19 +++++++++---------- .../kafka/connect/data/SchemaBuilderTest.java | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/54bf2fb5/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java index 5a2b693..f0c4586 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -94,7 +94,7 @@ public class SchemaBuilder implements Schema { * @return the SchemaBuilder */ public SchemaBuilder optional() { - checkNull(OPTIONAL_FIELD, optional); + checkCanSet(OPTIONAL_FIELD, optional, true); optional = true; return this; } @@ -104,7 +104,7 @@ public class SchemaBuilder implements Schema { * @return the SchemaBuilder */ public SchemaBuilder required() { - checkNull(OPTIONAL_FIELD, optional); + checkCanSet(OPTIONAL_FIELD, optional, false); optional = false; return this; } @@ -121,7 +121,7 @@ public class SchemaBuilder implements Schema { * @return the SchemaBuilder */ public SchemaBuilder defaultValue(Object value) { - checkNull(DEFAULT_FIELD, defaultValue); + checkCanSet(DEFAULT_FIELD, defaultValue, value); checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD); try { ConnectSchema.validateValue(this, value); @@ -143,7 +143,7 @@ public class SchemaBuilder implements Schema { * @return the SchemaBuilder */ public SchemaBuilder name(String name) { - checkNull(NAME_FIELD, this.name); + checkCanSet(NAME_FIELD, this.name, name); this.name = name; return this; } @@ -160,7 +160,7 @@ public class SchemaBuilder implements Schema { * @return the SchemaBuilder */ public SchemaBuilder version(Integer version) { - checkNull(VERSION_FIELD, this.version); + checkCanSet(VERSION_FIELD, this.version, version); this.version = version; return this; } @@ -176,7 +176,7 @@ public class SchemaBuilder implements Schema { * @return the SchemaBuilder */ public SchemaBuilder doc(String doc) { - checkNull(DOC_FIELD, this.doc); + checkCanSet(DOC_FIELD, this.doc, doc); this.doc = doc; return this; } @@ -398,9 +398,8 @@ public class SchemaBuilder implements Schema { return build(); } - - private static void checkNull(String fieldName, Object val) { - if (val != null) + private static void checkCanSet(String fieldName, Object fieldVal, Object val) { + if (fieldVal != null && fieldVal != val) throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set."); } @@ -408,4 +407,4 @@ public class SchemaBuilder implements Schema { if (val == null) throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/54bf2fb5/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java index 6162420..0dba0ad 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java @@ -305,6 +305,24 @@ public class SchemaBuilderTest { struct.validate(); } + @Test + public void testDefaultFieldsSameValueOverwriting() { + final SchemaBuilder schemaBuilder = SchemaBuilder.string().name("testing").version(123); + + schemaBuilder.name("testing"); + schemaBuilder.version(123); + + assertEquals("testing", schemaBuilder.name()); + } + + @Test(expected = SchemaBuilderException.class) + public void testDefaultFieldsDifferentValueOverwriting() { + final SchemaBuilder schemaBuilder = SchemaBuilder.string().name("testing").version(123); + + schemaBuilder.name("testing"); + schemaBuilder.version(456); + } + private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) { assertEquals(type, schema.type()); assertEquals(optional, schema.isOptional());
