Repository: samza Updated Branches: refs/heads/master 93b397e84 -> 7055ce670
SAMZA-1755: Fix JsonRelConverter to recursively convert relRecords. Author: Aditya Toomula <atoom...@linkedin.com> Reviewers: Srini P <spun...@linkedin.com> Closes #558 from atoomula/sql1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7055ce67 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7055ce67 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7055ce67 Branch: refs/heads/master Commit: 7055ce670a270a14c1833ee0bd05fbd74cb93911 Parents: 93b397e Author: Aditya Toomula <atoom...@linkedin.com> Authored: Fri Jul 13 15:26:34 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Jul 13 15:26:34 2018 -0700 ---------------------------------------------------------------------- .../samza/tools/json/JsonRelConverterFactory.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7055ce67/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java index ce257f1..4db066a 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java @@ -25,6 +25,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; +import org.apache.samza.sql.SamzaSqlRelRecord; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.RelSchemaProvider; import org.apache.samza.sql.interfaces.SamzaRelConverter; @@ -55,12 
+56,16 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory { @Override public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) { + String jsonValue = convertToSamzaMessage(relMessage.getSamzaSqlRelRecord()); + return new KV<>(relMessage.getKey(), jsonValue.getBytes()); + } + private String convertToSamzaMessage(SamzaSqlRelRecord relRecord) { String jsonValue; ObjectNode node = mapper.createObjectNode(); - List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames(); - List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues(); + List<String> fieldNames = relRecord.getFieldNames(); + List<Object> values = relRecord.getFieldValues(); for (int index = 0; index < fieldNames.size(); index++) { Object value = values.get(index); @@ -77,6 +82,9 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory { node.put(fieldNames.get(index), (Double) value); } else if (String.class.isAssignableFrom(value.getClass())) { node.put(fieldNames.get(index), (String) value); + } else if (SamzaSqlRelRecord.class.isAssignableFrom(value.getClass())) { + // If the value is a SamzaSqlRelRecord, call convertToSamzaMessage to convert the record to json string. + node.put(fieldNames.get(index), convertToSamzaMessage((SamzaSqlRelRecord) value)); } else { node.put(fieldNames.get(index), value.toString()); } @@ -87,7 +95,7 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory { throw new SamzaException("Error json serializing object", e); } - return new KV<>(relMessage.getKey(), jsonValue.getBytes()); + return jsonValue; } } }