This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 668227b5240e08ff0de45ad7deeff1dad0a6a97d Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Wed Sep 4 15:38:41 2019 +0200 Fix ExpressionEncoder generated code: typos, try catch, fqcn --- .../translation/helpers/EncoderHelpers.java | 38 +++++++++++++--------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 1d89101..dff308a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -144,18 +144,22 @@ public class EncoderHelpers { /* CODE GENERATED - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final bytes[] output; - if ({input.isNull}) - output = null; - else - output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + try { + java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); + final byte[] output; + if ({input.isNull}) + output = null; + else + output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } */ List<String> parts = new ArrayList<>(); - parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); parts.add(") output = null; else output ="); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();"); + parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -258,21 +262,25 @@ public class EncoderHelpers { /* CODE GENERATED: - final $javaType output = - ${input.isNull} ? - ${CodeGenerator.defaultValue(dataType)} : - ($javaType) $beamCoder.decode(new ByteArrayInputStream(${input.value})); + try { + final $javaType output = + ${input.isNull} ? + ${CodeGenerator.defaultValue(dataType)} : + ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); + } catch (IOException e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } */ List<String> parts = new ArrayList<>(); - parts.add("final "); + parts.add("try { final "); parts.add(" output ="); parts.add("?"); parts.add(":"); parts.add("("); parts.add(") "); - parts.add(".decode(new ByteArrayInputStream("); - parts.add("));"); + parts.add(".decode(new java.io.ByteArrayInputStream("); + parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());