Repository: spark Updated Branches: refs/heads/master 2333a34d3 -> 00b864aa7
[SPARK-24876][SQL] Avro: simplify schema serialization ## What changes were proposed in this pull request? Previously, in the refactoring of the Avro Serializer and Deserializer, a new class SerializableSchema was created for serializing the Avro schema: https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37 On second thought, we can use the `toString` method for serialization and then parse the JSON-format schema on the executor. This makes the code much simpler. ## How was this patch tested? Unit test Author: Gengliang Wang <gengliang.w...@databricks.com> Closes #21829 from gengliangwang/removeSerializableSchema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00b864aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00b864aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00b864aa Branch: refs/heads/master Commit: 00b864aa7054a34f3d7a118d92eae0b3c28b86e5 Parents: 2333a34 Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Fri Jul 20 14:57:59 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Fri Jul 20 14:57:59 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/avro/AvroFileFormat.scala | 2 +- .../sql/avro/AvroOutputWriterFactory.scala | 14 +++- .../spark/sql/avro/SerializableSchema.scala | 69 -------------------- .../sql/avro/SerializableSchemaSuite.scala | 56 ---------------- 4 files changed, 12 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 
1d0f40e..780e457 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -146,7 +146,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister { log.error(s"unsupported compression codec $unknown") } - new AvroOutputWriterFactory(dataSchema, new SerializableSchema(outputAvroSchema)) + new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString) } override def buildReader( http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala index 18a6d93..116020e 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala @@ -17,14 +17,22 @@ package org.apache.spark.sql.avro +import org.apache.avro.Schema import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.types.StructType +/** + * A factory that produces [[AvroOutputWriter]]. + * @param catalystSchema Catalyst schema of input data. + * @param avroSchemaAsJsonString Avro schema of output result, in JSON string format. 
+ */ private[avro] class AvroOutputWriterFactory( - schema: StructType, - avroSchema: SerializableSchema) extends OutputWriterFactory { + catalystSchema: StructType, + avroSchemaAsJsonString: String) extends OutputWriterFactory { + + private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString) override def getFileExtension(context: TaskAttemptContext): String = ".avro" @@ -32,6 +40,6 @@ private[avro] class AvroOutputWriterFactory( path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new AvroOutputWriter(path, context, schema, avroSchema.value) + new AvroOutputWriter(path, context, catalystSchema, avroSchema) } } http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala deleted file mode 100644 index ec0ddc7..0000000 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.avro - -import java.io._ - -import scala.util.control.NonFatal - -import com.esotericsoftware.kryo.{Kryo, KryoSerializable} -import com.esotericsoftware.kryo.io.{Input, Output} -import org.apache.avro.Schema -import org.slf4j.LoggerFactory - -class SerializableSchema(@transient var value: Schema) - extends Serializable with KryoSerializable { - - @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass) - - private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException { - out.defaultWriteObject() - out.writeUTF(value.toString()) - out.flush() - } - - private def readObject(in: ObjectInputStream): Unit = tryOrIOException { - val json = in.readUTF() - value = new Schema.Parser().parse(json) - } - - private def tryOrIOException[T](block: => T): T = { - try { - block - } catch { - case e: IOException => - log.error("Exception encountered", e) - throw e - case NonFatal(e) => - log.error("Exception encountered", e) - throw new IOException(e) - } - } - - def write(kryo: Kryo, out: Output): Unit = { - val dos = new DataOutputStream(out) - dos.writeUTF(value.toString()) - dos.flush() - } - - def read(kryo: Kryo, in: Input): Unit = { - val dis = new DataInputStream(in) - val json = dis.readUTF() - value = new Schema.Parser().parse(json) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/00b864aa/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala deleted file mode 100644 index 510bcbd..0000000 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.avro - -import org.apache.avro.Schema - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance} - -class SerializableSchemaSuite extends SparkFunSuite { - - private def testSerialization(serializer: SerializerInstance): Unit = { - val avroTypeJson = - s""" - |{ - | "type": "string", - | "name": "my_string" - |} - """.stripMargin - val avroSchema = new Schema.Parser().parse(avroTypeJson) - val serializableSchema = new SerializableSchema(avroSchema) - val serialized = serializer.serialize(serializableSchema) - - serializer.deserialize[Any](serialized) match { - case c: SerializableSchema => - assert(c.log != null, "log was null") - assert(c.value != null, "value was null") - assert(c.value == avroSchema) - case other => fail( - s"Expecting ${classOf[SerializableSchema]}, but got ${other.getClass}.") - } - } - - test("serialization with JavaSerializer") { - testSerialization(new JavaSerializer(new SparkConf()).newInstance()) - } - - test("serialization with KryoSerializer") { - testSerialization(new KryoSerializer(new SparkConf()).newInstance()) - } -} 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org