[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212200119
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -855,16 +858,17 @@ abstract class RDD[T: ClassTag](
   * a map on the other).
   */
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
-    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
-      new Iterator[(T, U)] {
-        def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
-          case (true, true) => true
-          case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+    zipPartitionsInternal(other, preservesPartitioning = false, orderSensitiveFunc = true) {
+      (thisIter, otherIter) =>
+        new Iterator[(T, U)] {
+          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
+            case (true, true) => true
+            case (false, false) => false
+            case _ => throw new SparkException("Can only zip RDDs with " +
+              "same number of elements in each partition")
+          }
+          def next(): (T, U) = (thisIter.next(), otherIter.next())
        }
-        def next(): (T, U) = (thisIter.next(), otherIter.next())
-      }
--- End diff --

yea, just indentation change.


---




[GitHub] spark pull request #22188: [SPARK-25164][SQL] Avoid rebuilding column and pa...

2018-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22188


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192600
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -855,16 +858,17 @@ abstract class RDD[T: ClassTag](
   * a map on the other).
   */
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
-    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
-      new Iterator[(T, U)] {
-        def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
-          case (true, true) => true
-          case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+    zipPartitionsInternal(other, preservesPartitioning = false, orderSensitiveFunc = true) {
+      (thisIter, otherIter) =>
+        new Iterator[(T, U)] {
+          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
+            case (true, true) => true
+            case (false, false) => false
+            case _ => throw new SparkException("Can only zip RDDs with " +
+              "same number of elements in each partition")
+          }
+          def next(): (T, U) = (thisIter.next(), otherIter.next())
        }
-        def next(): (T, U) = (thisIter.next(), otherIter.next())
-      }
--- End diff --

The bulk of the change here is simply indentation, right (except for zipPartitions -> zipPartitionsInternal and the new flag)? I want to make sure I did not miss something here.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212199007
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)
+                    }
+                }
+
+                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                  if (stageChain.head.id == failedStage.id) {
+                    stageChain.foreach { stage =>
+                      if (!failedStages.contains(stage)) rollBackStage(stage)
+                    }
+                  } else {
+                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                  }
+                }
+
+                rollBackStage(failedStage)
--- End diff --

This would be implicitly handled anyway, right ? Any reason to specifically 
do it here ?


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212193814
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *    when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

While reviewing the PR, I find this enumeration a bit unclear.
We are modelling two things here:
* the behavior of `compute` in RDD - whether it is idempotent, order-sensitive, or indeterminate; and
* the input order of the tuples coming in from the partition to `RDD.compute`.

For example, the shuffle output of a sorted RDD would be `IDEMPOTENT` by this definition - but idempotence is a functional property of a closure, not of its input order. Perhaps we need to convey this better? Or does an 'idempotent' order make sense?
Thoughts @cloud-fan, @markhamstra?
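To make the distinction concrete, a rough sketch (illustrative names only, not from the PR) of how the two properties could be modelled separately:

```scala
// Illustrative only: separate the closure's re-run behavior from the
// determinism of the input order it is fed - the two concerns conflated above.
object ComputeBehavior extends Enumeration {
  val Deterministic, OrderSensitive = Value // property of the closure itself
}
object InputOrder extends Enumeration {
  val Fixed, Unordered = Value // property of the tuples fed to compute
}
// e.g. the shuffle output of a sorted RDD has InputOrder.Fixed, so even an
// order-sensitive closure recomputes the same result on retry.
```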


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212198632
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -70,7 +70,8 @@ class MyRDD(
     numPartitions: Int,
     dependencies: List[Dependency[_]],
     locations: Seq[Seq[String]] = Nil,
-    @(transient @param) tracker: MapOutputTrackerMaster = null)
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    isRandom: Boolean = false)
--- End diff --

isRandom -> unordered


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212197939
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)
+                    }
+                }
+
+                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                  if (stageChain.head.id == failedStage.id) {
+                    stageChain.foreach { stage =>
+                      if (!failedStages.contains(stage)) rollBackStage(stage)
+                    }
+                  } else {
+                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                  }
+                }
--- End diff --

This method looks expensive for large DAGs; memoization should help reduce the cost:
compute the set of stages to roll back once, then roll back the stages found.
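A minimal sketch of that memoized variant, reusing the names from the quoted diff (`Stage`, `id`, `parents`, `failedStage`, `rollBackStage`); the traversal itself is illustrative, not the PR's code:

```scala
import scala.collection.mutable

// Visit each stage at most once: a stage must be rolled back iff it is the
// failed stage or transitively depends on it through its parents.
def stagesToRollback(failedStage: Stage, resultStage: Stage): Set[Stage] = {
  val memo = mutable.Map[Stage, Boolean]()
  def dependsOnFailed(stage: Stage): Boolean = memo.get(stage) match {
    case Some(cached) => cached
    case None =>
      val hit = stage.id == failedStage.id || stage.parents.exists(dependsOnFailed)
      memo(stage) = hit
      hit
  }
  dependsOnFailed(resultStage)
  memo.collect { case (stage, true) => stage }.toSet
}

// stagesToRollback(failedStage, resultStage).foreach(rollBackStage)
```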



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212199604
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)
--- End diff --

This ends up aborting the job if I am not wrong, while we want to retry the stage.
+CC @markhamstra, @tgravescs in case there is a better way to accomplish result stage rollback.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212195284
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala ---
@@ -95,6 +99,18 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
     rdd2 = null
     f = null
   }
+
+  private def isRandomOrder(rdd: RDD[_]): Boolean = {
+    rdd.computingRandomLevel == RDD.RandomLevel.UNORDERED
--- End diff --

Instead of this, check that the level is not IDEMPOTENT.
In both the unordered and indeterminate cases, `computingRandomLevel` should return indeterminate for this zip RDD, as in the sketch below.
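A sketch of the suggested check, keeping the PR's current names:

```scala
// Anything that is not IDEMPOTENT can change the zipped pairing on re-run,
// so treat both UNORDERED and INDETERMINATE parents as order-sensitive.
private def isRandomOrder(rdd: RDD[_]): Boolean =
  rdd.computingRandomLevel != RDD.RandomLevel.IDEMPOTENT
```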


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192065
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *    when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

RandomLevel is not very descriptive, particularly after the rename of the levels to UNORDERED/INDETERMINATE.
Consider OrderSensitivity or something else more descriptive.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192772
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents (root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
--- End diff --

We will need to expose this with an `@Experimental` tag - we can't keep it `private[spark]` given the implications for custom RDDs.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212193206
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents (root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+    val parentRandomLevels = dependencies.map {
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+          RDD.RandomLevel.INDETERMINATE
--- End diff --

It does not matter what the parent RDD's order was - after shuffle, currently, it is always going to be UNORDERED - unless an Aggregator and key ordering are specified in the dep.
Given [this comment](https://github.com/apache/spark/pull/22112#issuecomment-414034703), and adding a few missing cases, this becomes:
```
  // If checkpointed already - then always same order
  case dep: Dependency if dep.rdd.isCheckpointed => RDD.RandomLevel.IDEMPOTENT
  // if same partitioner, then shuffle not done.
  case dep: ShuffleDependency[_, _, _] if dep.partitioner == partitioner => dep.rdd.computingRandomLevel
  // if aggregator specified (and so unique keys) and key ordering specified - then consistent ordering.
  case dep: ShuffleDependency[_, _, _] if dep.keyOrdering.isDefined && dep.aggregator.isDefined =>
    RDD.RandomLevel.IDEMPOTENT
  // All other shuffle cases, we don't know the output order in spark.
  case dep: ShuffleDependency[_, _, _] => RDD.RandomLevel.INDETERMINATE
```



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192261
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -54,4 +58,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
 
   @transient protected lazy override val isBarrier_ : Boolean =
     isFromBarrier || dependencies.exists(_.rdd.isBarrier())
+
+  override private[spark] def computingRandomLevel = {
--- End diff --

computingRandomLevel -> computeOrderSensitivity
(suffix to match what `RandomLevel` gets renamed to)


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212190267
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -32,12 +32,16 @@ import org.apache.spark.{Partition, TaskContext}
  *                      doesn't modify the keys.
  * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
  *                      containing at least one RDDBarrier shall be turned into a barrier stage.
+ * @param orderSensitiveFunc whether or not the zip function is order-sensitive. If it's order
--- End diff --

remove 'zip'


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-22 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212196598
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
--- End diff --

IIRC this can be a valid case - for example if result stage is being run on 
only a subset of partitions (first() for example) : so number of missing 
partitions can be legitimately > numTasks and still have missing relevant 
partitions.
I might be a bit rusty here though, +CC @markhamstra 


---




[GitHub] spark pull request #22188: [SPARK-25164][SQL] Avoid rebuilding column and pa...

2018-08-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/22188#discussion_r212199355
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java ---
@@ -270,21 +270,23 @@ public boolean nextBatch() throws IOException {
   private void initializeInternal() throws IOException, UnsupportedOperationException {
     // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<String[]> paths = requestedSchema.getPaths();
--- End diff --

cc @michal-databricks @mswit-databricks 


---




[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22188
  
thanks, merging to master!


---




[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22104#discussion_r212197529
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
           }
         }.toArray
       }.toArray
-      val projection = newMutableProjection(allInputs, child.output)
+
+      // Project input rows to unsafe row so we can put it in the row queue
+      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
--- End diff --

Ideally all the operators will produce UnsafeRow. If a data source does not produce UnsafeRow, Spark will make sure there is a project above it that produces UnsafeRow, so we don't need to worry about it here and can safely assume the input is always UnsafeRow.
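For reference, a minimal standalone sketch of what an `UnsafeProjection` gives you; the schema here is made up for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Builds a projection that converts any InternalRow with this schema into an
// UnsafeRow backed by a contiguous byte buffer - the format the row queue
// can store directly.
val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
val toUnsafe = UnsafeProjection.create(schema)
// toUnsafe(row): InternalRow => UnsafeRow
```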


---




[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22196
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95142/
Test PASSed.


---




[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22196
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22196
  
**[Test build #95142 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95142/testReport)** for PR 22196 at commit [`78c8fcd`](https://github.com/apache/spark/commit/78c8fcd3a7255d5bbac01475fe44fcba0c15a8d9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21977
  
Is this PR close to getting merged? Or do we have some problems that are 
hard to solve?


---




[GitHub] spark pull request #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD...

2018-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22153


---




[GitHub] spark pull request #20226: [SPARK-23034][SQL] Override `nodeName` for all *S...

2018-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20226


---




[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212194825
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
 
   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    while (i < currentRow.numFields) {
--- End diff --

@mallman It sounds like the changes in this file are not needed. Could you 
help me point out which test cases will fail?


---




[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22143
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22153
  
thanks, merging to master!


---




[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22196
  
**[Test build #95142 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95142/testReport)** for PR 22196 at commit [`78c8fcd`](https://github.com/apache/spark/commit/78c8fcd3a7255d5bbac01475fe44fcba0c15a8d9).


---




[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22196
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22196
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2476/
Test PASSed.


---




[GitHub] spark pull request #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of z...

2018-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22194


---




[GitHub] spark pull request #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of Avr...

2018-08-22 Thread gengliangwang
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/22196

[SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataToCatalyst and 
CatalystDataToAvro

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/21838, the classes `AvroDataToCatalyst` and `CatalystDataToAvro` were put in the package `org.apache.spark.sql`.
They should be moved to the package `org.apache.spark.sql.avro`.
This PR also optimizes imports in the Avro module.

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark avro_revise_package_name

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22196


commit 78c8fcd3a7255d5bbac01475fe44fcba0c15a8d9
Author: Gengliang Wang 
Date:   2018-08-23T06:04:57Z

revise package name of AvroDataToCatalyst and CatalystDataToAvro




---




[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22194
  
Merged to master.


---




[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22153
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22153
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95134/
Test PASSed.


---




[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22153
  
**[Test build #95134 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95134/testReport)** for PR 22153 at commit [`82190ac`](https://github.com/apache/spark/commit/82190accc6182988dfae00a07a656b069aa7b708).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22194
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22194
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95133/
Test PASSed.


---




[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22194
  
**[Test build #95133 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95133/testReport)** for PR 22194 at commit [`967360a`](https://github.com/apache/spark/commit/967360a1a417739cdada3b7c7334c8ca87ede6a6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
@jiangxb1987 Many thanks for your comment!
```
One general idea is that we don't need to rely on the RPC framework to test ContextBarrierState, just mock RpcCallContexts should be enough.
```
Actually I wanted to implement it like this at first, as you asked in the JIRA, but `ContextBarrierState` is a private inner class of `BarrierCoordinator`. Could I refactor `ContextBarrierState` out of `BarrierCoordinator`? If that is acceptable, I think we can just mock `RpcCallContext` to achieve this.
```
We shall cover the following scenarios:
```
Thanks for the list - the first 5 scenarios are covered by the current implementation. I'll add the last check, `Make sure we clear all the internal data under each case.`, after we reach an agreement.
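For illustration, a rough sketch of that mocking approach, assuming `ContextBarrierState` becomes reachable from the test after such a refactor (Mockito is already on Spark's test classpath; on older Mockito the matcher import is `org.mockito.Matchers.any`, and the entry point below is hypothetical):

```scala
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, never, verify}
import org.apache.spark.rpc.RpcCallContext

// One mocked requester per simulated task; no real RPC endpoint is required.
val requester = mock(classOf[RpcCallContext])
// Drive the state machine directly with the mocked context, e.g.
//   barrierState.handleRequest(requester, request)  // hypothetical entry point
// and then assert on the replies it sent (or did not send):
verify(requester, never()).sendFailure(any())
```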


---




[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...

2018-08-22 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20146
  
seems like this was a thumbs-up from  @WeichenXu123 @jkbradley?
@dbtsai ?


---




[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22121


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22121
  
thanks, merging to master!


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22112
  
To confirm: is everyone OK with merging this PR, or are we just OK with the direction and need more time to review it?


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95140/
Test PASSed.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22121
  
**[Test build #95140 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95140/testReport)** for PR 22121 at commit [`1f253bf`](https://github.com/apache/spark/commit/1f253bf536c3a7bd1c07ba5ea5600f661c8e106e).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22121
  
**[Test build #95139 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95139/testReport)** for PR 22121 at commit [`8245806`](https://github.com/apache/spark/commit/824580684c05c2a3c1654517b77864ca5d504ee0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95139/
Test PASSed.


---




[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22187
  
**[Test build #95141 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95141/testReport)** for PR 22187 at commit [`81ef75a`](https://github.com/apache/spark/commit/81ef75a36a1c9dcd6922d2ec77393bc35389efd0).


---




[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22187
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2475/
Test PASSed.


---




[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22187
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22163
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95130/
Test PASSed.


---




[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22163
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22163
  
**[Test build #95130 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95130/testReport)** for PR 22163 at commit [`f91e18c`](https://github.com/apache/spark/commit/f91e18c7d4b8eab53c4983320a0eab0403c37a48).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread gengliangwang
Github user gengliangwang commented on the issue:

https://github.com/apache/spark/pull/22121
  
The preview doc (zip file in the PR description) has been updated to the latest version.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22121
  
**[Test build #95140 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95140/testReport)** for PR 22121 at commit [`1f253bf`](https://github.com/apache/spark/commit/1f253bf536c3a7bd1c07ba5ea5600f661c8e106e).


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  

@tgravescs:
> The shuffle simply transfers the bytes it's supposed to. Spark's shuffle of those bytes is not consistent in that the order it fetches from can change, and without a sort happening on that data the order can be different on rerun. I guess maybe you mean the ShuffledRDD as a whole or do you mean something else here?

By shuffle, I am referring to the output of the shuffle which is consumed by an RDD with a `ShuffleDependency` as input.
More specifically, the output of `SparkEnv.get.shuffleManager.getReader(...).read()`, which RDDs (user-defined and Spark's own implementations) use to fetch the output of the shuffle machinery.
This output is not just deserialized shuffle bytes: it also has aggregation applied (if specified) and ordering imposed (if specified).

ShuffledRDD is one such usage within spark core, but others exist within spark core and in user code.

> All I'm saying is zip is just another variant of this, you could document it as such and do nothing internal to spark to "fix it".

I agree; repartition + shuffle, zip, sample, and the mllib usages are all variants of the same problem - of shuffle output order being inconsistent. A toy illustration follows.
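A toy sketch (mine, not the PR's code) of why round-robin `repartition` is order-sensitive: the target partition of a row is a function of the position at which the row happens to be consumed, so a re-run that fetches shuffle output in a different order scatters the same rows differently.

```scala
// Toy model of round-robin key assignment in repartition(): the output
// partition depends only on consumption order, so reordering the input
// reassigns rows to different partitions.
def roundRobin[T](iter: Iterator[T], numPartitions: Int, start: Int): Iterator[(T, Int)] = {
  var position = start
  iter.map { t =>
    position = (position + 1) % numPartitions
    (t, position)
  }
}

val rows = Seq("a", "b", "c", "d")
val firstRun = roundRobin(rows.iterator, 2, 0).toMap          // Map(a -> 1, b -> 0, c -> 1, d -> 0)
val rerun    = roundRobin(rows.reverse.iterator, 2, 0).toMap  // Map(d -> 1, c -> 0, b -> 1, a -> 0)
assert(firstRun("a") != rerun("a"))  // same row, different output partition on re-run
```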

> I guess we can separate out these 2 discussions. I think the point of this PR is to temporarily work around the data loss/corruption issue with repartition by failing. So if everyone agrees on that, let's move the discussion to a JIRA about what to do with the rest of the operators and fix repartition here. Thoughts?

Sounds good to me.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2474/
Test PASSed.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22121
  
**[Test build #95139 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95139/testReport)** for PR 22121 at commit [`8245806`](https://github.com/apache/spark/commit/824580684c05c2a3c1654517b77864ca5d504ee0).


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22192
  
**[Test build #95138 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95138/testReport)** for PR 22192 at commit [`7c86fc5`](https://github.com/apache/spark/commit/7c86fc54c36954f1345eccc066873f7f90832657).


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2473/
Test PASSed.


---




[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22121
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22112
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22112
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95129/
Test PASSed.


---




[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22152#discussion_r212183703
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
       }.reduceOption(typeMerger).toIterator
     }
 
-    // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-    // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-    // active SparkSession and `SQLConf.get` may point to the wrong configs.
-    val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+    // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+    // the fold functions in the scheduler event loop thread.
+    val existingConf = SQLConf.get
+    var rootType: DataType = StructType(Nil)
+    val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+    val mergeResult = (index: Int, taskResult: DataType) => {
+      rootType = SQLConf.withExistingConf(existingConf) {

Same question was in my mind. Thanks for the clarification.
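For context, the two closures in the quoted diff are meant to plug into `SparkContext.runJob`, roughly as in this one-line sketch; the names come from the diff, the input RDD is assumed to be in scope as `json`, and the exact call site in the PR may differ:

```scala
// foldPartition runs on the executors over each partition's DataType iterator;
// mergeResult runs on the driver as each task completes, with SQLConf pinned
// to the session's conf so typeMerger sees the right settings.
json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult)
```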


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22112
  
**[Test build #95129 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95129/testReport)** for PR 22112 at commit [`93f37fa`](https://github.com/apache/spark/commit/93f37fa585462b9ee2fb9e179eab736fbc416d3e).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #22171: [SPARK-25177][SQL] When dataframe decimal type co...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22171#discussion_r212180992
  
--- Diff: sql/core/src/test/resources/sql-tests/results/literals.sql.out ---
@@ -197,7 +197,7 @@ select .e3
 -- !query 20
 select 1E309, -1E309
 -- !query 20 schema
-struct<1E+309:decimal(1,-309),-1E+309:decimal(1,-309)>
+struct<10:decimal(1,-309),-10:decimal(1,-309)>
--- End diff --

@vinodkc how does it show in Postgres?


---




[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20345
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95131/
Test PASSed.


---




[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20345
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20345
  
**[Test build #95131 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95131/testReport)** for PR 20345 at commit [`39462fb`](https://github.com/apache/spark/commit/39462fbee952ec574b4c04d7718fd73bb5f56d9d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21546
  
**[Test build #95137 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95137/testReport)** for PR 21546 at commit [`5549644`](https://github.com/apache/spark/commit/554964465dbcb99cc313620fafb0fc41acfd4304).


---




[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22153
  
LGTM


---




[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2472/
Test PASSed.


---




[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...

2018-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22157#discussion_r212178321
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

Why did the error message change?


---




[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21546
  
LGTM


---




[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r212178291
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot  // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch()  // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator)  // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
+    try {
+      // Create array so that we can safely close the file
+      val batches = getBatchesFromStream(fileStream.getChannel).toArray
+      // Parallelize the record batches to create an RDD
+      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length))
--- End diff --

Ah, sorry. You are right. I misread.


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22192
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22192
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95136/
Test FAILed.


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22192
  
**[Test build #95136 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95136/testReport)** for PR 22192 at commit [`44454dd`](https://github.com/apache/spark/commit/44454dd586e35bdf16492c4a8969494bd3b7f8f5).
 * This patch **fails Java style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `.doc("Comma-separated list of class names for \"plugins\" implementing " +`


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22112
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95128/
Test FAILed.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22112
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22112
  
**[Test build #95128 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95128/testReport)** for PR 22112 at commit [`097092b`](https://github.com/apache/spark/commit/097092be4b2967689082af62715ecc4f78086c30).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22192: [SPARK-24918] Executor Plugin API

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22192
  
**[Test build #95136 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95136/testReport)** for PR 22192 at commit [`44454dd`](https://github.com/apache/spark/commit/44454dd586e35bdf16492c4a8969494bd3b7f8f5).


---




[GitHub] spark issue #22192: [SPARK-24918] Executor Plugin API

2018-08-22 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/22192
  
Jenkins, ok to test


---




[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api

2018-08-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/21923
  
This is being continued in https://github.com/apache/spark/pull/22192.


---




[GitHub] spark pull request #21923: [SPARK-24918][Core] Executor Plugin api

2018-08-22 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/21923


---




[GitHub] spark issue #22195: [SPARK-25205][CORE] Fix typo in spark.network.crypto.key...

2018-08-22 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22195
  
**[Test build #95135 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95135/testReport)** for PR 22195 at commit [`b927b94`](https://github.com/apache/spark/commit/b927b94ea1312ba74d73c203adb7683b2fb42fed).


---




[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...

2018-08-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r212171980
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot  // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch()  // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator)  // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
+    try {
+      // Create array so that we can safely close the file
+      val batches = getBatchesFromStream(fileStream.getChannel).toArray
+      // Parallelize the record batches to create an RDD
+      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length))
--- End diff --

So this is the length of the array of batches, not the number of records in a batch. The input is split according to the default parallelism config, so if that is 32, we will have an array of 32 batches and then parallelize those into 32 partitions. `parallelize` usually takes one big array of primitives as its first arg, which you then split by the number in the second arg, but this is a little different since the elements here are whole batches. Does that answer your question?
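
A tiny illustration of that slicing behavior (toy byte arrays and a local session; this is editorial, not the PR's code):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("batches-demo").getOrCreate()
val sc = spark.sparkContext

// Each element stands in for one serialized record batch. Setting numSlices
// to the array length yields exactly one batch per partition, regardless of
// how many records each batch happens to contain.
val batches: Array[Array[Byte]] = Array(Array(1, 2), Array(3, 4, 5), Array(6)).map(_.map(_.toByte))
val rdd = sc.parallelize(batches, numSlices = batches.length)
assert(rdd.getNumPartitions == batches.length)

spark.stop()
```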


---




[GitHub] spark issue #22195: [SPARK-25205][CORE] Fix typo in spark.network.crypto.key...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22195
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2471/
Test PASSed.


---




[GitHub] spark issue #22195: [SPARK-25205][CORE] Fix typo in spark.network.crypto.key...

2018-08-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22195
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22161: [SPARK-25167][SPARKR][TEST][MINOR] Minor fixes for R sql...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22161
  
Ah, it's okay. Yes, please. Not a big deal.


---




[GitHub] spark pull request #22195: [CORE] Fix typo in spark.network.crypto.keyFactor...

2018-08-22 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/22195

[CORE] Fix typo in spark.network.crypto.keyFactoryIterations



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-25205

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22195


commit b927b94ea1312ba74d73c203adb7683b2fb42fed
Author: Imran Rashid 
Date:   2018-08-23T03:11:40Z

[CORE] Fix typo in spark.network.crypto.keyFactoryIterations




---




[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...

2018-08-22 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/22194
  
Merged to master.


---




[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...

2018-08-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r212170997
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3268,13 +3268,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = toArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
--- End diff --

It's not necessary to buffer the first partition because it can be sent to Python right away, so we only need an array of size N-1.
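
A simplified sketch of that ordering trick (the `runJob`/`write` wiring here is hypothetical, not the PR's exact code): partition 0 streams as soon as it arrives, and only later partitions are parked until their turn.

```scala
// Buffer out-of-order partition results so output is written in partition
// order. Slot i-1 holds partition i, so the array only needs N-1 entries.
// Assumes the handler is invoked serially, as Spark's runJob resultHandler is.
def serveInOrder(
    numPartitions: Int,
    runJob: ((Int, Array[Byte]) => Unit) => Unit,  // calls the handler per finished task
    write: Array[Byte] => Unit): Unit = {
  val buffered = new Array[Array[Byte]](numPartitions - 1)
  var nextToWrite = 0
  runJob { (index, bytes) =>
    if (index == nextToWrite) {
      write(bytes)            // arrived in order: stream it right away
      nextToWrite += 1
      // drain any buffered partitions that are now next in line
      while (nextToWrite < numPartitions && buffered(nextToWrite - 1) != null) {
        write(buffered(nextToWrite - 1))
        buffered(nextToWrite - 1) = null
        nextToWrite += 1
      }
    } else {
      buffered(index - 1) = bytes  // arrived early: hold until its turn
    }
  }
}
```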


---




[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...

2018-08-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r212171051
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot  // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch()  // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator)  // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
      val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
--- End diff --

Yup, thanks for catching that.
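
The fix being acknowledged is the usual acquire/use/release shape; a generic sketch (not the PR's code):

```scala
import java.io.FileInputStream

// Acquire, use, and always release: the finally block runs whether or not
// `body` throws, so the file descriptor cannot leak.
def withFileStream[A](path: String)(body: FileInputStream => A): A = {
  val in = new FileInputStream(path)
  try body(in) finally in.close()
}
```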


---




[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...

2018-08-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r212170606
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot  // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch()  // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator)  // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
+    try {
+      // Create array so that we can safely close the file
+      val batches = getBatchesFromStream(fileStream.getChannel).toArray
+      // Parallelize the record batches to create an RDD
+      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length))
+    } finally {
+      fileStream.close()
+    }
+  }
+
+  /**
+   * Read an Arrow stream input and return an iterator of serialized ArrowRecordBatches.
+   */
+  private[sql] def getBatchesFromStream(in: SeekableByteChannel): Iterator[Array[Byte]] = {
+
+    // Create an iterator to get each serialized ArrowRecordBatch from a stream
+    new Iterator[Array[Byte]] {
+      var batch: Array[Byte] = readNextBatch()
+
+      override def hasNext: Boolean = batch != null
+
+      override def next(): Array[Byte] = {
+        val prevBatch = batch
+        batch = readNextBatch()
+        prevBatch
+      }
+
+      def readNextBatch(): Array[Byte] = {
+        val msgMetadata = MessageSerializer.readMessage(new ReadChannel(in))
+        if (msgMetadata == null) {
+          return null
+        }
+
+        // Get the length of the body, which has not been read at this point
+        val bodyLength = msgMetadata.getMessageBodyLength.toInt
+
+        // Only care about RecordBatch data, skip Schema and unsupported Dictionary messages
+        if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) {
+
+          // Create output backed by buffer to hold msg length (int32), msg metadata, msg body
+          val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength)
--- End diff --

I'll add some more details about what this is doing.
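
In the meantime, a rough plain-NIO sketch of the copy-through framing the buffer comment describes. This assumes the pre-0.15 Arrow stream layout (a little-endian int32 metadata length, the flatbuffer metadata, then the body), and `bodyLengthOf` is a hypothetical stand-in for the flatbuffer parse that `MessageSerializer` performs for real:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.channels.ReadableByteChannel

// Copy one framed message out of the stream as raw bytes, or None at
// end-of-stream. The whole frame (length prefix + metadata + body) is
// buffered so it can later be handed around as a single Array[Byte].
def copyNextMessage(
    in: ReadableByteChannel,
    bodyLengthOf: Array[Byte] => Int): Option[Array[Byte]] = {
  val lenBuf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN)
  if (in.read(lenBuf) != 4) return None        // channel exhausted
  lenBuf.flip()
  val metaLen = lenBuf.getInt
  if (metaLen == 0) return None                // explicit end-of-stream marker
  val meta = ByteBuffer.allocate(metaLen)
  while (meta.hasRemaining && in.read(meta) >= 0) {}   // read the flatbuffer metadata
  val metaBytes = meta.array()
  val bodyLen = bodyLengthOf(metaBytes)        // body size is recorded in the metadata
  val out = ByteBuffer.allocate(4 + metaLen + bodyLen).order(ByteOrder.LITTLE_ENDIAN)
  out.putInt(metaLen)
  out.put(metaBytes)
  while (out.hasRemaining && in.read(out) >= 0) {}     // copy the body bytes through
  Some(out.array())
}
```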


---



