[GitHub] spark pull request #19464: Spark 22233

2017-10-10 Thread liutang123
GitHub user liutang123 opened a pull request:

https://github.com/apache/spark/pull/19464

Spark 22233

## What changes were proposed in this pull request?
Add a spark.hadoop.filterOutEmptySplit configuration to allow users to filter out empty splits in HadoopRDD.
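
For context, a minimal usage sketch of the configuration as proposed at this point in the thread (the property is renamed later in the review; assumes a Spark build with this patch applied):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ignore-empty-splits-example")
  .setMaster("local")
  // Name as proposed in this PR; later review comments rename it.
  .set("spark.hadoop.filterOutEmptySplit", "true")
val sc = new SparkContext(conf)

// With the flag enabled, empty files matched by the glob no longer
// contribute empty partitions to the resulting HadoopRDD.
val rdd = sc.textFile("/somepath/*")
println(rdd.partitions.length)
```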

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liutang123/spark SPARK-22233

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19464


commit 2317bfdf18fc1a7b21cd43e0ec12f5e957fb1895
Author: liutang123 
Date:   2017-06-21T04:27:42Z

Merge pull request #1 from apache/master

20170521 pull request

commit e3f993959fabdb80b966a42bf40d1cb5c6b44d95
Author: liulijia 
Date:   2017-09-28T06:12:04Z

Merge branch 'master' of https://github.com/apache/spark

commit 8f57d43b6bf127fc67e3e391d851efae3a859206
Author: liulijia 
Date:   2017-10-10T02:16:18Z

Merge branch 'master' of https://github.com/apache/spark

commit 3610f78837f4a5623f6d47b9feab1e565ed6
Author: liulijia 
Date:   2017-10-10T10:19:29Z

allow user to filter empty split in HadoopRDD




---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r143984294
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---
@@ -122,7 +122,10 @@ class NewHadoopRDD[K, V](
   case _ =>
 }
 val jobContext = new JobContextImpl(_conf, jobId)
-val rawSplits = inputFormat.getSplits(jobContext).toArray
+var rawSplits = inputFormat.getSplits(jobContext).toArray(Array.empty[InputSplit])
+if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
+  rawSplits = rawSplits.filter(_.getLength>0)
--- End diff --

Space around operator.
You should filter before making an array.
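
A self-contained sketch of that suggestion, filtering the split list before it is converted to an array (the helper name and parameters are hypothetical, not from the PR):

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext}

// Hypothetical helper: filter the java.util.List of splits first, then
// build the array, instead of building the array and filtering it afterwards.
def splitsFor(
    inputFormat: InputFormat[_, _],
    jobContext: JobContext,
    ignoreEmptySplits: Boolean): Array[InputSplit] = {
  val allSplits = inputFormat.getSplits(jobContext).asScala
  val kept = if (ignoreEmptySplits) allSplits.filter(_.getLength > 0) else allSplits
  kept.toArray
}
```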


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144031167
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,16 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.hadoop.filterOutEmptySplit") {
+val sf = new SparkConf()
+sf.setAppName("test").setMaster("local").set("spark.hadoop.filterOutEmptySplit", "true")
+sc = new SparkContext(sf)
+val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
+emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
+assert(new File(tempDir.getPath + "/output/part-0").exists() === true)
+
+val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-0")
+assert(hadoopRDD.partitions.length === 0)
--- End diff --

You should release the resources you acquired in the test case.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144030646
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
+  inputSplits = inputSplits.filter(_.getLength>0)
--- End diff --

nit: add spaces around the operator.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144031728
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,16 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.hadoop.filterOutEmptySplit") {
+val sf = new SparkConf()
+sf.setAppName("test").setMaster("local").set("spark.hadoop.filterOutEmptySplit", "true")
+sc = new SparkContext(sf)
+val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
+emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
+assert(new File(tempDir.getPath + "/output/part-0").exists() === true)
+
+val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-0")
--- End diff --

We should also add the following test cases:
1. Ensure that if no split is empty, we don't lose any splits;
2. Ensure that if part of the splits are empty, we remove the splits correctly.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144029852
  
--- Diff: docs/configuration.md ---
@@ -1211,6 +1211,14 @@ Apart from these, the following properties are also available, and may be useful
 data may need to be rewritten to pre-existing output directories during checkpoint recovery.
 
 
+spark.hadoop.filterOutEmptySplit
--- End diff --

We should add the config to `internal/config`.
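
For reference, an entry in `internal/config` would look roughly like the following (this mirrors the `ConfigBuilder` definition that appears later in this thread; the final name and doc wording differ slightly):

```scala
// In core/src/main/scala/org/apache/spark/internal/config/package.scala
private[spark] val IGNORE_EMPTY_SPLITS =
  ConfigBuilder("spark.files.ignoreEmptySplits")
    .doc("If true, methods that use HadoopRDD and NewHadoopRDD, such as " +
      "SparkContext.textFile, will not create a partition for input splits that are empty.")
    .booleanConf
    .createWithDefault(false)
```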


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144030461
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
--- End diff --

How about:
```
val inputSplits = if (..) {
  inputFormat.getSplits(jobConf, minPartitions).filter(_.getLength > 0)
} else {
  inputFormat.getSplits(jobConf, minPartitions)
}
```
We should always try to avoid using `var`.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144181321
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
--- End diff --

I would suggest not using a name that starts with "spark.hadoop"; configurations with this prefix are treated as Hadoop configurations and copied into the Hadoop `Configuration`. It might be better to choose another name.
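
A minimal illustration of the behavior being described, assuming the usual `spark.hadoop.*` propagation; the copying loop below is a simplified stand-in for what `SparkHadoopUtil` does when it builds a Hadoop `Configuration`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

val sparkConf = new SparkConf().set("spark.hadoop.filterOutEmptySplit", "true")
val hadoopConf = new Configuration()

// Every "spark.hadoop.*" entry is copied into the Hadoop Configuration with
// the prefix stripped, so the flag would leak in as "filterOutEmptySplit".
sparkConf.getAll.foreach { case (key, value) =>
  if (key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
  }
}
assert(hadoopConf.get("filterOutEmptySplit") == "true")
```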


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144197159
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,10 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+var inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
--- End diff --

I'd use `spark.files` prefix, taken after `spark.files.ignoreCorruptFiles`, 
`spark.files.maxPartitionBytes` and `spark.files.openCostInBytes`.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread liutang123
Github user liutang123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144235417
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---
@@ -122,7 +122,10 @@ class NewHadoopRDD[K, V](
   case _ =>
 }
 val jobContext = new JobContextImpl(_conf, jobId)
-val rawSplits = inputFormat.getSplits(jobContext).toArray
+var rawSplits = inputFormat.getSplits(jobContext).toArray(Array.empty[InputSplit])
+if (sparkContext.getConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)) {
+  rawSplits = rawSplits.filter(_.getLength>0)
--- End diff --

Is there anyone who uses empty files to do something?
For example:
sc.textFile("/somepath/*").mapPartitions()
Setting this flag to true by default may change the behavior of users' applications.
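
A hypothetical example of the kind of job this concern is about, assuming an existing `SparkContext` named `sc`: a per-partition marker record would silently disappear for empty files if the flag defaulted to true.

```scala
// Emits one marker record per partition, including partitions backed by
// empty files. Filtering empty splits by default would change this output.
val markers = sc.textFile("/somepath/*")
  .mapPartitions(iter => Iterator(s"partition with ${iter.size} lines"))
println(markers.count())
```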


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread liutang123
Github user liutang123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144235787
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,16 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.hadoop.filterOutEmptySplit") {
+val sf = new SparkConf()
+sf.setAppName("test").setMaster("local").set("spark.hadoop.filterOutEmptySplit", "true")
+sc = new SparkContext(sf)
+val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
+emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
+assert(new File(tempDir.getPath + "/output/part-0").exists() === true)
+
+val hadoopRDD = sc.textFile(tempDir.getPath + "/output/part-0")
+assert(hadoopRDD.partitions.length === 0)
--- End diff --

The resources are released by default in the `afterEach` function.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144311831
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,65 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("allow user to filter out empty split (old Hadoop API)") {
+val sf = new SparkConf()
+sf.setAppName("test").setMaster("local").set(FILTER_OUT_EMPTY_SPLIT, true)
+sc = new SparkContext(sf)
+
+// Ensure that if all of the splits are empty, we remove the splits correctly
+val emptyRDD = sc.parallelize(Array.empty[Tuple2[String, String]], 1)
+emptyRDD.saveAsHadoopFile[TextOutputFormat[String, String]](tempDir.getPath + "/output")
--- End diff --

don't hardcode the path separator, use `new File(tempDir, output)`.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144320027
  
--- Diff: docs/configuration.md ---
@@ -1192,6 +1192,14 @@ Apart from these, the following properties are also available, and may be useful
   
 
 
+spark.files.filterOutEmptySplit
--- End diff --

I don't think I'd document this. It should be just a safety valve flag


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144319668
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---
@@ -196,7 +196,11 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+val inputSplits = if (sparkContext.getConf.get(FILTER_OUT_EMPTY_SPLIT)) {
--- End diff --

You can avoid duplicating  `inputFormat.getSplits(jobConf, minPartitions)`
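
A sketch of one way to do that with the old Hadoop API: call `getSplits` once, bind it to a `val`, and filter conditionally (the helper name and parameter list here are hypothetical):

```scala
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf}

// Read the splits once, then drop empty ones only when the flag is enabled.
def selectSplits(
    inputFormat: InputFormat[_, _],
    jobConf: JobConf,
    minPartitions: Int,
    ignoreEmptySplits: Boolean): Array[InputSplit] = {
  val allInputSplits = inputFormat.getSplits(jobConf, minPartitions)
  if (ignoreEmptySplits) allInputSplits.filter(_.getLength > 0) else allInputSplits
}
```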


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144319836
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,65 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("allow user to filter out empty split (old Hadoop API)") {
+val sf = new SparkConf()
--- End diff --

sf -> conf. You can fix it above too.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144319503
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -270,6 +270,15 @@ package object config {
 .longConf
 .createWithDefault(4 * 1024 * 1024)
 
+  private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit")
--- End diff --

Nit: no space after private
This doc is much too verbose for a flag. Just say, "If true, methods like 
that use HadoopRDD and NewHadoopRDD such as SparkContext.textFiles will not 
create a partition for input splits that are empty."


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144324123
  
--- Diff: docs/configuration.md ---
@@ -1192,6 +1192,14 @@ Apart from these, the following properties are also available, and may be useful
   
 
 
+spark.files.filterOutEmptySplit
--- End diff --

yea we can make this conf an internal conf.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144330429
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -270,6 +270,15 @@ package object config {
 .longConf
 .createWithDefault(4 * 1024 * 1024)
 
+  private [spark] val FILTER_OUT_EMPTY_SPLIT = ConfigBuilder("spark.files.filterOutEmptySplit")
--- End diff --

nit: how about `ignoreEmptySplits`, to match `ignoreCorruptFiles`?


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144353093
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,54 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+  outputSuffix: Int, checkPart: String, partitionLength: Int): Unit = {
+  val dataRDD = sc.parallelize(data, numSlices)
+  val output = new File(tempDir, "output" + outputSuffix)
+  dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, checkPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === partitionLength)
+}
+
+// Ensure that if all of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(Array.empty[Tuple2[String, String]], 1, 0, "part-0", 0)
--- End diff --

I'd call it with named arguments, for example,

```scala
testIgnoreEmptySplits(
  Array.empty[Tuple2[String, String]],
  numSlices = 1,
  outputSuffix = 0,
  checkPart = "part-0",
  expectedPartitionNum = 0)
```


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144357089
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+  outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+  val dataRDD = sc.parallelize(data, numSlices)
+  val output = new File(tempDir, "output" + outputSuffix)
+  dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, checkPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+}
--- End diff --

Could we maybe do something like the below? (not tested)

```scala
def testIgnoreEmptySplits(
    data: Array[Tuple2[String, String]],
    actualPartitionNum: Int,
    expectedName: String,
    expectedPartitionNum: Int): Unit = {
  val output = new File(tempDir, "output")
  sc.parallelize(data, actualPartitionNum)
    .saveAsHadoopFile[TextOutputFormat[String, String]](output.getAbsolutePath)
  assert(new File(output, expectedName).exists())
  val hadoopRDD = sc.textFile(new File(output, "part-*").getAbsolutePath)
  assert(hadoopRDD.partitions.length === expectedPartitionNum)
}

...

testIgnoreEmptySplits(
  data = Array.empty[Tuple2[String, String]],
  actualPartitionNum = 1,
  expectedName = "part-0",
  expectedPartitionNum = 0)
```


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144357376
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+  outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+  val dataRDD = sc.parallelize(data, numSlices)
+  val output = new File(tempDir, "output" + outputSuffix)
+  dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, checkPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+}
+
+// Ensure that if all of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(
+  data = Array.empty[Tuple2[String, String]],
+  numSlices = 1,
+  outputSuffix = 0,
+  checkPart = "part-0",
+  expectedPartitionNum = 0)
+
+// Ensure that if no split is empty, we don't lose any splits
+testIgnoreEmptySplits(
+  data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+  numSlices = 2,
+  outputSuffix = 1,
+  checkPart = "part-1",
+  expectedPartitionNum = 2)
+
+// Ensure that if part of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(
+  data = Array(("key1", "a"), ("key2", "a")),
+  numSlices = 5,
+  outputSuffix = 2,
+  checkPart = "part-4",
+  expectedPartitionNum = 2)
+  }
+
+  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+  outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+  val dataRDD = sc.parallelize(data, numSlices)
+  val output = new File(tempDir, "output" + outputSuffix)
+  dataRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, checkPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-r-*").getPath)
--- End diff --

I think we should _read_ it with the new Hadoop API to test `NewHadoopRDD`, I guess?
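
A sketch of reading through the new Hadoop API so the test exercises `NewHadoopRDD` (names such as `sc`, `output`, and `expectedPartitionNum` come from the surrounding test; this mirrors the version that appears later in the thread):

```scala
import java.io.File
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

// Read back with the new MapReduce API so the partitions come from NewHadoopRDD.
val hadoopRDD = sc.newAPIHadoopFile(
  new File(output, "part-r-*").getPath,
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text])
assert(hadoopRDD.partitions.length === expectedPartitionNum)
```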


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144365104
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+  outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+  val dataRDD = sc.parallelize(data, numSlices)
+  val output = new File(tempDir, "output" + outputSuffix)
+  dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, checkPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+}
--- End diff --

Actually, the previous tests looked okay to me as well.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144452771
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -270,6 +270,12 @@ package object config {
 .longConf
 .createWithDefault(4 * 1024 * 1024)
 
+  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
+.doc("If true, methods like that use HadoopRDD and NewHadoopRDD such as " +
--- End diff --

`like that` -> `that`


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144472244
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,86 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
--- End diff --

nit: one argument per line.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-12 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144472362
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,86 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
+  outputSuffix: Int, checkPart: String, expectedPartitionNum: Int): Unit = {
+  val dataRDD = sc.parallelize(data, numSlices)
+  val output = new File(tempDir, "output" + outputSuffix)
+  dataRDD.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, checkPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+}
+
+// Ensure that if all of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(
+  data = Array.empty[Tuple2[String, String]],
+  numSlices = 1,
+  outputSuffix = 0,
+  checkPart = "part-0",
+  expectedPartitionNum = 0)
+
+// Ensure that if no split is empty, we don't lose any splits
+testIgnoreEmptySplits(
+  data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+  numSlices = 2,
+  outputSuffix = 1,
+  checkPart = "part-1",
+  expectedPartitionNum = 2)
+
+// Ensure that if part of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(
+  data = Array(("key1", "a"), ("key2", "a")),
+  numSlices = 5,
+  outputSuffix = 2,
+  checkPart = "part-4",
+  expectedPartitionNum = 2)
+  }
+
+  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(data: Array[Tuple2[String, String]], numSlices: Int,
--- End diff --

ditto.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144604878
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,87 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(
+  data: Array[Tuple2[String, String]],
+  actualPartitionNum: Int,
+  expectedPart: String,
+  expectedPartitionNum: Int): Unit = {
+  val output = new File(tempDir, "output")
+  sc.parallelize(data, actualPartitionNum)
+.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, expectedPart).exists() === true)
--- End diff --

I don't think we need the `expectedPart` parameter, just
```
for (i <- 0 until actualPartitionNum) {
  assert(new File(output, s"part-$i").exists() === true)
}
```


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144617011
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,87 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(
+  data: Array[Tuple2[String, String]],
+  actualPartitionNum: Int,
+  expectedPart: String,
+  expectedPartitionNum: Int): Unit = {
+  val output = new File(tempDir, "output")
+  sc.parallelize(data, actualPartitionNum)
+.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, expectedPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+  Utils.deleteRecursively(output)
--- End diff --

Maybe:

```scala
try {
  ...
} finally {
  Utils.deleteRecursively(output)
}
```



---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144617310
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,87 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(
+  data: Array[Tuple2[String, String]],
+  actualPartitionNum: Int,
+  expectedPart: String,
+  expectedPartitionNum: Int): Unit = {
--- End diff --

Indentation..

```scala
def testIgnoreEmptySplits(
    data: Array[Tuple2[String, String]],
    ...
    expectedPartitionNum: Int): Unit = {
  val output = new File(tempDir, "output")
  ...
```


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-13 Thread liutang123
Github user liutang123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144683771
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,87 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(
+  data: Array[Tuple2[String, String]],
+  actualPartitionNum: Int,
+  expectedPart: String,
+  expectedPartitionNum: Int): Unit = {
+  val output = new File(tempDir, "output")
+  sc.parallelize(data, actualPartitionNum)
+.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  assert(new File(output, expectedPart).exists() === true)
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+  Utils.deleteRecursively(output)
--- End diff --

I think we don't need `try ... finally` here, because `Utils.deleteRecursively(output)` is only there to ensure the next invocation of `testIgnoreEmptySplits` succeeds. When the test finishes, whether it passes or not, `tempDir` will be deleted in `FileSuite.afterEach()`.


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144687101
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -510,4 +510,83 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 }
   }
 
+  test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(
+data: Array[Tuple2[String, String]],
+actualPartitionNum: Int,
+expectedPartitionNum: Int): Unit = {
+  val output = new File(tempDir, "output")
+  sc.parallelize(data, actualPartitionNum)
+.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
+  for (i <- 0 until actualPartitionNum) {
+assert(new File(output, s"part-$i").exists() === true)
+  }
+  val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
+  assert(hadoopRDD.partitions.length === expectedPartitionNum)
+  Utils.deleteRecursively(output)
+}
+
+// Ensure that if all of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(
+  data = Array.empty[Tuple2[String, String]],
+  actualPartitionNum = 1,
+  expectedPartitionNum = 0)
+
+// Ensure that if no split is empty, we don't lose any splits
+testIgnoreEmptySplits(
+  data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+  actualPartitionNum = 2,
+  expectedPartitionNum = 2)
+
+// Ensure that if part of the splits are empty, we remove the splits correctly
+testIgnoreEmptySplits(
+  data = Array(("key1", "a"), ("key2", "a")),
+  actualPartitionNum = 5,
+  expectedPartitionNum = 2)
+  }
+
+  test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
+val conf = new SparkConf()
+conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
+sc = new SparkContext(conf)
+
+def testIgnoreEmptySplits(
+data: Array[Tuple2[String, String]],
+actualPartitionNum: Int,
+expectedPartitionNum: Int): Unit = {
+  val output = new File(tempDir, "output")
+  sc.parallelize(data, actualPartitionNum)
+.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
+  for (i <- 0 until actualPartitionNum) {
+assert(new File(output, s"part-r-$i").exists() === true)
+  }
+  val hadoopRDD = sc.newAPIHadoopFile(new File(output, "part-r-*").getPath,
+classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text])
+.asInstanceOf[NewHadoopRDD[_, _]]
--- End diff --

nit:

```scala
val hadoopRDD = sc.newAPIHadoopFile(
  new File(output, "part-r-*").getPath,
  classOf[NewTextInputFormat],
  classOf[LongWritable],
  classOf[Text]).asInstanceOf[NewHadoopRDD[_, _]]
```


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19464


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-15 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144745022
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -270,6 +270,12 @@ package object config {
 .longConf
 .createWithDefault(4 * 1024 * 1024)
 
+  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
--- End diff --

This config should be made internal, and the name should be improved 
because it's not about spark files. 


---




[GitHub] spark pull request #19464: [SPARK-22233] [core] Allow user to filter out emp...

2017-10-15 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19464#discussion_r144745063
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -270,6 +270,12 @@ package object config {
 .longConf
 .createWithDefault(4 * 1024 * 1024)
 
+  private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
--- End diff --

I'll send a follow-up PR to fix this.


---
