[GitHub] spark pull request #21648: [SPARK-24665][PySpark] Use SQLConf in PySpark to ...

2018-06-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21648#discussion_r199316817
  
--- Diff: python/pyspark/sql/context.py ---
@@ -93,6 +93,10 @@ def _ssql_ctx(self):
         """
         return self._jsqlContext
 
+    def conf(self):
--- End diff --

Thanks, done in 4fc0ae4.


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184276403
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where the original task succeeds after its speculative
+   * task got a FetchFailed earlier.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Check the currently missing partition.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // The second task itself succeeds soon after.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions should not change; otherwise the child
+    // stage can never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Make sure failedStages is not empty now.
+    assert(scheduler.failedStages.nonEmpty)
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

Yep, you're right. The success completion event in the UT was treated as a normal successful task. I fixed this by ignoring such an event at the beginning of handleTaskCompletion.
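
Roughly, the idea looks like the following toy sketch (an illustration only, with a simplified event model; the real DAGScheduler guard and data structures are more involved):
```
// Toy model: a late Success for a stage already failed by FetchFailed is dropped.
object IgnoreLateSuccessSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  final case class FetchFailed(shuffleId: Int) extends TaskEndReason

  final case class CompletionEvent(stageId: Int, reason: TaskEndReason)

  private val failedStages = scala.collection.mutable.Set.empty[Int]

  def handleTaskCompletion(event: CompletionEvent): Unit = event.reason match {
    case Success if failedStages.contains(event.stageId) =>
      // The stage already failed with FetchFailed and will be resubmitted as a
      // whole, so this late success must not shrink its missing partitions.
      println(s"stage ${event.stageId}: ignoring late success")
    case Success =>
      println(s"stage ${event.stageId}: recording task success")
    case FetchFailed(_) =>
      failedStages += event.stageId
      println(s"stage ${event.stageId}: marking stage failed, scheduling resubmit")
  }

  def main(args: Array[String]): Unit = {
    handleTaskCompletion(CompletionEvent(1, FetchFailed(0))) // speculative attempt fails first
    handleTaskCompletion(CompletionEvent(1, Success))        // late success of the original task is ignored
  }
}
```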


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184260597
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where the original task succeeds after its speculative
+   * task got a FetchFailed earlier.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Check the currently missing partition.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // The second task itself succeeds soon after.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions should not change; otherwise the child
+    // stage can never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Make sure failedStages is not empty now.
+    assert(scheduler.failedStages.nonEmpty)
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
--- End diff --

The successful task will be ignored by `OutputCommitCoordinator.taskCompleted`: in the taskCompleted logic, `stageStates.getOrElse` returns early because the current stage is in the failed set. The detailed log is provided below:
```
18/04/26 10:50:24.524 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:74) and ResultStage 1 () due to fetch failure
18/04/26 10:50:24.535 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 2
18/04/26 10:50:24.538 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Executor lost: exec-hostA (epoch 1)
18/04/26 10:50:24.540 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Shuffle files lost for execut
```
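
The `getOrElse`-with-early-return pattern mentioned here can be illustrated with a self-contained toy (assumed, simplified state; not the actual OutputCommitCoordinator code):
```
object TaskCompletedSketch {
  private val stageStates = scala.collection.mutable.Map.empty[Int, String]

  def taskCompleted(stage: Int): Unit = {
    val stageState = stageStates.getOrElse(stage, {
      // The stage is no longer tracked (e.g. it failed and is being
      // resubmitted), so this completion event is ignored.
      println(s"Ignoring task completion for completed/failed stage $stage")
      return
    })
    println(s"stage $stage: updating state ($stageState)")
  }

  def main(args: Array[String]): Unit = {
    stageStates(0) = "running"
    taskCompleted(0) // updates state
    taskCompleted(1) // ignored: stage 1 is not in stageStates
  }
}
```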

[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184274946
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where the original task succeeds after its speculative
+   * task got a FetchFailed earlier.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Check the currently missing partition.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // The second task itself succeeds soon after.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions should not change; otherwise the child
+    // stage can never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Make sure failedStages is not empty now.
+    assert(scheduler.failedStages.nonEmpty)
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
--- End diff --

Ah, it's used to check that the job completed successfully and that all temporary data structures are empty.
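
For readers following along, such a helper boils down to assertions like the following (a hedged sketch over the suite's `scheduler` handle; the real DAGSchedulerSuite helper checks more internal maps):
```
def assertDataStructuresEmpty(): Unit = {
  assert(scheduler.activeJobs.isEmpty)
  assert(scheduler.failedStages.isEmpty)
  assert(scheduler.waitingStages.isEmpty)
  assert(scheduler.runningStages.isEmpty)
}
```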


---




[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...

2018-04-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20930#discussion_r184260210
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * This tests the case where the original task succeeds after its speculative
+   * task got a FetchFailed earlier.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Check the currently missing partition.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // The second task itself succeeds soon after.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions should not change; otherwise the child
+    // stage can never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, but the task itself is
+    // still running. This may be caused by ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Make sure failedStages is not empty now.
+    assert(scheduler.failedStages.nonEmpty)
+    // The second result task itself succeeds soon after.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
--- End diff --

I added this test to answer your previous question: "Can you simulate what happens to the result task if FetchFailed comes before task success?". This test passes without my code change in DAGScheduler.


---




[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-04-29 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r184882396
  
--- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
@@ -20,12 +20,12 @@ package org.apache.spark.io
 import java.nio.ByteBuffer
 
 import com.google.common.io.ByteStreams
--- End diff --

Add an empty line after line 22 to separate the Spark group from the third-party group.


---




[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

2018-04-29 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21175#discussion_r184882338
  
--- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
@@ -20,12 +20,12 @@ package org.apache.spark.io
 import java.nio.ByteBuffer
 
 import com.google.common.io.ByteStreams
-
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
--- End diff --

Move `SharedSparkContext` before `SparkFunSuite` to keep the imports in alphabetical order.
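
Combined with the previous comment, the fixed import section would look like this (an illustrative sketch of Spark's import style: java, third-party, then spark groups, each separated by a blank line and alphabetized):
```
import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

import org.apache.spark.{SharedSparkContext, SparkFunSuite}
```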


---




[GitHub] spark pull request #21194: [SPARK-24046][SS] Fix rate source when rowsPerSec...

2018-05-01 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21194#discussion_r185252544
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala ---
@@ -101,25 +101,10 @@ object RateStreamProvider {
 
   /** Calculate the end value we will emit at the time `seconds`. */
   def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
-    // E.g., rampUpTimeSeconds = 4, rowsPerSecond = 10
-    // Then speedDeltaPerSecond = 2
-    //
-    // seconds   = 0 1 2  3  4  5  6
-    // speed     = 0 2 4  6  8 10 10 (speedDeltaPerSecond * seconds)
-    // end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2
-    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
-    if (seconds <= rampUpTimeSeconds) {
-      // Calculate "(0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2" in a special way to
-      // avoid overflow
-      if (seconds % 2 == 1) {
-        (seconds + 1) / 2 * speedDeltaPerSecond * seconds
-      } else {
-        seconds / 2 * speedDeltaPerSecond * (seconds + 1)
-      }
-    } else {
-      // rampUpPart is just a special case of the above formula: rampUpTimeSeconds == seconds
-      val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
-      rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
-    }
+    val delta = rowsPerSecond.toDouble / rampUpTimeSeconds
+    val rampUpSeconds = if (seconds <= rampUpTimeSeconds) seconds else rampUpTimeSeconds
+    val afterRampUpSeconds = if (seconds > rampUpTimeSeconds ) seconds - rampUpTimeSeconds else 0
+    // Use classic distance formula based on accelaration: ut + ½at2
+    Math.floor(rampUpSeconds * rampUpSeconds * delta / 2).toLong + afterRampUpSeconds * rowsPerSecond
--- End diff --

nit: >100 characters


---




[GitHub] spark pull request #21194: [SPARK-24046][SS] Fix rate source when rowsPerSec...

2018-05-01 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21194#discussion_r185252360
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala ---
@@ -101,25 +101,10 @@ object RateStreamProvider {
 
   /** Calculate the end value we will emit at the time `seconds`. */
   def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
-    // E.g., rampUpTimeSeconds = 4, rowsPerSecond = 10
-    // Then speedDeltaPerSecond = 2
-    //
-    // seconds   = 0 1 2  3  4  5  6
-    // speed     = 0 2 4  6  8 10 10 (speedDeltaPerSecond * seconds)
-    // end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2
-    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
-    if (seconds <= rampUpTimeSeconds) {
-      // Calculate "(0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2" in a special way to
-      // avoid overflow
-      if (seconds % 2 == 1) {
-        (seconds + 1) / 2 * speedDeltaPerSecond * seconds
-      } else {
-        seconds / 2 * speedDeltaPerSecond * (seconds + 1)
-      }
-    } else {
-      // rampUpPart is just a special case of the above formula: rampUpTimeSeconds == seconds
-      val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
-      rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
-    }
+    val delta = rowsPerSecond.toDouble / rampUpTimeSeconds
+    val rampUpSeconds = if (seconds <= rampUpTimeSeconds) seconds else rampUpTimeSeconds
+    val afterRampUpSeconds = if (seconds > rampUpTimeSeconds ) seconds - rampUpTimeSeconds else 0
+    // Use classic distance formula based on accelaration: ut + ½at2
--- End diff --

nit: acceleration


---




[GitHub] spark issue #21188: [SPARK-24046][SS] Fix rate source rowsPerSecond <= rampU...

2018-05-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21188
  
@maasg as commented in #21194, I just think we should not change the behavior when `seconds > rampUpTimeSeconds`. Maybe that is more important than smoothness.


---




[GitHub] spark pull request #21194: [SPARK-24046][SS] Fix rate source when rowsPerSec...

2018-05-03 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21194#discussion_r185851172
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala ---
@@ -173,55 +173,154 @@ class RateSourceSuite extends StreamTest {
     assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
   }
 
-  test("valueAtSecond") {
+  test("valueAtSecond without ramp-up") {
     import RateStreamProvider._
+    val rowsPerSec = Seq(1,10,50,100,1000,1)
+    val secs = Seq(1, 10, 100, 1000, 1, 10)
+    for {
+      sec <- secs
+      rps <- rowsPerSec
+    } yield {
+      assert(valueAtSecond(seconds = sec, rowsPerSecond = rps, rampUpTimeSeconds = 0) === sec * rps)
+    }
+  }
 
-    assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds = 0) === 0)
-    assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 0) === 5)
+  test("valueAtSecond with ramp-up") {
+    import RateStreamProvider._
+    val rowsPerSec = Seq(1, 5, 10, 50, 100, 1000, 1)
+    val rampUpSec = Seq(10, 100, 1000)
+
+    // for any combination, value at zero = 0
+    for {
+      rps <- rowsPerSec
+      rampUp <- rampUpSec
+    } yield {
+      assert(valueAtSecond(seconds = 0, rowsPerSecond = rps, rampUpTimeSeconds = rampUp) === 0)
+    }
 
-    assert(valueAtSecond(seconds = 0, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 0)
-    assert(valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 1)
-    assert(valueAtSecond(seconds = 2, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 3)
-    assert(valueAtSecond(seconds = 3, rowsPerSecond = 5, rampUpTimeSeconds = 2) === 8)
--- End diff --

I tried your implementation locally, and it changes the original behavior:
```
valueAtSecond(seconds = 1, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 1
valueAtSecond(seconds = 2, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 5
valueAtSecond(seconds = 3, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 10
valueAtSecond(seconds = 4, rowsPerSecond = 5, rampUpTimeSeconds = 2) = 15
```
I think the bug fix should not change the values for `seconds > rampUpTimeSeconds`; just my opinion, you can ping other committers to review.
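
A quick way to reproduce this comparison locally might be (a sketch; it assumes a test or REPL where `RateStreamProvider.valueAtSecond` is importable):
```
import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider.valueAtSecond

for (sec <- 1 to 4) {
  val v = valueAtSecond(seconds = sec, rowsPerSecond = 5, rampUpTimeSeconds = 2)
  println(s"valueAtSecond(seconds = $sec, rowsPerSecond = 5, rampUpTimeSeconds = 2) = $v")
}
```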


---




[GitHub] spark pull request #21188: [SPARK-24046][SS] Fix rate source rowsPerSecond <...

2018-05-03 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21188#discussion_r185852663
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala ---
@@ -107,14 +107,25 @@ object RateStreamProvider {
     // seconds   = 0 1 2  3  4  5  6
     // speed     = 0 2 4  6  8 10 10 (speedDeltaPerSecond * seconds)
     // end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2
-    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
+    val speedDeltaPerSecond = math.max(1, rowsPerSecond / (rampUpTimeSeconds + 1))
--- End diff --

Keeping at least 1 per second and leaving the other seconds at 0 is OK IMO.


---




[GitHub] spark issue #20150: [SPARK-22956][SS] Bug fix for 2 streams union failover s...

2018-01-04 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20150
  
cc @zsxwing 


---




[GitHub] spark issue #20150: [SPARK-22956][SS] Bug fix for 2 streams union failover s...

2018-01-08 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20150
  
cc @gatorsmile @cloud-fan 


---




[GitHub] spark issue #20150: [SPARK-22956][SS] Bug fix for 2 streams union failover s...

2018-01-15 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20150
  
Thanks for your review! Shixiong


---




[GitHub] spark pull request #17702: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-01-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/17702#discussion_r163156332
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -668,4 +672,31 @@ object DataSource extends Logging {
     }
     globPath
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * Follow [[InMemoryFileIndex]].bulkListLeafFile and reuse the conf.
+   */
+  private def getGlobbedPaths(
+      sparkSession: SparkSession,
+      fs: FileSystem,
+      hadoopConf: SerializableConfiguration,
+      qualified: Path): Seq[Path] = {
+    val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified)
+    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+    } else {
+      val parallelPartitionDiscoveryParallelism =
+        sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
+      val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
+      val expanded = sparkSession.sparkContext
--- End diff --

Sorry for the late reply; fixed in the next commit.
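
For reference, the parallel expansion the diff truncates at could look roughly like this (a hypothetical sketch, not necessarily the committed code):
```
val expanded = sparkSession.sparkContext
  .parallelize(paths.map(_.toString), numParallelism)
  .map { pathString =>
    // Resolve each glob on an executor, reusing the serialized Hadoop conf.
    val path = new Path(pathString)
    val fs = path.getFileSystem(hadoopConf.value)
    SparkHadoopUtil.get.globPathIfNecessary(fs, path).map(_.toString)
  }.collect()
expanded.flatten.map(new Path(_)).toSeq
```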


---




[GitHub] spark pull request #20150: [SPARK-22956][SS] Bug fix for 2 streams union fai...

2018-01-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20150#discussion_r161426641
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -122,6 +122,11 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       batches.slice(sliceStart, sliceEnd)
     }
 
+    if (newBlocks.isEmpty) {
--- End diff --

DONE


---




[GitHub] spark pull request #20150: [SPARK-22956][SS] Bug fix for 2 streams union fai...

2018-01-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20150#discussion_r161426622
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -318,6 +318,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("union bug in failover") {
+    def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
+      val topic = newTopic()
+      testUtils.createTopic(topic, partitions = 1)
+      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
+
+      val reader = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.metadata.max.age.ms", "1")
+        .option("maxOffsetsPerTrigger", 5)
+        .option("subscribe", topic)
+        .option("startingOffsets", "earliest")
+
+      reader.load()
+        .selectExpr("CAST(value AS STRING)")
+        .as[String]
+        .map(k => k.toInt)
+    }
+
+    val df1 = getSpecificDF(0 to 9)
+    val df2 = getSpecificDF(100 to 199)
+
+    val kafka = df1.union(df2)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(kafka)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckAnswer(0, 1, 2, 3, 4, 100, 101, 102, 103, 104),
--- End diff --

Cool, this makes the code much cleaner.


---




[GitHub] spark pull request #20150: [SPARK-22956][SS] Bug fix for 2 streams union fai...

2018-01-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20150#discussion_r161426632
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -318,6 +318,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("union bug in failover") {
--- End diff --

DONE


---




[GitHub] spark issue #17702: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-01-24 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/17702
  
ping @vanzin 


---




[GitHub] spark issue #20150: [SPARK-22956][SS] Bug fix for 2 streams union failover s...

2018-01-09 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20150
  
Hi Shixiong, thanks a lot for your reply.
The full stack trace below can be reproduced by running the added UT on the original code base.
```
Assert on query failed: : Query [id = 3421db21-652e-47af-9d54-2b74a222abed, runId = cd8d7c94-1286-44a5-b000-a8d870aef6fa] terminated with exception: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

Caused by: Partition topic-0-0's offset was changed from 10 to 5, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:332)
org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:291)
org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:289)
scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
scala.collection.AbstractTraversable.filter(Traversable.scala:104)
org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:289)
```


---




[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20244
  
reopen this...


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161141499
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -96,6 +98,22 @@ class MyRDD(
   override def toString: String = "DAGSchedulerSuiteRDD " + id
 }
 
+/** Wrapped rdd partition. */
+class WrappedPartition(val partition: Partition) extends Partition {
+  def index: Int = partition.index
+}
+
+/** Wrapped rdd with WrappedPartition. */
+class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
+  protected def getPartitions: Array[Partition] = {
+    parent.partitions.map(p => new WrappedPartition(p))
+  }
+
+  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
+    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
--- End diff --

I think this line is the key point of `WrappedPartition` and `WrappedRDD`; please add comments explaining your intention.


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161144809
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene of concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished its spark job and starts the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if the task serialization happens before doCheckpoint is done,
+   * while the partitions are computed from stage.rdd.partitions after doCheckpoint is done,
+   * we may get a ClassCastException when executing the task, because some rdds cast
+   * their Partitions.
+   *
+   * This test case is meant to indicate that the task serialization and the partition
+   * computation in submitMissingTasks should see the same rdd checkpoint status.
+   */
+  test("task part misType with checkpoint rdd in concurrent execution scenes") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()
+    sc.setCheckpointDir(checkpointDir.toString)
+
+    val latch = new CountDownLatch(2)
+    val semaphore1 = new Semaphore(0)
+    val semaphore2 = new Semaphore(0)
+
+    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+    rdd.checkpoint()
+
+    val checkpointRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate what RDD.doCheckpoint() does here.
+        rdd.doCheckpointCalled = true
+        val checkpointData = rdd.checkpointData.get
+        RDDCheckpointData.synchronized {
+          if (checkpointData.cpState == CheckpointState.Initialized) {
+            checkpointData.cpState = CheckpointState.CheckpointingInProgress
+          }
+        }
+
+        val newRDD = checkpointData.doCheckpoint()
+
+        // Release semaphore1 after the job triggered in checkpoint finished.
+        semaphore1.release()
+        semaphore2.acquire()
+        // Update our state and truncate the RDD lineage.
+        RDDCheckpointData.synchronized {
+          checkpointData.cpRDD = Some(newRDD)
+          checkpointData.cpState = CheckpointState.Checkpointed
+          rdd.markCheckpointed()
+        }
+        semaphore1.release()
+
+        latch.countDown()
+      }
+    }
+
+    val submitMissingTasksRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate the process of submitMissingTasks.
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        semaphore1.acquire()
+        // Simulate task serialization while submitMissingTasks.
+        // Task serialized with rdd checkpoint not finished.
+        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+        val taskBinaryBytes = JavaUtils.bufferToArray(
+          ser.serialize((rdd, func): AnyRef))
+        semaphore2.release()
+        semaphore1.acquire()
+        // Partition computed with rdd checkpoint already finished.
+        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
+          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
+        val part = rdd.partitions(0)
+        intercept[ClassCastException] {
--- End diff --

I think this is not a "test"; it is just a "reproduction" of the problem you want to fix. We should prove that the code you added in `DAGScheduler.scala` fixes the problem, and that a `ClassCastException` is raised with the original code base.


---




[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20244
  
ok to test


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161141879
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene of concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished its spark job and starts the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if the task serialization happens before doCheckpoint is done,
+   * while the partitions are computed from stage.rdd.partitions after doCheckpoint is done,
+   * we may get a ClassCastException when executing the task, because some rdds cast
+   * their Partitions.
+   *
+   * This test case is meant to indicate that the task serialization and the partition
+   * computation in submitMissingTasks should see the same rdd checkpoint status.
+   */
+  test("task part misType with checkpoint rdd in concurrent execution scenes") {
--- End diff --

maybe "SPARK-23053: avoid CastException in concurrent execution with 
checkpoint" better?


---




[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20244
  
test this please


---




[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20244
  
@ivoson Tengfei, please post the full stack trace of the 
`ClassCastException`.


---




[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

2018-02-26 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20675
  
retest this please


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-25 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/20675

[SPARK-23033][SS][Follow Up] Task level retry for continuous processing

## What changes were proposed in this pull request?

Here we want to reimplement task-level retry for continuous processing. The changes include:
1. Add a new `EpochCoordinatorMessage` named `GetLastEpochAndOffset`, used to get the last epoch and offset of a particular partition when a task restarts.
2. Add a `setOffset` function to `ContinuousDataReader`, so that the reader can restart from a given offset. A sketch of both additions is shown below.
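
A hedged sketch of the two additions (simplified, stubbed types for illustration; the real classes live in the continuous-processing sources):
```
sealed trait EpochCoordinatorMessage // stub for the sketch

// 1. A restarted task asks the coordinator for its partition's last epoch and offset.
case class GetLastEpochAndOffset(partitionId: Int) extends EpochCoordinatorMessage

trait PartitionOffset // stub for the sketch

// 2. The reader can be repositioned, so a restarted task resumes from that offset.
trait ContinuousDataReader[T] {
  def setOffset(offset: PartitionOffset): Unit
}
```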

## How was this patch tested?

Added a new UT in `ContinuousSuite` and a new `StreamAction` named `CheckAnswerRowsContainsOnlyOnce` for more accurate result checking.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-23033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20675


commit 21f574e2a3ad3c8e68b92776d2a141d7fcb90502
Author: Yuanjian Li <xyliyuanjian@...>
Date:   2018-02-26T07:27:10Z

[SPARK-23033][SS][Follow Up] Task level retry for continuous processing




---




[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

2018-02-25 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20675
  
cc @tdas and @jose-torres 
#20225 gives a quick fix for task-level retry; this is just an attempt at a possibly better implementation. Please let me know if I did something wrong or misunderstood Continuous Processing. Thanks :)


---




[GitHub] spark pull request #20150: [SPARK-22956][SS] Bug fix for 2 streams union fai...

2018-01-04 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/20150

[SPARK-22956][SS] Bug fix for 2 streams union failover scenario

## What changes were proposed in this pull request?

This problem was reported by @yanlin-Lynn @ivoson and @LiangchangZ. Thanks!

When we union 2 streams from Kafka or other sources, and one of them has no continuous data coming in while a task restarts at the same time, this causes an `IllegalStateException`. It is mainly caused by the code in [MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190): when one stream has no continuous data, its committedOffset equals its availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` is not properly handled in KafkaSource. Maybe we should also consider this scenario in other Sources. A toy sketch of the failure mode is shown below.
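
To make the mechanism concrete, here is a toy sketch of the failure mode (assumed types; not the actual KafkaSource code):
```
object UnionFailoverSketch {
  // Toy stand-in for a source's in-memory partition offsets.
  var currentPartitionOffsets: Option[Map[Int, Long]] = None

  // After recovery, getBatch can be called with start == end for a source that
  // produced no new data; restoring the in-memory offsets here prevents a later
  // batch from looking like the offsets moved backwards.
  def getBatch(start: Option[Map[Int, Long]], end: Map[Int, Long]): Seq[Long] = {
    if (currentPartitionOffsets.isEmpty) {
      currentPartitionOffsets = Some(end)
    }
    if (start.contains(end)) Seq.empty // no new data in this batch
    else Seq.empty // (the real code builds a DataFrame from the offset ranges)
  }
}
```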

## How was this patch tested?

Add a UT in KafkaSourceSuite.scala


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-22956

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20150.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20150


commit aa3d7b73ed5221bdc2aee9dea1f6db45b4a626d7
Author: Yuanjian Li <xyliyuanjian@...>
Date:   2018-01-04T11:52:23Z

SPARK-22956: Bug fix for 2 streams union failover scenario




---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r170830121
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly: if there were duplicated results, CheckAnswerRowsContains
+        // would also return true.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
--- End diff --

Actually I first used `CheckAnswer(0 to 19: _*)` here, but I found the test case fails, probably because continuous processing may not stop exactly at Range(0, 20) every time. See the logs below:
```
== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- *(1) Project [value#13L]
   +- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45


ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
org.scalatest.exceptions.TestFailedException:

== Results ==
!== Correct Answer - 20 ==   == Spark Answer - 25 ==
!struct                      struct
 [0]                         [0]
 [10]                        [10]
 [11]                        [11]
 [12]                        [12]
 [13]                        [13]
 [14]                        [14]
 [15]                        [15]
 [16]                        [16]
 [17]                        [17]
 [18]                        [18]
 [19]                        [19]
 [1]                         [1]
![2]                         [20]
![3]                         [21]
![4]                         [22]
![5]                         [23]
![6]                         [24]
![7]                         [2]
![8]                         [3]
![9]                         [4]
!                            [5]
!                            [6]
!                            [7]
!                            [8]
!                            [9]


== Progress ==
   StartStream(ContinuousTrigger(360),org.apache.spark.util.SystemClock@343e225a,Map(),null)
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
=> CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
   StopStream
```
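
(For reference, the check the new action performs boils down to a toy predicate like this; the real `CheckAnswerRowsContainsOnlyOnce` is a `StreamAction` defined in `StreamTest`:)
```
// Every expected row must appear exactly once in the collected sink output.
def containsOnlyOnce[T](actual: Seq[T], expected: Seq[T]): Boolean =
  expected.forall(e => actual.count(_ == e) == 1)
```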


---




[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

2018-02-26 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/20675
  
Great thanks for your detailed reply!
> The semantics aren't quite right. Task-level retry can happen a fixed number of times for the lifetime of the task, which is the lifetime of the query - even if it runs for days after, the attempt number will never be reset.
- I think the attempt number never being reset is not a problem, as long as the task starts with the right epoch and offset. Maybe I don't understand the meaning of "the semantics"; could you please explain more?
- As far as I'm concerned, with a larger parallelism, a whole-stage restart is too heavy an operation and will cause data churn.
- Also, a further thought: after CP supports shuffle and more complex scenarios, task-level retry will need more work to ensure the data is correct. But it may still be a useful feature? I just want to leave this patch and initiate a discussion about it :)


---




[GitHub] spark pull request #21945: [SPARK-24989][Core] Add retrying support for OutO...

2018-08-01 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/21945

[SPARK-24989][Core] Add retrying support for OutOfDirectMemoryError

## What changes were proposed in this pull request?

As described in detail in [SPARK-24989](https://issues.apache.org/jira/browse/SPARK-24989), add retry support in RetryingBlockFetcher for the OutOfDirectMemoryError raised when the io.netty.maxDirectMemory limit is hit. The details of the failed stages are attached below, followed by a sketch of the retry condition:

![image](https://user-images.githubusercontent.com/4833765/43534362-c3a934a4-95e9-11e8-9ec1-5f868e04bc07.png)
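
A hedged sketch of the retry condition (illustrative only; the real logic lives in `RetryingBlockFetcher#shouldRetry`, which is Java):
```
import java.io.IOException
import io.netty.util.internal.OutOfDirectMemoryError

def shouldRetry(t: Throwable, retryCount: Int, maxRetries: Int): Boolean = {
  val isIOException = t.isInstanceOf[IOException] ||
    (t.getCause != null && t.getCause.isInstanceOf[IOException])
  // Proposed addition: also treat Netty's OutOfDirectMemoryError as transient,
  // since many concurrent fetches can briefly exhaust direct memory.
  val isDirectOom = t.isInstanceOf[OutOfDirectMemoryError] ||
    (t.getCause != null && t.getCause.isInstanceOf[OutOfDirectMemoryError])
  (isIOException || isDirectOom) && retryCount < maxRetries
}
```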


## How was this patch tested?

Added a UT in RetryingBlockFetcherSuite.java and tested in the job mentioned above.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-24989

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21945


commit bb6841b3a7a160e252fe35dab82f4ddeb0032591
Author: Yuanjian Li 
Date:   2018-08-01T16:15:09Z

[SPARK-24989][Core] Add retry support for OutOfDirectMemoryError




---




[GitHub] spark issue #21945: [SPARK-24989][Core] Add retrying support for OutOfDirect...

2018-08-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21945
  
Closing this; the param `spark.reducer.maxBlocksInFlightPerAddress`, added after version 2.2, solves my problem.


---




[GitHub] spark pull request #21945: [SPARK-24989][Core] Add retrying support for OutO...

2018-08-03 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/21945


---




[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188190
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      spark.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+      column: String,
+      value: String,
+      location: URI,
+      format: Option[String] = None
+  ) {
+
+    def toSpec: String = {
+      s"($column='$value')"
+    }
+    def toSpecAsMap: Map[String, String] = {
+      Map(column -> value)
+    }
+  }
+
+  test("create hive table with multi format partitions") {
+    val catalog = spark.sessionState.catalog
+    withTempDir { baseDir =>
+
+      val partitionedTable = "ext_multiformat_partition_table"
+      withTable(partitionedTable) {
+        assert(baseDir.listFiles.isEmpty)
+
+        val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+        createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+        // Check table storage type is PARQUET
+        val hiveResultTable =
+          catalog.getTableMetadata(TableIdentifier(partitionedTable, Some("default")))
+        assert(DDLUtils.isHiveTable(hiveResultTable))
+        assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+        assert(hiveResultTable.storage.inputFormat
+          .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+        )
+        assert(hiveResultTable.storage.outputFormat
+          .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+        )
+        assert(hiveResultTable.storage.serde
+          .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+        )
+
+        // Check table has correct partitions
+        assert(
+          catalog.listPartitions(TableIdentifier(partitionedTable,
+            Some("default"))).map(_.spec).toSet == partitions.map(_.toSpecAsMap).toSet
+        )
+
+        // Check first table partition storage type is PARQUET
+        val parquetPartition = catalog.getPartition(
+          TableIdentifier(partitionedTable, Some("default")),
+          partitions.head.toSpecAsMap
+        )
+        assert(
+          parquetPartition.storage.serde
+            .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+        )
+
+        // Check second table partition storage type is AVRO
+   

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206182473
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
[duplicated 514-line quote of MultiFormatTableSuite.scala elided; the review 
comment was truncated in the archive]

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188295
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
[duplicated 514-line quote of MultiFormatTableSuite.scala elided; the review 
comment was truncated in the archive]

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206190334
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
--- End diff --

No need to start a new line here.
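
i.e., roughly (a sketch of the suggested layout only, reusing the members 
already in the quoted diff):

```
// Suggested layout: keep the closing paren on the last parameter line
// instead of giving ") {" its own line.
private case class PartitionDefinition(
    column: String,
    value: String,
    location: java.net.URI,
    format: Option[String] = None) {
  def toSpec: String = s"($column='$value')"
  def toSpecAsMap: Map[String, String] = Map(column -> value)
}
```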


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206184350
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
[duplicated 514-line quote of MultiFormatTableSuite.scala elided; the review 
comment was truncated in the archive]

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188012
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
[duplicated 514-line quote of MultiFormatTableSuite.scala elided; the review 
comment was truncated in the archive]

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206184183
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
[duplicated 514-line quote of MultiFormatTableSuite.scala elided; the review 
comment was truncated in the archive]

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188650
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
[duplicated 514-line quote of MultiFormatTableSuite.scala elided; the review 
comment was truncated in the archive]

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206178526
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
 ---
@@ -96,6 +96,9 @@ object ParserUtils {
 }
   }
 
+  def extraMethod(s: String): String = {
--- End diff --

what's this used for?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21881: [SPARK-24930][SQL] Improve exception information ...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21881#discussion_r206203835
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -337,7 +337,11 @@ case class LoadDataCommand(
   new File(file.getAbsolutePath).exists()
 }
 if (!exists) {
-  throw new AnalysisException(s"LOAD DATA input path does not 
exist: $path")
+  // If user have no permission to access the given input path, 
`File.exists()` return false
+  // , `LOAD DATA input path does not exist` can confuse users.
+  throw new AnalysisException(s"LOAD DATA input path does not 
exist: `$path` or current " +
--- End diff --

Nit: no need to print the $path twice.
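
Something along these lines would read better (just a sketch of the wording; 
`path` is the variable already in scope in `LoadDataCommand`):

```
import org.apache.spark.sql.AnalysisException

// Hypothetical rewording that mentions the path once and still covers the
// no-permission case:
throw new AnalysisException(
  s"LOAD DATA input path does not exist or the current user does not " +
    s"have permission to access it: $path")
```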


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945617
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create an [[AlterTableFormatPropertiesCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+   * }}}
+   */
+  override def visitSetTableFormat(ctx: SetTableFormatContext): 
LogicalPlan = withOrigin(ctx) {
+val format = (ctx.fileFormat) match {
+  // Expected format: INPUTFORMAT input_format OUTPUTFORMAT 
output_format
+  case (c: TableFileFormatContext) =>
+visitTableFileFormat(c)
+  // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET 
| AVRO
+  case (c: GenericFileFormatContext) =>
+visitGenericFileFormat(c)
+  case _ =>
+throw new ParseException("Expected STORED AS ", ctx)
+}
+AlterTableFormatCommand(
+  visitTableIdentifier(ctx.tableIdentifier),
+  format,
+  // TODO a partition spec is allowed to have optional values. This is 
currently violated.
+  Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
--- End diff --

I'm confused by this TODO: in the current implementation, when the partition 
spec is empty, do we change the table's catalog?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945564
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+  private case class PartitionDefinition(
+  column: String,
--- End diff --

ditto.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945559
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -415,6 +415,51 @@ case class AlterTableSerDePropertiesCommand(
 
 }
 
+/**
+ * A command that sets the format of a table/view/partition .
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+ * }}}
+ */
+case class AlterTableFormatCommand(
+tableName: TableIdentifier,
--- End diff --

Indent nit: use 4 spaces for constructor params.
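
i.e. (a sketch of the expected style; the parameter names after `tableName` 
are assumptions, since the quoted diff is cut off here):

```
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

// Constructor parameters are indented 4 spaces, per the Spark style guide.
case class AlterTableFormatCommand(
    tableName: TableIdentifier,
    format: CatalogStorageFormat,
    partitionSpec: Option[TablePartitionSpec])
```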


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945523
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create an [[AlterTableFormatPropertiesCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+   * }}}
+   */
+  override def visitSetTableFormat(ctx: SetTableFormatContext): 
LogicalPlan = withOrigin(ctx) {
+val format = (ctx.fileFormat) match {
+  // Expected format: INPUTFORMAT input_format OUTPUTFORMAT 
output_format
+  case (c: TableFileFormatContext) =>
+visitTableFileFormat(c)
+  // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET 
| AVRO
+  case (c: GenericFileFormatContext) =>
+visitGenericFileFormat(c)
+  case _ =>
+throw new ParseException("Expected STORED AS ", ctx)
--- End diff --

I think we need a more detailed ParseException message here.
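
For example (a sketch only; the exact wording is up to you, and `getText` on 
the ANTLR context is assumed available as elsewhere in the parser):

```
case _ =>
  // Tell the user what was actually parsed, not just what was expected.
  throw new ParseException(
    s"Expected STORED AS with INPUTFORMAT/OUTPUTFORMAT or a file format " +
      s"shorthand, but got: ${ctx.fileFormat.getText}", ctx)
```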


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21985: [SPARK-24884][SQL] add regexp_extract_all support

2018-08-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21985#discussion_r207712639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 ---
@@ -446,3 +448,88 @@ case class RegExpExtract(subject: Expression, regexp: 
Expression, idx: Expressio
 })
   }
 }
+
+/**
+ * Extract all specific(idx) groups identified by a Java regex.
+ *
+ * NOTE: this expression is not THREAD-SAFE, as it has some internal 
mutable status.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, regexp[, idx]) - Extracts all groups that matches 
`regexp`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('100-200,300-400', '(\\d+)-(\\d+)', 1);
+   [100, 300]
+  """)
+case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: 
Expression)
--- End diff --

Add an abstract class to reduce duplicated code between `RegExpExtractAll` 
and `RegExpExtract`?
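
Roughly like this (a sketch; the base-class and method names are assumptions, 
and it would live next to `RegExpExtract` in regexpExpressions.scala so the 
expression traits are in scope):

```
import java.util.regex.{Matcher, Pattern}

import org.apache.spark.unsafe.types.UTF8String

// Shared base: caches the compiled pattern and recompiles it only when the
// regex value changes, exactly as RegExpExtract already does.
abstract class RegExpExtractBase
  extends TernaryExpression with ImplicitCastInputTypes {

  def subject: Expression
  def regexp: Expression
  def idx: Expression

  // last regex in string; update the pattern iff the regexp value changed
  @transient private var lastRegex: UTF8String = _
  @transient private var pattern: Pattern = _

  protected def getLastMatcher(s: Any, p: Any): Matcher = {
    if (!p.equals(lastRegex)) {
      lastRegex = p.asInstanceOf[UTF8String].clone()
      pattern = Pattern.compile(lastRegex.toString)
    }
    pattern.matcher(s.toString)
  }

  override def children: Seq[Expression] = subject :: regexp :: idx :: Nil
}
```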


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21985: [SPARK-24884][SQL] add regexp_extract_all support

2018-08-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21985#discussion_r207712323
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 ---
@@ -446,3 +448,88 @@ case class RegExpExtract(subject: Expression, regexp: 
Expression, idx: Expressio
 })
   }
 }
+
+/**
+ * Extract all specific(idx) groups identified by a Java regex.
+ *
+ * NOTE: this expression is not THREAD-SAFE, as it has some internal 
mutable status.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, regexp[, idx]) - Extracts all groups that matches 
`regexp`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('100-200,300-400', '(\\d+)-(\\d+)', 1);
+   [100, 300]
+  """)
+case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: 
Expression)
+  extends TernaryExpression with ImplicitCastInputTypes {
+  def this(s: Expression, r: Expression) = this(s, r, Literal(1))
+
+  // last regex in string, we will update the pattern iff regexp value 
changed.
+  @transient private var lastRegex: UTF8String = _
+  // last regex pattern, we cache it for performance concern
+  @transient private var pattern: Pattern = _
+
+  override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
+if (!p.equals(lastRegex)) {
+  // regex value changed
+  lastRegex = p.asInstanceOf[UTF8String].clone()
+  pattern = Pattern.compile(lastRegex.toString)
+}
+val m = pattern.matcher(s.toString)
+var groupArrayBuffer = new ArrayBuffer[UTF8String]();
+
+while (m.find) {
+  val mr: MatchResult = m.toMatchResult
+  val group = mr.group(r.asInstanceOf[Int])
+  if (group == null) { // Pattern matched, but not optional group
+groupArrayBuffer += UTF8String.EMPTY_UTF8
+  } else {
+groupArrayBuffer += UTF8String.fromString(group)
+  }
+}
+
+new GenericArrayData(groupArrayBuffer.toArray.asInstanceOf[Array[Any]])
+  }
+
+  override def dataType: DataType = ArrayType(StringType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringType, 
StringType, IntegerType)
+  override def children: Seq[Expression] = subject :: regexp :: idx :: Nil
+  override def prettyName: String = "regexp_extract_all"
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+val classNamePattern = classOf[Pattern].getCanonicalName
+val matcher = ctx.freshName("matcher")
+val matchResult = ctx.freshName("matchResult")
+val groupArray = ctx.freshName("groupArray")
+
+val termLastRegex = ctx.addMutableState("UTF8String", "lastRegex")
+val termPattern = ctx.addMutableState(classNamePattern, "pattern")
+
+val arrayClass = classOf[GenericArrayData].getName
+
+nullSafeCodeGen(ctx, ev, (subject, regexp, idx) => {
+  s"""
+  if (!$regexp.equals($termLastRegex)) {
+// regex value changed
+$termLastRegex = $regexp.clone();
+$termPattern = 
$classNamePattern.compile($termLastRegex.toString());
+  }
+  java.util.regex.Matcher $matcher =
+$termPattern.matcher($subject.toString());
+  java.util.ArrayList $groupArray =
+new java.util.ArrayList();
+
+  while ($matcher.find()) {
+java.util.regex.MatchResult $matchResult = 
$matcher.toMatchResult();
+if ($matchResult.group($idx) == null) {
+  $groupArray.add(UTF8String.EMPTY_UTF8);
+} else {
+  $groupArray.add(UTF8String.fromString($matchResult.group($idx)));
+}
+  }
+  ${ev.value} = new $arrayClass($groupArray.toArray(new 
UTF8String[$groupArray.size()]));
--- End diff --

Do we need to consider setting ev.isNull here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21945: [SPARK-24989][Core] Add retrying support for OutOfDirect...

2018-08-01 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21945
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r205478496
  
--- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * This is based on hadoop-common-2.7.2
+ * {@link org.apache.hadoop.fs.Globber}.
+ * This class exposes globWithThreshold which can be used glob path in 
parallel.
+ */
+public class SparkGlobber {
+  public static final Log LOG = 
LogFactory.getLog(SparkGlobber.class.getName());
+
+  private final FileSystem fs;
+  private final FileContext fc;
+  private final Path pathPattern;
+
+  public SparkGlobber(FileSystem fs, Path pathPattern) {
+this.fs = fs;
+this.fc = null;
+this.pathPattern = pathPattern;
+  }
+
+  public SparkGlobber(FileContext fc, Path pathPattern) {
+this.fs = null;
+this.fc = fc;
+this.pathPattern = pathPattern;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.getFileStatus(path);
+  } else {
+return fc.getFileStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return null;
+}
+  }
+
+  private FileStatus[] listStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.listStatus(path);
+  } else {
+return fc.util().listStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return new FileStatus[0];
+}
+  }
+
+  private Path fixRelativePart(Path path) {
+if (fs != null) {
+  return fs.fixRelativePart(path);
+} else {
+  return fc.fixRelativePart(path);
+}
+  }
+
+  /**
+   * Convert a path component that contains backslash ecape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer 
to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+return name.replaceAll("\\\\(.)", "$1");
+  }
+
+  /**
+   * Translate an absolute path into a list of path components.
+   * We merge double slashes into a single slash here.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
+   */
+  private static List<String> getPathComponents(String path)
+  throws IOException {
+ArrayList<String> ret = new ArrayList<String>();
+for (String component : path.split(Path.SEPARATOR)) {
+  if (!component.isEmpty()) {
+ret.add(component);
+  }
+}
+return ret;
+  }
+
+  private String schemeFromPath(Path path) throws IOException {
+String scheme = path.toUri().getScheme();
+if (scheme == null) {
+  if (fs != null) {
+scheme = fs.getUri().getScheme();
+  } else {
+scheme = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
+  }
+}
+return scheme;
+  }
+
+  private String authorityFromPath(Path path) throws IOException {
+String authority = path.toUri().getAuthority();
+if (authority == null) {
+  if (fs != null) {
+authority = fs.getUri().getAuthority();
+  } else {
+authority = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
+  }
+}
+return authority ;
+  }
+
+  public FileStatus[] globWithThreshold(int threshold) throws IOException {
+

[GitHub] spark pull request #22057: [SPARK-25077][SQL] Delete unused variable in Wind...

2018-08-09 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22057

[SPARK-25077][SQL] Delete unused variable in WindowExec

## What changes were proposed in this pull request?

Just delete the unused variable `inputFields` in WindowExec, to avoid 
confusing readers of the code.

## How was this patch tested?

Existing UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25077

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22057


commit 90513587ed1d48437818e90f58612c344009f563
Author: liyuanjian 
Date:   2018-08-09T15:57:29Z

[SPARK-25077][SQL] Delete unused variable in WindowExec




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22093: [SPARK-25100][CORE] Fix no registering TaskCommitMessage...

2018-08-15 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22093
  
`Should I delete current UT from FileSuite?`
I think the current UT in `FileSuite` is unnecessary; you can leave it and 
wait for other reviewers' opinions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22117: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-16 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22117
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in P...

2018-08-16 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22122

[SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark to manage all sql 
configs

## What changes were proposed in this pull request?

Follow-up for SPARK-24665: fix some other hard-coded configs found during 
code review.

## How was this patch tested?

Existing UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-24665-follow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22122


commit 8a32e60a7af4f574176366eb057b219cb4511bb6
Author: Yuanjian Li 
Date:   2018-07-02T07:04:40Z

Use SQLConf in session.py and catalog.py




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark ...

2018-08-16 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22122
  
```
Are they all instances to fix?
```
@HyukjinKwon Yep, I grepped all `conf.get("spark.sql.xxx")` calls to make sure 
of this. The remaining hard-coded config is the StaticSQLConf 
`spark.sql.catalogImplementation` in session.py; it can't be managed by SQLConf.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark ...

2018-08-16 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22122
  
Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...

2018-08-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22105#discussion_r210842394
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 ---
@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, 
WritableByteChannel target) throws IOExcept
 // SPARK-24578: cap the sub-region's size of returned nio buffer to 
improve the performance
 // for the case that the passed-in buffer has too many components.
 int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
--- End diff --

```
IIRC socket buffers are 32k by default on Linux, so it seems unlikely you'd 
be able to write 256k in one call (ignoring what IOUtil does internally). But 
maybe in practice it works ok.
```
After reading the context in #12083 and this discussion, I want to offer one 
case where writing 256k in one call does work in practice. In our scenario, 
users change `/proc/sys/net/core/wmem_default` based on their online workload, 
and we generally set it larger than the stock default.

![image](https://user-images.githubusercontent.com/4833765/44256457-cebc0980-a23b-11e8-9b70-c7ad66fcfe1c.png)
So maybe 256k for NIO_BUFFER_LIMIT is OK here? We just need to add a comment 
explaining which kernel params relate to this value.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function

2018-08-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21986#discussion_r207924294
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -205,29 +230,82 @@ case class ArrayTransform(
 (elementVar, indexVar)
   }
 
-  override def eval(input: InternalRow): Any = {
-val arr = this.input.eval(input).asInstanceOf[ArrayData]
-if (arr == null) {
-  null
-} else {
-  val f = functionForEval
-  val result = new GenericArrayData(new Array[Any](arr.numElements))
-  var i = 0
-  while (i < arr.numElements) {
-elementVar.value.set(arr.get(i, elementVar.dataType))
-if (indexVar.isDefined) {
-  indexVar.get.value.set(i)
-}
-result.update(i, f.eval(input))
-i += 1
+  override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = 
{
+val arr = inputValue.asInstanceOf[ArrayData]
+val f = functionForEval
+val result = new GenericArrayData(new Array[Any](arr.numElements))
+var i = 0
+while (i < arr.numElements) {
+  elementVar.value.set(arr.get(i, elementVar.dataType))
+  if (indexVar.isDefined) {
+indexVar.get.value.set(i)
   }
-  result
+  result.update(i, f.eval(inputRow))
+  i += 1
 }
+result
   }
 
   override def prettyName: String = "transform"
 }
 
+/**
+ * Filters entries in a map using the provided function.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Filters entries in a map using the 
function.",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
+   [1 -> 0, 3 -> -1]
+  """,
+since = "2.4.0")
+case class MapFilter(
+input: Expression,
+function: Expression)
+  extends MapBasedUnaryHigherOrderFunction with CodegenFallback {
+
+  @transient val (keyType, valueType, valueContainsNull) = input.dataType 
match {
--- End diff --

Maybe this should be a function in the MapBasedUnaryHigherOrderFunction object, 
so we can reuse it in other map-based higher-order functions, just like 
ArrayBasedHigherOrderFunction.elementArgumentType.
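
Something like (a sketch; the helper name is an assumption):

```
import org.apache.spark.sql.types.{DataType, MapType}

object MapBasedUnaryHigherOrderFunction {
  // Mirror of ArrayBasedHigherOrderFunction.elementArgumentType for maps:
  // extract the key type, value type and value nullability of a map type.
  def mapArgumentTypes(dt: DataType): (DataType, DataType, Boolean) = dt match {
    case MapType(keyType, valueType, valueContainsNull) =>
      (keyType, valueType, valueContainsNull)
    case other =>
      throw new IllegalArgumentException(s"Expected a MapType, found $other")
  }
}
```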


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208260664
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

`left.nullable && right.nullable`? Because if one side is an empty map, NULL 
will be passed as the value for every key that exists only in the other side.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208257687
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
 ---
@@ -267,22 +267,23 @@ case class GetArrayItem(child: Expression, ordinal: 
Expression)
   }
 }
 
-/**
- * Common base class for [[GetMapValue]] and [[ElementAt]].
- */
-
-abstract class GetMapValueUtil extends BinaryExpression with 
ImplicitCastInputTypes {
+object GetMapValueUtil
+{
--- End diff --

nit: the brace should be on the previous line.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22093: [SPARK-25100][CORE] Fix no registering TaskCommit...

2018-08-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22093#discussion_r209650955
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -424,6 +425,39 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
 randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
 assert(new File(tempDir.getPath + 
"/outputDataset_new/part-r-0").exists() === true)
   }
+  
+  test("SPARK-25100: Using KryoSerializer and" +
+  " setting registrationRequired true can lead job failed") {
+val tempDir = Utils.createTempDir()
+val inputDir = tempDir.getAbsolutePath + "/input"
+val outputDir = tempDir.getAbsolutePath + "/tmp"
+
+val writer = new PrintWriter(new File(inputDir))
+
+for(i <- 1 to 100) {
+  writer.print(i)
+  writer.write('\n')
+}
+
+writer.close()
+
+val conf = new SparkConf(false).setMaster("local").
+set("spark.kryo.registrationRequired", "true").setAppName("test")
+conf.set("spark.serializer", classOf[KryoSerializer].getName)
+conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
--- End diff --

Why do we need to set 'spark.serializer' twice?
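
For reference, `SparkConf.set` just overwrites the previous value for the same key, 
so the second call is redundant (a minimal sketch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val conf = new SparkConf(false)
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// classOf[KryoSerializer].getName yields the same string, so this always holds:
assert(conf.get("spark.serializer") == "org.apache.spark.serializer.KryoSerializer")
```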


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22093: [SPARK-25100][CORE] Fix no registering TaskCommitMessage...

2018-08-14 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22093
  
Why should we create our own SparkContext here? Could we just add a UT like 
`registration of HighlyCompressedMapStatus` to check that `TaskCommitMessage` 
works?
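
Something along these lines (a hedged sketch modeled on that existing test; the 
exact constructor shape of `TaskCommitMessage` may differ):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.serializer.KryoSerializer

test("registration of TaskCommitMessage") {
  val conf = new SparkConf(false)
    .set("spark.serializer", classOf[KryoSerializer].getName)
    .set("spark.kryo.registrationRequired", "true")
  val ser = new KryoSerializer(conf).newInstance()
  // Throws "Class is not registered" unless TaskCommitMessage is registered.
  ser.serialize(new TaskCommitMessage(Map.empty))
}
```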


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-08-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
cc @HyukjinKwon


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-21 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-21 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
cc @jiangxb1987 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-08-20 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22165

[SPARK-25017][Core] Add test suite for BarrierCoordinator and 
ContextBarrierState

## What changes were proposed in this pull request?

Currently `ContextBarrierState` and `BarrierCoordinator` are only covered by the 
end-to-end test in `BarrierTaskContextSuite`; this adds a BarrierCoordinatorSuite to 
test both classes.

## How was this patch tested?

UT in BarrierCoordinatorSuite.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22165.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22165


commit 21bd1c37f4af6480adfc07130a15f70acdeda378
Author: liyuanjian 
Date:   2018-08-21T05:24:07Z

[SPARK-25017][Core] Add test suite for BarrierCoordinator and 
ContextBarrierState




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-21 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
cc @gatorsmile @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22202: [SPARK-25211][Core] speculation and fetch failed ...

2018-08-23 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22202#discussion_r212365264
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2246,58 +2247,6 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 assertDataStructuresEmpty()
   }
 
-  test("Trigger mapstage's job listener in submitMissingTasks") {
--- End diff --

Could you explain why this test is being deleted?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...

2018-08-24 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/2

[SPARK-25083][SQL] Remove the type erasure hack in data source scan

## What changes were proposed in this pull request?

1. Add the `inputBatchRDDs` and `inputRowRDDs` interfaces in `ColumnarBatchScan`.
2. Rewrite them in the physical nodes that extend `ColumnarBatchScan`.

## How was this patch tested?

Refactoring work, tested with the existing UTs.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25083

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2


commit 992a08b1d77d59daeac95c67d07e5b8efe20ce20
Author: Yuanjian Li 
Date:   2018-08-24T15:54:27Z

[SPARK-25083][SQL] Remove the type erasure hack in data source scan




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...

2018-08-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/2#discussion_r212784374
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -40,6 +42,29 @@ private[sql] trait ColumnarBatchScan extends 
CodegenSupport {
 "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
 "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
+  /**
+   * Returns all the RDDs of ColumnarBatch which generates the input rows.
+   */
+  def inputBatchRDDs(): Seq[RDD[ColumnarBatch]]
+
+  /**
+   * Returns all the RDDs of InternalRow which generates the input rows.
+   */
+  def inputRowRDDs(): Seq[RDD[InternalRow]]
+
+  /**
+   * Get input RDD depends on supportsBatch.
+   */
+  final def getInputRDDs(): Seq[RDD[InternalRow]] = {
+if (supportsBatch) {
+  inputBatchRDDs().asInstanceOf[Seq[RDD[InternalRow]]]
--- End diff --

This may be the last explicit erasure hack left; please check whether it is 
acceptable.
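
For thread readers, the cast compiles and never throws at the cast site because of 
type erasure; a standalone illustration:

```scala
// Generic type parameters are erased at runtime, so both sides look identical
// to the JVM and the cast itself always succeeds:
val xs: Seq[List[Int]] = Seq(List(1))
val ys: Seq[List[String]] = xs.asInstanceOf[Seq[List[String]]] // no exception here
// A ClassCastException would only surface when an element is actually used as
// the wrong type -- which the supportsBatch check is meant to rule out above.
```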


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-24 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/2
  
cc @cloud-fan and @rdblue, please have a look when you have time. If this PR 
doesn't match your expectations, I'll close it soon. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212822215
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }
 
+  test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution") {
+val (table1Name, table2Name) = ("t1", "t2")
+withTempDatabase { dbName =>
+  withTable(table1Name, table2Name) {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+  spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+  spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+  // First, makes sure a join is not broadcastable
+  val plan = sql(s"SELECT * FROM $dbName.$table1Name, 
$dbName.$table2Name " +
+  s"WHERE $table1Name.id = $table2Name.id")
+.queryExecution.executedPlan
+  assert(plan.collect { case p: BroadcastHashJoinExec => p }.size 
== 0)
+
+  // Uses multi-part table names for broadcast hints
+  def checkIfHintApplied(tableName: String, hintTableName: 
String): Unit = {
--- End diff --

`hintTableName` is never used in this function?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...

2018-08-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22252#discussion_r213343952
  
--- Diff: docs/configuration.md ---
@@ -152,7 +152,7 @@ of the most common options to set are:
   spark.driver.memory
   1g
   
-Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in MiB 
+Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in bytes 
--- End diff --

Check the config code here.

https://github.com/apache/spark/blob/99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18/core/src/main/scala/org/apache/spark/internal/config/package.scala#L40-L43
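
The definition there (paraphrased, not verbatim) declares MiB, not bytes, as the 
default unit:

```scala
// Sketch of core/src/main/scala/org/apache/spark/internal/config/package.scala:
private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
  .doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.")
  .bytesConf(ByteUnit.MiB)          // unsuffixed values are read as MiB
  .createWithDefaultString("1g")
```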


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-08-18 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22140

[SPARK-25072][PySpark] Forbid extra value for custom Row

## What changes were proposed in this pull request?

Add a value-length check in `_create_row` to forbid extra values for custom Rows 
in PySpark.

## How was this patch tested?

New UT in pyspark-sql

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25072

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22140


commit b8c6522bccde51584e9878144924fd7b92f8785f
Author: liyuanjian 
Date:   2018-08-18T08:36:53Z

Forbidden extra value for custom Row




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22024: [SPARK-25034][CORE] Remove allocations in onBlock...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22024#discussion_r213015245
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -160,7 +160,13 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
   releaseLock(pieceId)
 case None =>
   bm.getRemoteBytes(pieceId) match {
-case Some(b) =>
+case Some(splitB) =>
+
+  // Checksum computation and further computations require the 
data
+  // from the ChunkedByteBuffer to be merged, so we we merge 
it now.
--- End diff --

nit: typo in the comment ("we we merge").


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22024: [SPARK-25034][CORE] Remove allocations in onBlock...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22024#discussion_r213015113
  
--- Diff: 
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
@@ -101,15 +101,7 @@ abstract class BlockTransferService extends 
ShuffleClient with Closeable with Lo
   result.failure(exception)
 }
 override def onBlockFetchSuccess(blockId: String, data: 
ManagedBuffer): Unit = {
-  data match {
-case f: FileSegmentManagedBuffer =>
-  result.success(f)
-case _ =>
-  val ret = ByteBuffer.allocate(data.size.toInt)
--- End diff --

The copy behavior was introduced by: 
https://github.com/apache/spark/pull/2330/commits/69f5d0a2434396abbbd98886e047bc08a9e65565.
 How can you make sure it can safely be replaced by increasing the reference count?
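
For background, `ManagedBuffer` carries Netty-style reference counting; a hedged 
sketch of what "increasing the reference count" would mean here, and where the risk 
lies:

```scala
override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
  // retain() bumps the refcount so the underlying memory outlives this
  // callback, instead of being copied into a freshly allocated ByteBuffer...
  data.retain()
  result.success(data)
  // ...but now the consumer must call data.release() later, or the memory
  // leaks -- which is exactly the guarantee being questioned above.
}
```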



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22024: [SPARK-25034][CORE] Remove allocations in onBlockFetchSu...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22024
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22149
  
```
Is that possible to add a test case?
```
Thanks for your reply Xiao. We hit some difficulties writing the test case, 
because it needs to mock speculative behavior. We will keep looking into this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22149
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
@jiangxb1987 Many thanks for your comment!
```
One general idea is that we don't need to rely on the RPC framework to test 
ContextBarrierState, just mock RpcCallContexts should be enough.
```
Actually I also wanted to implement it like this at first, as you asked in the 
JIRA, but `ContextBarrierState` is a private inner class in `BarrierCoordinator`. 
Could I refactor `ContextBarrierState` out of `BarrierCoordinator`? If that is 
permitted, I think we can just mock RpcCallContext to achieve this (see the sketch 
below).
```
We shall cover the following scenarios:
```
Thanks for the list; the first 5 scenarios are covered by the current 
implementation. I'll add the last check, `Make sure we clear all the internal data 
under each case.`, after we reach an agreement.
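
A minimal sketch of that mocked test, assuming the refactor is permitted (class and 
message shapes follow the current BarrierCoordinator code and may differ in detail):

```scala
import org.mockito.Mockito.{mock, verify}
import org.apache.spark.rpc.RpcCallContext

val requester = mock(classOf[RpcCallContext])
// Assumes ContextBarrierState (plus ContextBarrierId and RequestToSync) become
// reachable from the test suite once moved out of BarrierCoordinator.
val state = new ContextBarrierState(ContextBarrierId(stageId = 0, stageAttemptId = 0),
  numTasks = 1)
state.handleRequest(requester, RequestToSync(
  numTasks = 1, stageId = 0, stageAttemptId = 0, taskAttemptId = 0L, barrierEpoch = 0))
// With all tasks synced, every pending requester should get an empty reply,
// and the internal data should be cleared.
verify(requester).reply(())
```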


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
AFAIC, the fix should forbid passing illegal extra values. With fewer values than 
fields, you get an `AttributeError` on access under the current implementation, so 
that case shouldn't be banned here. What do you think? :) @HyukjinKwon 
@BryanCutler Thanks.
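
Concretely, the behavior I have in mind (a sketch; the extra-value case raises 
ValueError with this patch):

```python
from pyspark.sql import Row

Person = Row("name", "age")
Person("Alice", 11)           # fine: 2 values for 2 fields
Person("Alice", 11, "extra")  # with this fix: ValueError, 3 values for 2 fields
p = Person("Alice")           # fewer values than fields stays allowed...
p.age                         # ...and only raises AttributeError on access
```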


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22177: stages in wrong order within job page DAG chart

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22177#discussion_r212003441
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -337,7 +337,9 @@ private[ui] class JobPage(parent: JobsTab, store: 
AppStatusStore) extends WebUIP
   store.executorList(false), appStartTime)
 
 val operationGraphContent = 
store.asOption(store.operationGraphForJob(jobId)) match {
-  case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, 
operationGraph)
+  case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, 
operationGraph.sortWith(
+
_.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
--- End diff --

It would be better to add a `getStageId` function in `RDDOperationGraph` to do this.
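
i.e. something like (a sketch; naming and placement up to you):

```scala
// In object RDDOperationGraph: recover the stage id encoded in the root
// cluster id, so callers don't inline the string surgery.
def getStageId(graph: RDDOperationGraph): Int =
  graph.rootCluster.id.replaceAll(STAGE_CLUSTER_PREFIX, "").toInt
```

JobPage could then sort with `operationGraph.sortBy(RDDOperationGraph.getStageId)`.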


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
My pleasure, I just found this while glancing over JIRA in recent days. :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22180: [SPARK-25174][YARN]Limit the size of diagnostic m...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22180#discussion_r211996874
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ---
@@ -143,6 +143,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   @volatile private var finished = false
   @volatile private var finalStatus = getDefaultFinalStatus
   @volatile private var finalMsg: String = ""
+  private val finalMsgLimitSize = sparkConf.get(AM_FINAL_MSG_LIMIT).toInt
--- End diff --

nit: move this to L165? Just for code cleanliness.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22180: [SPARK-25174][YARN]Limit the size of diagnostic m...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22180#discussion_r211996461
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ---
@@ -368,7 +369,11 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
 logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
   Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-finalMsg = msg
+finalMsg = if (msg == null || msg.length <= finalMsgLimitSize) {
+  msg
+} else {
+  msg.substring(0, finalMsgLimitSize)
--- End diff --

Maybe the last `finalMsgLimitSize` characters of the message are more useful.
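
i.e. (a sketch of the suggested alternative):

```scala
finalMsg = if (msg == null || msg.length <= finalMsgLimitSize) {
  msg
} else {
  // Keep the tail: in long diagnostics the final error usually sits at the end.
  msg.substring(msg.length - finalMsgLimitSize)
}
```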


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22177: stages in wrong order within job page DAG chart

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22177#discussion_r212002571
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -18,18 +18,18 @@
 package org.apache.spark.ui.jobs
 
 import java.util.Locale
+
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{Buffer, ListBuffer}
 import scala.xml.{Node, NodeSeq, Unparsed, Utility}
-
--- End diff --

Revert this change to the imports.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22177: stages in wrong order within job page DAG chart

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22177
  
Please change the title to "[SPARK-25199][Web UI] XXX" as described in 
http://spark.apache.org/contributing.html.
```
check the DAG chart in job page.
```
Could you also post a screenshot of the DAG chart after your fix?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...

2018-08-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22149
  
Gentle ping @gatorsmile.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...

2018-08-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/2#discussion_r212820814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -307,7 +308,7 @@ case class FileSourceScanExec(
 withSelectedBucketsCount
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  private lazy val inputRDD: RDD[Object] = {
--- End diff --

Thanks Sean! Addressed in 7e88599.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-29 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/2
  
@cloud-fan Thanks for your reply Wenchen. I've tried to achieve this in this 
commit; please take a look, thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...

2018-07-19 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21533
  
Thanks everyone for your help!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21789: [SPARK-24829][SQL]CAST AS FLOAT inconsistent with...

2018-07-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21789#discussion_r203037295
  
--- Diff: 
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 ---
@@ -766,6 +774,14 @@ class HiveThriftHttpServerSuite extends 
HiveThriftJdbcTest {
   assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion)
 }
   }
+
+  test("Checks cast as float") {
--- End diff --

Duplicated code?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21839#discussion_r204447671
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -450,13 +450,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case d @ DeserializeToObject(_, _, child) if (child.outputSet -- 
d.references).nonEmpty =>
   d.copy(child = prunedChild(child, d.references))
 
-// Prunes the unused columns from child of Aggregate/Expand/Generate
+// Prunes the unused columns from child of 
Aggregate/Expand/Generate/ScriptTransformation
 case a @ Aggregate(_, _, child) if (child.outputSet -- 
a.references).nonEmpty =>
   a.copy(child = prunedChild(child, a.references))
 case f @ FlatMapGroupsInPandas(_, _, _, child) if (child.outputSet -- 
f.references).nonEmpty =>
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
+case s @ ScriptTransformation(_, _, _, child, _)
+  if (child.outputSet -- s.references).nonEmpty =>
--- End diff --

Thanks, fix in 2cf131f.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
I'll resolve the conflicts today, thanks for pinging me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21839
  
@gatorsmile Thanks for your advice, added a UT in ScriptTransformationSuite.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


