[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21601


---




[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-07-09 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/21601#discussion_r201055350
  
--- Diff: core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.{DataOutputStream, File, FileOutputStream}
+
+import scala.collection.immutable.IndexedSeq
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
+ * directory containing files is created as fake input which is deleted in the end.
+ */
+class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+  private var sc: SparkContext = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+
+    sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node",
+      123456)
--- End diff --

It would be nice to add a comment here about the 123456 value, i.e. that it is deliberately larger 
than maxSplitSize.
Also, can we move this down into the test itself?
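
For illustration only, a rough sketch of what that might look like once moved into the test body (the test name, the per-rack setting, and the body are placeholders, not the actual patch):

```scala
test("minimum split sizes per node and per rack are clamped for small files") {
  // 123456 is deliberately larger than the maxSplitSize computed from the tiny test
  // files, which is exactly what used to trigger
  // "Minimum split size pernode ... cannot be larger than maximum split size".
  sc.hadoopConfiguration.setLong(
    "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
  sc.hadoopConfiguration.setLong(
    "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

  // ... write a handful of small files into a temp dir, read them back with
  //     sc.wholeTextFiles, and assert the contents round-trip ...
}
```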


---




[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-07-02 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/21601#discussion_r199597945
  
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

AFAIU, if we set these to `0L` unconditionally, then any leftover data that wasn't combined into a 
split would end up in its own split, because minSplitSizePerNode is `0L`.
This shouldn't be an issue for a small number of files. But if we have a large number of small 
files that hit a similar situation, we will end up with more splits rather than combining them 
into fewer splits.


---




[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-07-02 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/21601#discussion_r199602993
  
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

Also, if a user specifies them via configs, we are ensuring that these don't break the code. If we 
set them to `0L` where a user has specified them, we would end up breaking the code anyway, because 
of the way `CombineFileInputFormat` works: it checks whether the programmatic setting is `0L`, and 
if it is, it falls back to the value from the config.
https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182
So we would have to at least set the config to avoid hitting the error.
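
For reference, a small self-contained sketch of the precedence rule being described (a paraphrase of the linked Hadoop logic, not the actual source; the object and method names here are made up for illustration):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat

object MinSplitSizePrecedence {
  // Paraphrase of CombineFileInputFormat.getSplits (Hadoop 2.8.x): a non-zero value set
  // programmatically via setMinSplitSizeNode/Rack wins; 0 means "fall back to the config".
  def effectiveMinSize(programmatic: Long, conf: Configuration, key: String): Long =
    if (programmatic != 0L) programmatic else conf.getLong(key, 0L)

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 123456L)

    // Leaving the programmatic value at 0L does not help: the 123456 from the config
    // still applies, so getSplits would still fail for small inputs where
    // maxSplitSize < 123456. Hence the need to explicitly clamp the value.
    println(effectiveMinSize(0L, conf, CombineFileInputFormat.SPLIT_MINSIZE_PERNODE)) // 123456
  }
}
```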


---




[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-06-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21601#discussion_r198667795
  
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
--- End diff --

Is there a point in even checking the configuration? Why not just set these 
to `0L` unconditionally?


---




[GitHub] spark pull request #21601: [SPARK-24610] fix reading small files via wholeTe...

2018-06-20 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/21601

[SPARK-24610] fix reading small files via wholeTextFiles

## What changes were proposed in this pull request?
The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s) being read using the 
`wholeTextFiles` method. While this works well for large files, for smaller files, where the 
computed `maxSplitSize` is smaller than the minimum split sizes picked up from configs like 
hive-site.xml or passed explicitly via `mapreduce.input.fileinputformat.split.minsize.per.node` or 
`mapreduce.input.fileinputformat.split.minsize.per.rack`, it simply throws an exception:


```
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
  at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
  at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided
```

This change checks `maxSplitSize` against `minSplitSizePerNode` and `minSplitSizePerRack`, and sets 
them to `maxSplitSize` if `maxSplitSize < minSplitSizePerNode/Rack`.
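
In other words (a sketch mirroring the diff quoted in the review comments; the per-rack branch is inferred from this description since the quoted hunk is truncated):

```scala
// Inside WholeTextFileInputFormat.setMinPartitions, after maxSplitSize is computed:
val config = context.getConfiguration
val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)

// Clamp the per-node / per-rack minimums so they can never exceed maxSplitSize.
if (maxSplitSize < minSplitSizePerNode) {
  super.setMinSplitSizeNode(maxSplitSize)
}
if (maxSplitSize < minSplitSizePerRack) {
  super.setMinSplitSizeRack(maxSplitSize)
}
```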

## How was this patch tested?
Tested manually by setting the conf while launching the job, and added a unit test.
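
For reference, a minimal manual check might look like the following from `spark-shell` (the input directory is a placeholder, assumed to contain a few tiny text files):

```scala
// `sc` is the SparkContext provided by spark-shell.
// /tmp/small-files is a placeholder directory with a handful of KB-sized text files.
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

// Without the fix this threw:
//   java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size ...
// With the fix, the per-node/per-rack minimums are clamped to maxSplitSize and the read succeeds.
val numFiles = sc.wholeTextFiles("/tmp/small-files").count()
println(s"Read $numFiles files")
```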


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark bug/SPARK-24610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21601.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21601


commit 2369e3acee730b7d4e45175870de0ecac601069b
Author: Dhruve Ashar 
Date:   2018-06-20T16:34:36Z

[SPARK-24610] fix reading small files via wholeTextFiles




---
