Repository: kafka
Updated Branches:
  refs/heads/trunk b905d4891 -> 267952460
KAFKA-3055; Fix JsonConverter mangling the Schema in Connect

Author: ksenji <[email protected]>

Reviewers: Dong Lin <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #722 from ksenji/trunk

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/26795246
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/26795246
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/26795246

Branch: refs/heads/trunk
Commit: 2679524604b611046e9826b2a1fba461d42f06f4
Parents: b905d48
Author: Kishore Senji <[email protected]>
Authored: Mon Jan 4 11:47:31 2016 -0500
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon Jan 4 11:47:31 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/kafka/connect/json/JsonConverter.java |  2 +-
 .../org/apache/kafka/connect/json/JsonConverterTest.java  | 10 ++++++----
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/26795246/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index ed295e1..a70cadd 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -404,7 +404,7 @@ public class JsonConverter implements Converter {
                 jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
                 ArrayNode fields = JsonNodeFactory.instance.arrayNode();
                 for (Field field : schema.fields()) {
-                    ObjectNode fieldJsonSchema = asJsonSchema(field.schema());
+                    ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy();
                     fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
                     fields.add(fieldJsonSchema);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/26795246/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index e56b009..c923285 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -431,15 +431,17 @@ public class JsonConverterTest {

     @Test
     public void structToJson() {
-        Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
-        Struct input = new Struct(schema).put("field1", true).put("field2", "string");
+        Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).field("field3", Schema.STRING_SCHEMA).field("field4", Schema.BOOLEAN_SCHEMA).build();
+        Struct input = new Struct(schema).put("field1", true).put("field2", "string2").put("field3", "string3").put("field4", false);
         JsonNode converted = parse(converter.fromConnectData(TOPIC, schema, input));
         validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"),
+        assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }, { \"field\": \"field3\", \"type\": \"string\", \"optional\": false }, { \"field\": \"field4\", \"type\": \"boolean\", \"optional\": false }] }"),
                 converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
         assertEquals(JsonNodeFactory.instance.objectNode()
                         .put("field1", true)
-                        .put("field2", "string"),
+                        .put("field2", "string2")
+                        .put("field3", "string3")
+                        .put("field4", false),
                 converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
