Repository: samza
Updated Branches:
  refs/heads/master 93b397e84 -> 7055ce670


SAMZA-1755: Fix JsonRelConverter to recursively convert relRecords.

Author: Aditya Toomula <atoom...@linkedin.com>

Reviewers: Srini P<spun...@linkedin.com>

Closes #558 from atoomula/sql1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7055ce67
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7055ce67
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7055ce67

Branch: refs/heads/master
Commit: 7055ce670a270a14c1833ee0bd05fbd74cb93911
Parents: 93b397e
Author: Aditya Toomula <atoom...@linkedin.com>
Authored: Fri Jul 13 15:26:34 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Jul 13 15:26:34 2018 -0700

----------------------------------------------------------------------
 .../samza/tools/json/JsonRelConverterFactory.java     | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7055ce67/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
----------------------------------------------------------------------
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
index ce257f1..4db066a 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -55,12 +56,16 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
 
     @Override
     public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+      String jsonValue = convertToSamzaMessage(relMessage.getSamzaSqlRelRecord());
+      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+    }
 
+    private String convertToSamzaMessage(SamzaSqlRelRecord relRecord) {
       String jsonValue;
       ObjectNode node = mapper.createObjectNode();
 
-      List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames();
-      List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
+      List<String> fieldNames = relRecord.getFieldNames();
+      List<Object> values = relRecord.getFieldValues();
 
       for (int index = 0; index < fieldNames.size(); index++) {
         Object value = values.get(index);
@@ -77,6 +82,9 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
           node.put(fieldNames.get(index), (Double) value);
         } else if (String.class.isAssignableFrom(value.getClass())) {
           node.put(fieldNames.get(index), (String) value);
+        } else if (SamzaSqlRelRecord.class.isAssignableFrom(value.getClass())) {
+          // If the value is a SamzaSqlRelRecord, call convertToSamzaMessage to convert the record to json string.
+          node.put(fieldNames.get(index), convertToSamzaMessage((SamzaSqlRelRecord) value));
         } else {
           node.put(fieldNames.get(index), value.toString());
         }
@@ -87,7 +95,7 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
         throw new SamzaException("Error json serializing object", e);
       }
 
-      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+      return jsonValue;
     }
   }
 }

Reply via email to