This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 1a2c233  [SPARK-31327][SQL][2.4] Write Spark version into Avro file 
metadata
1a2c233 is described below

commit 1a2c23306335321b23b6b4bc07f5fda105fdcdf6
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Apr 8 08:27:06 2020 -0700

    [SPARK-31327][SQL][2.4] Write Spark version into Avro file metadata
    
    Backport 
https://github.com/apache/spark/commit/6b1ca886c0066f4e10534336f3fce64cdebc79a5,
 similar to https://github.com/apache/spark/pull/28142
    
    ### What changes were proposed in this pull request?
    
    Write Spark version into Avro file metadata
    
    ### Why are the changes needed?
    
    The version info is very useful for backward compatibility. This is also 
done in Parquet/ORC.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new test
    
    Closes #28150 from cloud-fan/pick.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/avro/SparkAvroKeyOutputFormat.java   | 94 ++++++++++++++++++++++
 .../apache/spark/sql/avro/AvroOutputWriter.scala   | 12 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 14 +++-
 .../main/scala/org/apache/spark/sql/package.scala  |  1 +
 4 files changed, 116 insertions(+), 5 deletions(-)

diff --git 
a/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
 
b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
new file mode 100644
index 0000000..55696a6
--- /dev/null
+++ 
b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.Syncable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+// A variant of `AvroKeyOutputFormat`, which is used to inject the custom 
`RecordWriterFactory` so
+// that we can set avro file metadata.
+public class SparkAvroKeyOutputFormat extends 
AvroKeyOutputFormat<GenericRecord> {
+  public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
+    super(new SparkRecordWriterFactory(metadata));
+  }
+
+  static class SparkRecordWriterFactory extends 
RecordWriterFactory<GenericRecord> {
+    private final Map<String, String> metadata;
+    SparkRecordWriterFactory(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
+
+    protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
+        Schema writerSchema,
+        GenericData dataModel,
+        CodecFactory compressionCodec,
+        OutputStream outputStream,
+        int syncInterval) throws IOException {
+      return new SparkAvroKeyRecordWriter(
+        writerSchema, dataModel, compressionCodec, outputStream, syncInterval, 
metadata);
+    }
+  }
+}
+
+// This is a fork of org.apache.avro.mapreduce.AvroKeyRecordWriter, in order to 
set file metadata.
+class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, 
NullWritable>
+    implements Syncable {
+
+  private final DataFileWriter<T> mAvroFileWriter;
+
+  SparkAvroKeyRecordWriter(
+      Schema writerSchema,
+      GenericData dataModel,
+      CodecFactory compressionCodec,
+      OutputStream outputStream,
+      int syncInterval,
+      Map<String, String> metadata) throws IOException {
+    this.mAvroFileWriter = new 
DataFileWriter(dataModel.createDatumWriter(writerSchema));
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
+    }
+    this.mAvroFileWriter.setCodec(compressionCodec);
+    this.mAvroFileWriter.setSyncInterval(syncInterval);
+    this.mAvroFileWriter.create(writerSchema, outputStream);
+  }
+
+  public void write(AvroKey<T> record, NullWritable ignore) throws IOException 
{
+    this.mAvroFileWriter.append(record.datum());
+  }
+
+  public void close(TaskAttemptContext context) throws IOException {
+    this.mAvroFileWriter.close();
+  }
+
+  public long sync() throws IOException {
+    return this.mAvroFileWriter.sync();
+  }
+}
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 0650711..2cfa3a4 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -19,14 +19,17 @@ package org.apache.spark.sql.avro
 
 import java.io.{IOException, OutputStream}
 
+import scala.collection.JavaConverters._
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.mapred.AvroKey
-import org.apache.avro.mapreduce.AvroKeyOutputFormat
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
@@ -45,8 +48,9 @@ private[avro] class AvroOutputWriter(
    * Overrides the couple of methods responsible for generating the output 
streams / files so
    * that the data can be correctly partitioned
    */
-  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] 
=
-    new AvroKeyOutputFormat[GenericRecord]() {
+  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] 
= {
+    val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> 
SPARK_VERSION_SHORT).asJava
+    new SparkAvroKeyOutputFormat(sparkVersion) {
 
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         new Path(path)
@@ -57,8 +61,8 @@ private[avro] class AvroOutputWriter(
         val path = getDefaultWorkFile(context, ".avro")
         path.getFileSystem(context.getConfiguration).create(path)
       }
-
     }.getRecordWriter(context)
+  }
 
   override def write(row: InternalRow): Unit = {
     val key = new 
AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord])
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d8e5297..d7317fc 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, 
GenericDatumReader, GenericDatumWri
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
@@ -1357,4 +1357,16 @@ class AvroSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
       |}
     """.stripMargin)
   }
+
+  test("SPARK-31327: Write Spark version into Avro file metadata") {
+    withTempPath { path =>
+      
spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
+      val avroFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && 
!f.getName.startsWith("_"))
+      assert(avroFiles.length === 1)
+      val reader = DataFileReader.openReader(avroFiles(0), new 
GenericDatumReader[GenericRecord]())
+      val version = 
reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
+      assert(version === SPARK_VERSION_SHORT)
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 354660e9..23af990 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -49,6 +49,7 @@ package object sql {
    * Metadata key which is used to write Spark version in the followings:
    * - Parquet file metadata
    * - ORC file metadata
+   * - Avro file metadata
    *
    * Note that Hive table property `spark.sql.create.version` also has Spark 
version.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to