[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21601
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r201055350

--- Diff: core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.{DataOutputStream, File, FileOutputStream}
+
+import scala.collection.immutable.IndexedSeq
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
+ * directory containing files is created as fake input which is deleted in the end.
+ */
+class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+  private var sc: SparkContext = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+
+    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
--- End diff --

Would be nice to add a comment here about the 123456 value, i.e. it being larger than maxSplitSize. Also, can we move this down into the test itself?
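For reference, a minimal sketch of what the suggested change could look like, with the config moved into the test body and a comment explaining the value. The test name, file contents, and file names are illustrative, not taken from the PR; it assumes the suite's `sc` field and the imports shown in the diff above.

```scala
// Hypothetical test body inside WholeTextFileInputFormatSuite (sketch, not the PR's actual test).
test("min split size per node/rack larger than maxSplitSize should not fail small reads") {
  // 123456 is intentionally larger than the maxSplitSize computed for these tiny files;
  // before the fix this made CombineFileInputFormat.getSplits throw an IOException.
  sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
  sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

  val dir = Utils.createTempDir()
  try {
    // write a few files whose total size stays far below 123456 bytes
    (1 to 3).foreach { i =>
      val out = new DataOutputStream(new FileOutputStream(new File(dir, s"part-0000$i")))
      out.writeBytes(s"tiny file $i\n")
      out.close()
    }
    // Should no longer throw "Minimum split size pernode ... cannot be larger than maximum split size".
    assert(sc.wholeTextFiles(dir.toString).count() === 3L)
  } finally {
    Utils.deleteRecursively(dir)
  }
}
```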
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r199597945

--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

AFAIU, if we set these to `0L` unconditionally, any leftover data that wasn't combined into a split would end up as its own split, because minSplitSizePerNode is `0L`. That shouldn't be an issue for a small number of files, but with a large number of small files in the same situation we would end up with more splits rather than combining them into fewer, larger splits.
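To make that concern concrete, here is a hedged illustration of a job that relies on the per-node/per-rack minimums to keep leftover small files combined, as described above. The path, sizes, and master setting are hypothetical; the point is that the PR lowers these values only when they exceed maxSplitSize instead of discarding them.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("combine-small-files"))

// Hypothetical tuning: ask CombineFileInputFormat to keep combining per-node/per-rack
// leftovers until they reach at least 64 KB / 128 KB, instead of letting each leftover
// become its own split (which is what a hard-coded 0L would effectively allow).
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64L * 1024)
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 128L * 1024)

// With many small files, honoring these minimums (when they fit under maxSplitSize)
// should yield fewer, larger partitions than zeroing them unconditionally would.
val files = sc.wholeTextFiles("/path/to/many/small/files")
println(s"partitions: ${files.getNumPartitions}")
```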
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user dhruve commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r199602993

--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

Also, when a user specifies them via configs, we are making sure those values don't break the code. If we set them to `0L` even though the user has specified them, we would still break, because of the way `CombineFileInputFormat` works: it checks whether the in-memory setting is `0L`, and if it is 0 it picks up the value from the config instead. https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182 So we would at least have to set the config to avoid hitting the error.
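In other words, the relevant resolution inside Hadoop's `CombineFileInputFormat.getSplits` behaves roughly like the sketch below, written in Scala for illustration only; it is a paraphrase of the linked Java code, not the actual Hadoop source, and the method name `resolveMinSizeNode` is made up here.

```scala
import java.io.IOException
import org.apache.hadoop.conf.Configuration

// Rough paraphrase: the in-memory value set via setMinSplitSizeNode wins only when it is
// non-zero; otherwise the job configuration is consulted, and the resulting value is then
// validated against the maximum split size.
def resolveMinSizeNode(minSplitSizeNode: Long, conf: Configuration, maxSize: Long): Long = {
  val minSizeNode =
    if (minSplitSizeNode != 0) minSplitSizeNode
    else conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L)
  if (maxSize != 0 && minSizeNode > maxSize) {
    throw new IOException(s"Minimum split size pernode $minSizeNode cannot be larger than " +
      s"maximum split size $maxSize")
  }
  minSizeNode
}
```

So leaving the in-memory fields at `0L` does not neutralize a user-supplied config value: the fallback re-reads the config and the same IOException is hit, which is why the patch clamps the values instead.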
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21601#discussion_r198667795

--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

Is there a point in even checking the configuration? Why not just set these to `0L` unconditionally?
[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...
GitHub user dhruve opened a pull request: https://github.com/apache/spark/pull/21601

[SPARK-24610] fix reading small files via wholeTextFiles

## What changes were proposed in this pull request?

`WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s) being read with the `wholeTextFiles` method. While this works well for large files, for smaller files, where the computed maxSplitSize is smaller than the minimum split sizes coming from configs like hive-site.xml or passed explicitly as `mapreduce.input.fileinputformat.split.minsize.per.node` or `mapreduce.input.fileinputformat.split.minsize.per.rack`, it throws an exception:

```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
  at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
  at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided
```

This change checks maxSplitSize against minSplitSizePerNode and minSplitSizePerRack and sets them if `maxSplitSize < minSplitSizePerNode/Rack`.

## How was this patch tested?

Tested manually by setting the conf while launching the job, and added a unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark bug/SPARK-24610

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21601.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21601

commit 2369e3acee730b7d4e45175870de0ecac601069b
Author: Dhruve Ashar
Date: 2018-06-20T16:34:36Z

    [SPARK-24610] fix reading small files via wholeTextFiles
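For context, a minimal reproduction sketch under assumed paths and local mode; the directory, app name, and file sizes are illustrative. It shows the configuration that used to trigger the IOException above and that, per the description, this change now tolerates by clamping the minimums to maxSplitSize.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Spark24610Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SPARK-24610"))

    // These minimums may also arrive implicitly, e.g. from a hive-site.xml on the classpath.
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

    // Before this change, reading a directory whose total size is far below 123456 bytes failed with
    // "Minimum split size pernode 123456 cannot be larger than maximum split size ...".
    // With the change, the per-node/per-rack minimums are lowered to maxSplitSize and the read works.
    val files = sc.wholeTextFiles("/path/to/small/files")
    println(s"read ${files.count()} files")

    sc.stop()
  }
}
```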