Repository: spark Updated Branches: refs/heads/master e05a2feeb -> aec502d91
[SPARK-15654] [SQL] fix non-splitable files for text based file formats ## What changes were proposed in this pull request? Currently, we always split a file when it is bigger than maxSplitBytes, but Hadoop's LineRecordReader does not respect the splits for compressed files correctly. We should have an API on FileFormat to check whether a file can be split or not. This PR is based on #13442, closes #13442 ## How was this patch tested? Added regression tests. Author: Davies Liu <dav...@databricks.com> Closes #13531 from davies/fix_split. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aec502d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aec502d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aec502d9 Branch: refs/heads/master Commit: aec502d9114ad8e18bfbbd63f38780e076d326d1 Parents: e05a2fe Author: Davies Liu <dav...@databricks.com> Authored: Fri Jun 10 14:32:43 2016 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Fri Jun 10 14:32:43 2016 -0700 ---------------------------------------------------------------------- .../spark/ml/source/libsvm/LibSVMRelation.scala | 2 +- .../datasources/FileSourceStrategy.scala | 17 ++++++--- .../datasources/csv/CSVFileFormat.scala | 2 +- .../datasources/fileSourceInterfaces.scala | 33 +++++++++++++++-- .../datasources/json/JsonFileFormat.scala | 2 +- .../datasources/parquet/ParquetFileFormat.scala | 7 ++++ .../datasources/text/TextFileFormat.scala | 2 +- .../datasources/FileSourceStrategySuite.scala | 37 +++++++++++++++++++- .../execution/datasources/text/TextSuite.scala | 17 +++++++++ .../spark/sql/hive/orc/OrcFileFormat.scala | 7 ++++ .../spark/sql/sources/SimpleTextRelation.scala | 2 +- 11 files changed, 115 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 7629369..b5b2a68 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -112,7 +112,7 @@ private[libsvm] class LibSVMOutputWriter( */ // If this is moved or renamed, please update DataSource's backwardCompatibilityMap. @Since("1.6.0") -class LibSVMFileFormat extends FileFormat with DataSourceRegister { +class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister { @Since("1.6.0") override def shortName(): String = "libsvm" http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 7503285..13a86bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -151,11 +151,18 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => val blockLocations = getBlockLocations(file) - (0L until file.getLen by maxSplitBytes).map { offset => - val remaining = file.getLen - offset - val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining - val hosts 
= getBlockHosts(blockLocations, offset, size) - PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts) + if (files.fileFormat.isSplitable(files.sparkSession, files.options, file.getPath)) { + (0L until file.getLen by maxSplitBytes).map { offset => + val remaining = file.getLen - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + val hosts = getBlockHosts(blockLocations, offset, size) + PartitionedFile( + partition.values, file.getPath.toUri.toString, offset, size, hosts) + } + } else { + val hosts = getBlockHosts(blockLocations, 0, file.getLen) + Seq(PartitionedFile( + partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) } } }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 4d36b76..be52de8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration /** * Provides access to CSV data from pure SQL statements. 
*/ -class CSVFileFormat extends FileFormat with DataSourceRegister { +class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "csv" http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 7f3eed3..890e64d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -22,6 +22,7 @@ import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ +import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -29,12 +30,12 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter} -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration /** @@ -215,6 +216,16 @@ trait FileFormat { } /** + * Returns 
whether a file with `path` could be splitted or not. + */ + def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + false + } + + /** * Returns a function that can be used to read a single file in as an Iterator of InternalRow. * * @param dataSchema The global data schema. It can be either specified by the user, or @@ -298,6 +309,24 @@ trait FileFormat { } /** + * The base class file format that is based on text file. + */ +abstract class TextBasedFileFormat extends FileFormat { + private var codecFactory: CompressionCodecFactory = null + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + if (codecFactory == null) { + codecFactory = new CompressionCodecFactory( + sparkSession.sessionState.newHadoopConfWithOptions(options)) + } + val codec = codecFactory.getCodec(path) + codec == null || codec.isInstanceOf[SplittableCompressionCodec] + } +} + +/** * A collection of data files from a partitioned relation, along with the partition values in the * form of an [[InternalRow]]. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index c7c5281..86aef1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration -class JsonFileFormat extends FileFormat with DataSourceRegister { +class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "json" http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index ada9cd4..3735c94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -261,6 +261,13 @@ private[sql] class ParquetFileFormat schema.forall(_.dataType.isInstanceOf[AtomicType]) } + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + true + } + override private[sql] def buildReaderWithPartitionValues( sparkSession: 
SparkSession, dataSchema: StructType, http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 9c03ab2..abb6059 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration /** * A data source for reading text files. */ -class TextFileFormat extends FileFormat with DataSourceRegister { +class TextFileFormat extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "text" http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 25f1443..67ff257 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -340,6 +340,41 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("SPARK-15654 do not split non-splittable files") { + // Check if a non-splittable file is not assigned into partitions + Seq("gz", "snappy", "lz4").map { suffix => + val table = createTable( + files = 
Seq(s"file1.${suffix}" -> 3, s"file2.${suffix}" -> 1, s"file3.${suffix}" -> 1) + ) + withSQLConf( + SQLConf.FILES_MAX_PARTITION_BYTES.key -> "2", + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0") { + checkScan(table.select('c1)) { partitions => + assert(partitions.size == 2) + assert(partitions(0).files.size == 1) + assert(partitions(1).files.size == 2) + } + } + } + + // Check if a splittable compressed file is assigned into multiple partitions + Seq("bz2").map { suffix => + val table = createTable( + files = Seq(s"file1.${suffix}" -> 3, s"file2.${suffix}" -> 1, s"file3.${suffix}" -> 1) + ) + withSQLConf( + SQLConf.FILES_MAX_PARTITION_BYTES.key -> "2", + SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0") { + checkScan(table.select('c1)) { partitions => + assert(partitions.size == 3) + assert(partitions(0).files.size == 1) + assert(partitions(1).files.size == 2) + assert(partitions(2).files.size == 1) + } + } + } + } + // Helpers for checking the arguments passed to the FileFormat. protected val checkPartitionSchema = @@ -434,7 +469,7 @@ object LastArguments { } /** A test [[FileFormat]] that records the arguments passed to buildReader, and returns nothing. 
*/ -class TestFileFormat extends FileFormat { +class TestFileFormat extends TextBasedFileFormat { override def toString: String = "TestFileFormat" http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 7b6981f..5695f6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.Utils @@ -137,6 +138,22 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-15654: should not split gz files") { + withTempDir { dir => + val path = dir.getCanonicalPath + val df1 = spark.range(0, 1000).selectExpr("CAST(id AS STRING) AS s") + df1.write.option("compression", "gzip").mode("overwrite").text(path) + + val expected = df1.collect() + Seq(10, 100, 1000).foreach { bytes => + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> bytes.toString) { + val df2 = spark.read.format("text").load(path) + checkAnswer(df2, expected) + } + } + } + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString } http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala 
---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 0e8c37d..a2c8092 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -95,6 +95,13 @@ private[sql] class OrcFileFormat } } + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + true + } + override def buildReader( sparkSession: SparkSession, dataSchema: StructType, http://git-wip-us.apache.org/repos/asf/spark/blob/aec502d9/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 1fb777a..67a58a3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableConfiguration -class SimpleTextSource extends FileFormat with DataSourceRegister { +class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "test" override def inferSchema( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org