This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit ca01777b5bd593c7caa5a6be6136abe662b8a4e5 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Thu Sep 5 14:33:23 2019 +0200 Remove example code --- .../translation/helpers/EncoderHelpers.java | 69 ---------------------- 1 file changed, 69 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 3f7c102..83243b3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -102,22 +102,6 @@ public class EncoderHelpers { JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); - -/* - ExpressionEncoder[T]( - schema = new StructType().add("value", BinaryType), - flat = true, - serializer = Seq( - EncodeUsingSerializer( - BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), - deserializer = - DecodeUsingSerializer[T]( - Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), - classTag[T], - kryo = useKryo), - clsTag = classTag[T] - ) -*/ } public static class EncodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression { @@ -219,30 +203,6 @@ public class EncoderHelpers { } } - /*case class EncodeUsingSerializer(child: Expression, kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - - override def nullSafeEval(input: Any): Any = { - serializerInstance.serialize(input).array() - } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val serializer = addImmutableScodererializerIfNeeded(ctx) - // Code to serialize. - val input = child.genCode(ctx) - val javaType = CodeGenerator.javaType(dataType) - val serialize = s"$serializer.serialize(${input.value}, null).array()" - - val code = input.code + code""" - final $javaType ${ev.value} = - ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize; - """ - ev.copy(code = code, isNull = input.isNull) - } - - override def dataType: DataType = BinaryType - }*/ - public static class DecodeUsingBeamCoder<T> extends UnaryExpression implements NonSQLExpression{ private Expression child; @@ -353,33 +313,4 @@ public class EncoderHelpers { return Objects.hash(super.hashCode(), classTag, beamCoder); } } -/* -case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - - override def nullSafeEval(input: Any): Any = { - val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]]) - serializerInstance.deserialize(inputBytes) - } - - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val serializer = addImmutableSerializerIfNeeded(ctx) - // Code to deserialize. - val input = child.genCode(ctx) - val javaType = CodeGenerator.javaType(dataType) - val deserialize = - s"($javaType) $serializer.deserialize(java.nio.ByteBuffer.wrap(${input.value}), null)" - - val code = input.code + code""" - final $javaType ${ev.value} = - ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize; - """ - ev.copy(code = code, isNull = input.isNull) - } - - override def dataType: DataType = ObjectType(tag.runtimeClass) - } -*/ - - }