Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203796377 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream + +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.{BinaryEncoder, EncoderFactory} + +import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.types.{BinaryType, DataType} + +case class CatalystDataToAvro(child: Expression) extends UnaryExpression { + + override def dataType: DataType = BinaryType + + @transient private lazy val avroType = + SchemaConverters.toAvroType(child.dataType, child.nullable) + + @transient private lazy val serializer = + new AvroSerializer(child.dataType, avroType, child.nullable) + + @transient private lazy val writer = + new GenericDatumWriter[Any](avroType) + + @transient private var encoder: BinaryEncoder = _ + + @transient private lazy val out = new ByteArrayOutputStream + + override def nullSafeEval(input: Any): Any = { + out.reset() + encoder = EncoderFactory.get().directBinaryEncoder(out, encoder) + val avroData = serializer.serialize(input) + writer.write(avroData, encoder) + encoder.flush() + out.toByteArray + } + + override def simpleString: String = { + s"to_avro(${child.sql}, ${child.dataType.simpleString})" + } + + override def sql: String = simpleString + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { --- End diff -- I tried complicated code like ``` nullSafeCodeGen(ctx, ev, binary => { val avroReader = ctx.addReferenceObj("reader", reader) val avroDeserializer = ctx.addReferenceObj("deserializer", deserializer) val avroDecoder = ctx.addMutableState(classOf[BinaryDecoder].getName, "avroDecoder") val catalystResult = ctx.addMutableState(classOf[Any].getName, "catalystResult") val decoderFactory = classOf[DecoderFactory].getName s""" $avroDecoder = 
$decoderFactory.get().binaryDecoder($binary, $avroDecoder); try { $catalystResult = $avroReader.read($catalystResult, $avroDecoder); } catch (java.io.IOException e) { org.apache.spark.unsafe.Platform.throwException(e); } ${ev.value} = (${CodeGenerator.boxedType(dataType)}) $avroDeserializer.deserialize($catalystResult); """ }) ``` But I eventually found it hard to serialize some of the objects.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org