This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 25d0401d69029f76871b26f9fbd1f5c79de39bb7
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed Oct 23 11:52:14 2019 +0200

    Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders
---
 .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
index 2f3bced..c07c9dd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java
@@ -101,7 +101,8 @@ public class EncoderHelpers {
   public static <T> Encoder<T> fromBeamCoder(Coder<T> beamCoder) {
 
     List<Expression> serialiserList = new ArrayList<>();
-    Class<T> claz = (Class<T>) Object.class;
+    Class<? super T> claz = beamCoder.getEncodedTypeDescriptor().getRawType();
+
     serialiserList.add(
         new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder));
     ClassTag<T> classTag = ClassTag$.MODULE$.apply(claz);
