This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 668227b5240e08ff0de45ad7deeff1dad0a6a97d
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Wed Sep 4 15:38:41 2019 +0200

    Fix ExpressionEncoder generated code: typos, try catch, fqcn
---
 .../translation/helpers/EncoderHelpers.java        | 38 +++++++++++++---------
 1 file changed, 23 insertions(+), 15 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 1d89101..dff308a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -144,18 +144,22 @@ public class EncoderHelpers {
 
       /*
         CODE GENERATED
-       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-       final bytes[] output;
-       if ({input.isNull})
-          output = null;
-       else
-          output = $beamCoder.encode(${input.value}, baos); baos.toByteArray();
+       try {
+        java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
+        final byte[] output;
+        if (${input.isNull})
+            output = null;
+        else
+            output = $beamCoder.encode(${input.value}, baos); baos.toByteArray();
+        } catch (Exception e) {
+          throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+        }
       */
       List<String> parts = new ArrayList<>();
-      parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
final bytes[] output; if (");
+      parts.add("try {java.io.ByteArrayOutputStream baos = new 
java.io.ByteArrayOutputStream(); final byte[] output; if (");
       parts.add(") output = null; else output =");
       parts.add(".encode(");
-      parts.add(", baos); baos.toByteArray();");
+      parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new 
RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
 
       StringContext sc = new 
StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
 
@@ -258,21 +262,25 @@ public class EncoderHelpers {
 
 /*
      CODE GENERATED:
-     final $javaType output =
-     ${input.isNull} ?
-     ${CodeGenerator.defaultValue(dataType)} :
-     ($javaType) $beamCoder.decode(new ByteArrayInputStream(${input.value}));
+     try {
+      final $javaType output =
+      ${input.isNull} ?
+      ${CodeGenerator.defaultValue(dataType)} :
+      ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value}));
+     } catch (IOException e) {
+      throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
+     }
 */
 
       List<String> parts = new ArrayList<>();
-      parts.add("final ");
+      parts.add("try { final ");
       parts.add(" output =");
       parts.add("?");
       parts.add(":");
       parts.add("(");
       parts.add(") ");
-      parts.add(".decode(new ByteArrayInputStream(");
-      parts.add("));");
+      parts.add(".decode(new java.io.ByteArrayInputStream(");
+      parts.add("));  } catch (IOException e) {throw new 
RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}");
 
       StringContext sc = new 
StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
 

Reply via email to