Repository: spark Updated Branches: refs/heads/master 434319e73 -> 08e315f63
[SPARK-24887][SQL] Avro: use SerializableConfiguration in Spark utils to deduplicate code ## What changes were proposed in this pull request? To implement the method `buildReader` in `FileFormat`, the Hadoop configuration must be serialized for executors. Previously, spark-avro used its own `SerializableConfiguration` class for this serialization. Now that spark-avro is part of Spark, it can use the `SerializableConfiguration` class in Spark's utils to deduplicate the code. ## How was this patch tested? Unit test Author: Gengliang Wang <gengliang.w...@databricks.com> Closes #21846 from gengliangwang/removeSerializableConfiguration. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e315f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e315f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e315f6 Branch: refs/heads/master Commit: 08e315f6330984b757f241079dfc9e1028e5cd0a Parents: 434319e Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Mon Jul 23 08:31:48 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Mon Jul 23 08:31:48 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/avro/AvroFileFormat.scala | 44 +---------------- .../avro/SerializableConfigurationSuite.scala | 50 -------------------- 2 files changed, 2 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/08e315f6/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 078efab..b043252 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -23,8 +23,6 @@ import java.util.zip.Deflater import scala.util.control.NonFatal -import com.esotericsoftware.kryo.{Kryo, KryoSerializable} -import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.avro.Schema import org.apache.avro.file.{DataFileConstants, DataFileReader} import org.apache.avro.generic.{GenericDatumReader, GenericRecord} @@ -41,6 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.sources.{DataSourceRegister, Filter} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister { private val log = LoggerFactory.getLogger(getClass) @@ -157,7 +156,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister { hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { val broadcastedConf = - spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf)) + spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) val parsedOptions = new AvroOptions(options, hadoopConf) (file: PartitionedFile) => { @@ -233,43 +232,4 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister { private[avro] object AvroFileFormat { val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension" - - class SerializableConfiguration(@transient var value: Configuration) - extends Serializable with KryoSerializable { - @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass) - - private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException { - out.defaultWriteObject() - value.write(out) - } - - private def readObject(in: ObjectInputStream): Unit = tryOrIOException { - value = new 
Configuration(false) - value.readFields(in) - } - - private def tryOrIOException[T](block: => T): T = { - try { - block - } catch { - case e: IOException => - log.error("Exception encountered", e) - throw e - case NonFatal(e) => - log.error("Exception encountered", e) - throw new IOException(e) - } - } - - def write(kryo: Kryo, out: Output): Unit = { - val dos = new DataOutputStream(out) - value.write(dos) - dos.flush() - } - - def read(kryo: Kryo, in: Input): Unit = { - value = new Configuration(false) - value.readFields(new DataInputStream(in)) - } - } } http://git-wip-us.apache.org/repos/asf/spark/blob/08e315f6/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala deleted file mode 100755 index a0f8851..0000000 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.avro - -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance} - -class SerializableConfigurationSuite extends SparkFunSuite { - - private def testSerialization(serializer: SerializerInstance): Unit = { - import AvroFileFormat.SerializableConfiguration - val conf = new SerializableConfiguration(new Configuration()) - - val serialized = serializer.serialize(conf) - - serializer.deserialize[Any](serialized) match { - case c: SerializableConfiguration => - assert(c.log != null, "log was null") - assert(c.value != null, "value was null") - case other => fail( - s"Expecting ${classOf[SerializableConfiguration]}, but got ${other.getClass}.") - } - } - - test("serialization with JavaSerializer") { - testSerialization(new JavaSerializer(new SparkConf()).newInstance()) - } - - test("serialization with KryoSerializer") { - testSerialization(new KryoSerializer(new SparkConf()).newInstance()) - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org