This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9a2f39318e3a [SPARK-46746][SQL][AVRO] Attach codec extension to avro datasource files 9a2f39318e3a is described below commit 9a2f39318e3af8b3817dc5e4baf52e548d82063c Author: Kent Yao <y...@apache.org> AuthorDate: Wed Jan 17 07:20:32 2024 -0800 [SPARK-46746][SQL][AVRO] Attach codec extension to avro datasource files ### What changes were proposed in this pull request? This PR attaches a codec extension to Avro datasource files. ``` part-00000-2d4a2c78-a62a-4f7d-a286-5572dcdefade-c000.zstandard.avro part-00000-74c04de5-c991-4a40-8740-8d472f4ce2ec-c000.avro part-00000-965d0e93-9f86-40f9-8544-d71d14cc9787-c000.xz.avro part-00002-965d0e93-9f86-40f9-8544-d71d14cc9787-c000.snappy.avro ``` ### Why are the changes needed? Feature parity with the Parquet and ORC file sources, which is useful for differentiating the compression codecs of Avro files. ### Does this PR introduce _any_ user-facing change? No, this more likely belongs to the underlying data storage layer. ### How was this patch tested? New unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44770 from yaooqinn/SPARK-46746. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../apache/spark/sql/avro/AvroOutputWriterFactory.scala | 10 +++++++++- .../scala/org/apache/spark/sql/avro/AvroCodecSuite.scala | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala index 3eba013c1435..15c76ec358ed 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.avro import org.apache.avro.Schema +import org.apache.avro.mapreduce.AvroJob import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} @@ -38,7 +39,14 @@ private[sql] class AvroOutputWriterFactory( private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString) - override def getFileExtension(context: TaskAttemptContext): String = ".avro" + override def getFileExtension(context: TaskAttemptContext): String = { + val codec = context.getConfiguration.get(AvroJob.CONF_OUTPUT_CODEC) + if (codec == null || codec.equalsIgnoreCase("null")) { + ".avro" + } else { + s".$codec.avro" + } + } override def newInstance( path: String, diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala index 4e4942e1b2e2..ec3753b84a55 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala @@ -26,4 +26,20 @@ class AvroCodecSuite extends FileSourceCodecSuite { override val codecConfigName: String = SQLConf.AVRO_COMPRESSION_CODEC.key 
override protected def availableCodecs = AvroCompressionCodec.values().map(_.lowerCaseName()).iterator.to(Seq) + + availableCodecs.foreach { codec => + test(s"SPARK-46746: attach codec name to avro files - codec $codec") { + withTable("avro_t") { + sql( + s"""CREATE TABLE avro_t + | USING $format OPTIONS('compression'='$codec') + | AS SELECT 1 as id + | """.stripMargin) + spark.table("avro_t") + .inputFiles.foreach { file => + assert(file.endsWith(s"$codec.avro".stripPrefix("uncompressed"))) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org