[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212200119

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -855,16 +858,17 @@ abstract class RDD[T: ClassTag](
    * a map on the other).
    */
   def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
-    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
-      new Iterator[(T, U)] {
-        def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
-          case (true, true) => true
-          case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+    zipPartitionsInternal(other, preservesPartitioning = false, orderSensitiveFunc = true) {
+      (thisIter, otherIter) =>
+        new Iterator[(T, U)] {
+          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
+            case (true, true) => true
+            case (false, false) => false
+            case _ => throw new SparkException("Can only zip RDDs with " +
+              "same number of elements in each partition")
+          }
+          def next(): (T, U) = (thisIter.next(), otherIter.next())
+        }
-        def next(): (T, U) = (thisIter.next(), otherIter.next())
-      }
--- End diff --

yea, just indentation change.
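For context, a minimal usage sketch of the contract this iterator enforces (illustrative only; the element pairing shown assumes the deterministic per-partition order that this PR is about):

```scala
val a = sc.parallelize(1 to 4, 2)
val b = a.map(_ * 10)  // same number of partitions and same per-partition counts
a.zip(b).collect()     // Array((1,10), (2,20), (3,30), (4,40))

// Zipping RDDs whose corresponding partitions differ in length fails with:
// SparkException: Can only zip RDDs with same number of elements in each partition
```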
[GitHub] spark pull request #22188: [SPARK-25164][SQL] Avoid rebuilding column and pa...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22188
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192600

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -855,16 +858,17 @@ abstract class RDD[T: ClassTag](
    * a map on the other).
    */
   def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
-    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
-      new Iterator[(T, U)] {
-        def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
-          case (true, true) => true
-          case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+    zipPartitionsInternal(other, preservesPartitioning = false, orderSensitiveFunc = true) {
+      (thisIter, otherIter) =>
+        new Iterator[(T, U)] {
+          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
+            case (true, true) => true
+            case (false, false) => false
+            case _ => throw new SparkException("Can only zip RDDs with " +
+              "same number of elements in each partition")
+          }
+          def next(): (T, U) = (thisIter.next(), otherIter.next())
+        }
-        def next(): (T, U) = (thisIter.next(), otherIter.next())
-      }
--- End diff --

Bulk of the change here is simply indentation, right (except for zipPartitions -> zipPartitionsInternal and the flag)? I want to make sure I did not miss something here.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212199007

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            // If the map stage is INDETERMINATE, which means the map tasks may return
+            // different result when re-try, we need to re-try all the tasks of the failed
+            // stage and its succeeding stages, because the input data will be changed after the
+            // map tasks are re-tried.
+            // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+            // guaranteed to be idempotent, so the input data of the reducers will not change even
+            // if the map tasks are re-tried.
+            if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+              def rollBackStage(stage: Stage): Unit = stage match {
+                case mapStage: ShuffleMapStage =>
+                  val numMissingPartitions = mapStage.findMissingPartitions().length
+                  if (numMissingPartitions < mapStage.numTasks) {
+                    markStageAsFinished(
+                      mapStage,
+                      Some("preceding shuffle map stage with random output gets retried."),
+                      willRetry = true)
+                    mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                    failedStages += mapStage
+                  }
+
+                case resultStage =>
+                  val numMissingPartitions = resultStage.findMissingPartitions().length
+                  if (numMissingPartitions < resultStage.numTasks) {
+                    // TODO: support to rollback result tasks.
+                    val errorMessage = "A shuffle map stage with random output was failed and " +
+                      s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                      "to re-process the input data, and has to fail this job. Please " +
+                      "eliminate the randomness by checkpointing the RDD before " +
+                      "repartition/zip and try again."
+                    abortStage(failedStage, errorMessage, None)
+                  }
+              }
+
+              def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                if (stageChain.head.id == failedStage.id) {
+                  stageChain.foreach { stage =>
+                    if (!failedStages.contains(stage)) rollBackStage(stage)
+                  }
+                } else {
+                  stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                }
+              }
+
+              rollBackStage(failedStage)
--- End diff --

This would be implicitly handled anyway, right? Any reason to specifically do it here?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212193814

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {

+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *    when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

While reviewing the PR, this enumeration looks a bit unclear. We are modelling two things here:

* the behavior of `compute` in an RDD - whether it is idempotent, order sensitive, or indeterminate, and also
* the input order of the tuples coming in from the partition to `RDD.compute`.

For example, shuffle output of a sorted RDD would be `IDEMPOTENT` by this definition - but idempotence is a functional behavior of a closure, not input order. Perhaps we need to convey this better? Or does 'idempotent' order make sense? Thoughts @cloud-fan, @markhamstra?
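To make the two concerns above concrete, here is one hypothetical way they could be modelled separately (names invented for illustration; this is not code from the PR):

```scala
// Axis 1: functional behavior of the compute closure itself.
object FunctionBehavior extends Enumeration {
  val Deterministic, OrderSensitive, NonDeterministic = Value
}

// Axis 2: stability of the input order the closure receives from its parent.
object InputOrder extends Enumeration {
  val Stable, Unstable = Value
}

// The PR's single RandomLevel is roughly the product of the two axes:
// a deterministic closure fed an unstable input order behaves like UNORDERED,
// while an order-sensitive closure fed an unstable input order is INDETERMINATE.
```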
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212198632

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -70,7 +70,8 @@ class MyRDD(
     numPartitions: Int,
     dependencies: List[Dependency[_]],
     locations: Seq[Seq[String]] = Nil,
-    @(transient @param) tracker: MapOutputTrackerMaster = null)
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    isRandom: Boolean = false)
--- End diff --

isRandom -> unordered
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212197939

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            // If the map stage is INDETERMINATE, which means the map tasks may return
+            // different result when re-try, we need to re-try all the tasks of the failed
+            // stage and its succeeding stages, because the input data will be changed after the
+            // map tasks are re-tried.
+            // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+            // guaranteed to be idempotent, so the input data of the reducers will not change even
+            // if the map tasks are re-tried.
+            if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+              def rollBackStage(stage: Stage): Unit = stage match {
+                case mapStage: ShuffleMapStage =>
+                  val numMissingPartitions = mapStage.findMissingPartitions().length
+                  if (numMissingPartitions < mapStage.numTasks) {
+                    markStageAsFinished(
+                      mapStage,
+                      Some("preceding shuffle map stage with random output gets retried."),
+                      willRetry = true)
+                    mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                    failedStages += mapStage
+                  }
+
+                case resultStage =>
+                  val numMissingPartitions = resultStage.findMissingPartitions().length
+                  if (numMissingPartitions < resultStage.numTasks) {
+                    // TODO: support to rollback result tasks.
+                    val errorMessage = "A shuffle map stage with random output was failed and " +
+                      s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                      "to re-process the input data, and has to fail this job. Please " +
+                      "eliminate the randomness by checkpointing the RDD before " +
+                      "repartition/zip and try again."
+                    abortStage(failedStage, errorMessage, None)
+                  }
+              }
+
+              def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                if (stageChain.head.id == failedStage.id) {
+                  stageChain.foreach { stage =>
+                    if (!failedStages.contains(stage)) rollBackStage(stage)
+                  }
+                } else {
+                  stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                }
+              }
--- End diff --

This method looks expensive for large DAGs; memoization should help reduce the cost. Compute the set of stages to roll back, then roll back the stages found.
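A rough sketch of the memoized traversal being suggested (the helper names and the `activeStages` collection are assumed for illustration; this is not the PR's code):

```scala
import scala.collection.mutable

// Stages already known to need rollback; doubles as the memo table, so each
// chain through a shared sub-DAG is expanded at most once.
val stagesToRollback = mutable.HashSet[Stage](failedStage)

def collectSucceedingStages(stageChain: List[Stage]): Unit = {
  if (stagesToRollback.contains(stageChain.head)) {
    // Everything downstream of a stage marked for rollback must roll back too.
    stageChain.drop(1).foreach(stagesToRollback += _)
  } else {
    stageChain.head.parents.foreach(s => collectSucceedingStages(s :: stageChain))
  }
}

// `activeStages` stands in for whatever set of live stages the scheduler tracks.
activeStages.foreach(s => collectSucceedingStages(s :: Nil))
stagesToRollback.foreach(rollBackStage)
```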
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212199604

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            // If the map stage is INDETERMINATE, which means the map tasks may return
+            // different result when re-try, we need to re-try all the tasks of the failed
+            // stage and its succeeding stages, because the input data will be changed after the
+            // map tasks are re-tried.
+            // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+            // guaranteed to be idempotent, so the input data of the reducers will not change even
+            // if the map tasks are re-tried.
+            if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+              def rollBackStage(stage: Stage): Unit = stage match {
+                case mapStage: ShuffleMapStage =>
+                  val numMissingPartitions = mapStage.findMissingPartitions().length
+                  if (numMissingPartitions < mapStage.numTasks) {
+                    markStageAsFinished(
+                      mapStage,
+                      Some("preceding shuffle map stage with random output gets retried."),
+                      willRetry = true)
+                    mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                    failedStages += mapStage
+                  }
+
+                case resultStage =>
+                  val numMissingPartitions = resultStage.findMissingPartitions().length
+                  if (numMissingPartitions < resultStage.numTasks) {
+                    // TODO: support to rollback result tasks.
+                    val errorMessage = "A shuffle map stage with random output was failed and " +
+                      s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                      "to re-process the input data, and has to fail this job. Please " +
+                      "eliminate the randomness by checkpointing the RDD before " +
+                      "repartition/zip and try again."
+                    abortStage(failedStage, errorMessage, None)
--- End diff --

This ends up aborting the job if I am not wrong, while we want to retry the stage. +CC @markhamstra, @tgravescs if there is a better way to accomplish result stage rollback.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212195284

--- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala ---
@@ -95,6 +99,18 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
     rdd2 = null
     f = null
   }
+
+  private def isRandomOrder(rdd: RDD[_]): Boolean = {
+    rdd.computingRandomLevel == RDD.RandomLevel.UNORDERED
--- End diff --

Instead of this, check if the order is not IDEMPOTENT. In both the unordered and indeterminate cases, we should return indeterminate for this zip RDD from computingRandomLevel.
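A minimal sketch of the check being suggested (illustrative, not the PR's code; it treats anything that is not IDEMPOTENT as order-unstable, so a zip over it becomes INDETERMINATE on retry):

```scala
private def isRandomOrder(rdd: RDD[_]): Boolean = {
  // UNORDERED and INDETERMINATE parents both make a zip's output unstable.
  rdd.computingRandomLevel != RDD.RandomLevel.IDEMPOTENT
}
```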
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192065

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {

+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *    when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

RandomLevel is not very descriptive, particularly after the rename of the levels to UNORDERED/INDETERMINATE. OrderSensitivity or something else more descriptive.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192772

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
--- End diff --

We will need to expose this with the `@Experimental` tag - can't keep it `private[spark]` given the implications for custom RDDs.
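If it were exposed as suggested, a custom RDD might declare its level along these lines (a hypothetical sketch that assumes `computingRandomLevel` has become a public `@Experimental` member; the RDD itself is invented for illustration):

```scala
import scala.util.Random

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A connector-style RDD whose compute() returns the same data set on every
// rerun, but in a nondeterministic order (e.g. merging parallel fetches).
class ParallelFetchRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Random.shuffle((1 to 100).toList).iterator

  // Declares: same elements on rerun, possibly in a different order.
  override def computingRandomLevel: RDD.RandomLevel.Value = RDD.RandomLevel.UNORDERED
}
```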
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212193206

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+    val parentRandomLevels = dependencies.map {
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+          RDD.RandomLevel.INDETERMINATE
--- End diff --

It does not matter what the parent RDD's order was - after shuffle, currently, it is going to always be UNORDERED - unless Aggregator and key ordering are specified in dep. Given [this comment](https://github.com/apache/spark/pull/22112#issuecomment-414034703), and adding a few missing cases, this becomes:

```
// If checkpointed already - then always same order
case dep: Dependency[_] if dep.rdd.isCheckpointed => RDD.RandomLevel.IDEMPOTENT

// If same partitioner, then shuffle not done.
case dep: ShuffleDependency[_, _, _] if dep.partitioner == partitioner =>
  dep.rdd.computingRandomLevel

// If aggregator specified (and so unique keys) and key ordering specified -
// then consistent ordering.
case dep: ShuffleDependency[_, _, _] if dep.keyOrdering.isDefined && dep.aggregator.isDefined =>
  RDD.RandomLevel.IDEMPOTENT

// All other shuffle cases, we don't know the output order in spark.
case dep: ShuffleDependency[_, _, _] => RDD.RandomLevel.INDETERMINATE
```
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192261

--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -54,4 +58,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
   @transient protected lazy override val isBarrier_ : Boolean =
     isFromBarrier || dependencies.exists(_.rdd.isBarrier())
+
+  override private[spark] def computingRandomLevel = {
--- End diff --

computingRandomLevel -> computeOrderSensitivity (suffix to match what `RandomLevel` gets renamed to)
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212190267

--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -32,12 +32,16 @@ import org.apache.spark.{Partition, TaskContext}
  *                    doesn't modify the keys.
  * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
  *                      containing at least one RDDBarrier shall be turned into a barrier stage.
+ * @param orderSensitiveFunc whether or not the zip function is order-sensitive. If it's order
--- End diff --

remove 'zip'
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212196598

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            // If the map stage is INDETERMINATE, which means the map tasks may return
+            // different result when re-try, we need to re-try all the tasks of the failed
+            // stage and its succeeding stages, because the input data will be changed after the
+            // map tasks are re-tried.
+            // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+            // guaranteed to be idempotent, so the input data of the reducers will not change even
+            // if the map tasks are re-tried.
+            if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+              def rollBackStage(stage: Stage): Unit = stage match {
+                case mapStage: ShuffleMapStage =>
+                  val numMissingPartitions = mapStage.findMissingPartitions().length
+                  if (numMissingPartitions < mapStage.numTasks) {
+                    markStageAsFinished(
+                      mapStage,
+                      Some("preceding shuffle map stage with random output gets retried."),
+                      willRetry = true)
+                    mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                    failedStages += mapStage
+                  }
+
+                case resultStage =>
+                  val numMissingPartitions = resultStage.findMissingPartitions().length
+                  if (numMissingPartitions < resultStage.numTasks) {
--- End diff --

IIRC this can be a valid case - for example, if the result stage is being run on only a subset of partitions (first(), for example): the number of missing partitions can legitimately be > numTasks while relevant partitions are still missing. I might be a bit rusty here though, +CC @markhamstra
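To illustrate the subset case mentioned above (a sketch, not scheduler code): actions like `first()` and `take(n)` submit jobs over only some partitions, so a result stage's task count can be smaller than the RDD's partition count.

```scala
val rdd = sc.parallelize(1 to 100, numSlices = 10)

rdd.first()   // result stage with 1 task over a 10-partition RDD
rdd.take(25)  // may run tasks over an incrementally growing subset of partitions
rdd.count()   // only this one runs a task for every partition
```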
[GitHub] spark pull request #22188: [SPARK-25164][SQL] Avoid rebuilding column and pa...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/22188#discussion_r212199355

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java ---
@@ -270,21 +270,23 @@ public boolean nextBatch() throws IOException {

   private void initializeInternal() throws IOException, UnsupportedOperationException {
     // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<String[]> paths = requestedSchema.getPaths();
--- End diff --

cc @michal-databricks @mswit-databricks
[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22188 thanks, merging to master!
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22104#discussion_r212197529

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
@@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
         }
       }.toArray
     }.toArray
-    val projection = newMutableProjection(allInputs, child.output)
+
+    // Project input rows to unsafe row so we can put it in the row queue
+    val unsafeProjection = UnsafeProjection.create(child.output, child.output)
--- End diff --

Ideally all the operators will produce UnsafeRow. If the data source does not produce UnsafeRow, Spark will make sure there is a project above it to produce UnsafeRow, so we don't need to worry about it here and can safely assume the input is always UnsafeRow.
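A small usage sketch of the `UnsafeProjection` API referenced above (illustrative only, not the PR's code; an identity projection over a schema converts arbitrary `InternalRow`s into `UnsafeRow`s):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", LongType)))
val toUnsafe = UnsafeProjection.create(schema)

// The projection reuses an internal buffer across calls, so copy() the result
// if it must outlive the next apply() - e.g. before queueing it.
val unsafeRow = toUnsafe(InternalRow(1, 2L)).copy()
```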
[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22196 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95142/ Test PASSed.
[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22196 Merged build finished. Test PASSed.
[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22196

**[Test build #95142 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95142/testReport)** for PR 22196 at commit [`78c8fcd`](https://github.com/apache/spark/commit/78c8fcd3a7255d5bbac01475fe44fcba0c15a8d9).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21977 Is this PR close to getting merged? Or do we have some problems that are hard to solve?
[GitHub] spark pull request #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22153
[GitHub] spark pull request #20226: [SPARK-23034][SQL] Override `nodeName` for all *S...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20226
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r212194825

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(

   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    while (i < currentRow.numFields) {
--- End diff --

@mallman It sounds like the changes in this file are not needed. Could you help me point out which test cases will fail?
[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22143 Can one of the admins verify this patch?
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22153 thanks, merging to master!
[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22196 **[Test build #95142 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95142/testReport)** for PR 22196 at commit [`78c8fcd`](https://github.com/apache/spark/commit/78c8fcd3a7255d5bbac01475fe44fcba0c15a8d9).
[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22196 Merged build finished. Test PASSed.
[GitHub] spark issue #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of AvroDataTo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22196 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2476/ Test PASSed.
[GitHub] spark pull request #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of z...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22194
[GitHub] spark pull request #22196: [SPARK-24811][FOLLOWUP][SQL]Revise package of Avr...
GitHub user gengliangwang opened a pull request:

    https://github.com/apache/spark/pull/22196

    [SPARK-24811][FOLLOWUP][SQL] Revise package of AvroDataToCatalyst and CatalystDataToAvro

    ## What changes were proposed in this pull request?

    In https://github.com/apache/spark/pull/21838, the classes `AvroDataToCatalyst` and `CatalystDataToAvro` were put in package `org.apache.spark.sql`. They should be moved to package `org.apache.spark.sql.avro`. Also optimize imports in the Avro module.

    ## How was this patch tested?

    Unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gengliangwang/spark avro_revise_package_name

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22196.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22196

commit 78c8fcd3a7255d5bbac01475fe44fcba0c15a8d9
Author: Gengliang Wang
Date: 2018-08-23T06:04:57Z

    revise package name of AvroDataToCatalyst and CatalystDataToAvro
[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22194 Merged to master.
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22153 Merged build finished. Test PASSed.
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22153 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95134/ Test PASSed.
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22153

**[Test build #95134 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95134/testReport)** for PR 22153 at commit [`82190ac`](https://github.com/apache/spark/commit/82190accc6182988dfae00a07a656b069aa7b708).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22194 Merged build finished. Test PASSed.
[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22194 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95133/ Test PASSed.
[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22194

**[Test build #95133 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95133/testReport)** for PR 22194 at commit [`967360a`](https://github.com/apache/spark/commit/967360a1a417739cdada3b7c7334c8ca87ede6a6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165

@jiangxb1987 Great thanks for your comment!

```
One general idea is that we don't need to rely on the RPC framework to test ContextBarrierState, just mock RpcCallContexts should be enough.
```

Actually I also wanted to implement it like this at first, as you asked in the jira, but `ContextBarrierState` is a private inner class in `BarrierCoordinator`. Could I do the refactor of moving `ContextBarrierState` out of `BarrierCoordinator`? If that is permitted, I think we can just mock RpcCallContext to reach this.

```
We shall cover the following scenarios:
```

Pretty cool for the list; the first 5 scenarios are included in the current implementation. I'll add the last checking work of `Make sure we clear all the internal data under each case.` after we reach an agreement.
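For reference, a sketch of the mocking approach being discussed (it assumes `ContextBarrierState` has been refactored to be reachable from tests; the driving code is elided since that API is still under discussion):

```scala
import org.mockito.Mockito._

import org.apache.spark.rpc.RpcCallContext

// No real RPC endpoint is needed: mock the caller and assert on its replies.
val requester = mock(classOf[RpcCallContext])

// ... drive the extracted ContextBarrierState with `requester` here ...

// e.g. once all tasks have synced, every caller should have received a reply.
verify(requester, times(1)).reply(())
```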
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20146 seems like this was a thumbs-up from @WeichenXu123 @jkbradley? @dbtsai?
[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22121
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22121 thanks, merging to master!
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22112 To confirm, is everyone OK with merging this PR, or are we just OK with the direction and need more time to review this PR?
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95140/ Test PASSed.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Merged build finished. Test PASSed.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22121

**[Test build #95140 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95140/testReport)** for PR 22121 at commit [`1f253bf`](https://github.com/apache/spark/commit/1f253bf536c3a7bd1c07ba5ea5600f661c8e106e).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22121

**[Test build #95139 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95139/testReport)** for PR 22121 at commit [`8245806`](https://github.com/apache/spark/commit/824580684c05c2a3c1654517b77864ca5d504ee0).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Merged build finished. Test PASSed.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95139/ Test PASSed.
[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22187 **[Test build #95141 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95141/testReport)** for PR 22187 at commit [`81ef75a`](https://github.com/apache/spark/commit/81ef75a36a1c9dcd6922d2ec77393bc35389efd0).
[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22187 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2475/ Test PASSed.
[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22187 Merged build finished. Test PASSed.
[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22163 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95130/ Test PASSed.
[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22163 Merged build finished. Test PASSed.
[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22163

**[Test build #95130 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95130/testReport)** for PR 22163 at commit [`f91e18c`](https://github.com/apache/spark/commit/f91e18c7d4b8eab53c4983320a0eab0403c37a48).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user gengliangwang commented on the issue: https://github.com/apache/spark/pull/22121 The preview doc (zip file in the PR description) has been updated to the latest version.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22121 **[Test build #95140 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95140/testReport)** for PR 22121 at commit [`1f253bf`](https://github.com/apache/spark/commit/1f253bf536c3a7bd1c07ba5ea5600f661c8e106e).
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

@tgravescs:

> The shuffle simply transfers the bytes its supposed to. Sparks shuffle of those bytes is not consistent in that the order it fetches from can change and without the sort happening on that data the order can be different on rerun. I guess maybe you mean the ShuffledRDD as a whole or do you mean something else here?

By shuffle, I am referring to the output of shuffle which is consumed by an RDD with `ShuffleDependency` as input. More specifically, the output of `SparkEnv.get.shuffleManager.getReader(...).read()`, which RDDs (user and spark impl's) use to fetch the output of the shuffle machinery. This output is not just the deserialized shuffle bytes, but has aggregation applied (if specified) and ordering imposed (if specified). ShuffledRDD is one such usage within spark core, but others exist within spark core and in user code.

> All I'm saying is zip is just another variant of this, you could document it as such and do nothing internal to spark to "fix it".

I agree; repartition + shuffle, zip, sample, mllib usages are all variants of the same problem - of shuffle output order being inconsistent.

> I guess we can separate out these 2 discussions. I think the point of this pr is to temporarily workaround the data loss/corruption issue with repartition by failing. So if everyone agrees on that lets move the discussion to a jira about what to do with the rest of the operators and fix repartition here. thoughts?

Sounds good to me.
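For readers, a condensed sketch of the read path referred to above, modeled on `ShuffledRDD.compute` (simplified; the real implementation carries more context):

```scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // The reader does more than fetch and deserialize bytes: it applies aggregation
  // (if dep.aggregator is set) and imposes ordering (if dep.keyOrdering is set).
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
```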
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Merged build finished. Test PASSed.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2474/ Test PASSed.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22121 **[Test build #95139 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95139/testReport)** for PR 22121 at commit [`8245806`](https://github.com/apache/spark/commit/824580684c05c2a3c1654517b77864ca5d504ee0).
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22192 **[Test build #95138 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95138/testReport)** for PR 22192 at commit [`7c86fc5`](https://github.com/apache/spark/commit/7c86fc54c36954f1345eccc066873f7f90832657).
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2473/ Test PASSed.
[GitHub] spark issue #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22121 Merged build finished. Test PASSed.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22112 Merged build finished. Test PASSed.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22112 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95129/ Test PASSed.
[GitHub] spark pull request #22152: [SPARK-25159][SQL] json schema inference should o...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22152#discussion_r212183703

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala ---
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
       }.reduceOption(typeMerger).toIterator
     }

-    // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-    // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-    // active SparkSession and `SQLConf.get` may point to the wrong configs.
-    val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+    // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+    // the fold functions in the scheduler event loop thread.
+    val existingConf = SQLConf.get
+    var rootType: DataType = StructType(Nil)
+    val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+    val mergeResult = (index: Int, taskResult: DataType) => {
+      rootType = SQLConf.withExistingConf(existingConf) {
--- End diff --

The same question was in my mind. Thanks for the clarification.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22112

**[Test build #95129 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95129/testReport)** for PR 22112 at commit [`93f37fa`](https://github.com/apache/spark/commit/93f37fa585462b9ee2fb9e179eab736fbc416d3e).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #22171: [SPARK-25177][SQL] When dataframe decimal type co...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22171#discussion_r212180992

--- Diff: sql/core/src/test/resources/sql-tests/results/literals.sql.out ---
@@ -197,7 +197,7 @@ select .e3
 -- !query 20
 select 1E309, -1E309
 -- !query 20 schema
-struct<1E+309:decimal(1,-309),-1E+309:decimal(1,-309)>
+struct<10:decimal(1,-309),-10:decimal(1,-309)>
--- End diff --

@vinodkc how does it show in Postgres?
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20345 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95131/ Test PASSed.
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20345 Merged build finished. Test PASSed.
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20345

**[Test build #95131 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95131/testReport)** for PR 20345 at commit [`39462fb`](https://github.com/apache/spark/commit/39462fbee952ec574b4c04d7718fd73bb5f56d9d).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21546 **[Test build #95137 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95137/testReport)** for PR 21546 at commit [`5549644`](https://github.com/apache/spark/commit/554964465dbcb99cc313620fafb0fc41acfd4304).
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/Hive t...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22153 LGTM
[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2472/ Test PASSed.
[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Merged build finished. Test PASSed.
[GitHub] spark pull request #22157: [SPARK-25126][SQL] Avoid creating Reader for all ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22157#discussion_r212178321

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala ---
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }

+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }

     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
--- End diff --

why did the error message change?
[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21546

LGTM
[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r212178291

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch() // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator) // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
+    try {
+      // Create array so that we can safely close the file
+      val batches = getBatchesFromStream(fileStream.getChannel).toArray
+      // Parallelize the record batches to create an RDD
+      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length))
--- End diff --

Ah, sorry. You are right. I misread.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22192

Merged build finished. Test FAILed.
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22192

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95136/
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22192

**[Test build #95136 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95136/testReport)** for PR 22192 at commit [`44454dd`](https://github.com/apache/spark/commit/44454dd586e35bdf16492c4a8969494bd3b7f8f5).

* This patch **fails Java style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` .doc(\"Comma-separated list of class names for \"plugins\" implementing \" +`
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22112

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95128/
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22112

Merged build finished. Test FAILed.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22112

**[Test build #95128 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95128/testReport)** for PR 22112 at commit [`097092b`](https://github.com/apache/spark/commit/097092be4b2967689082af62715ecc4f78086c30).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #22192: [SPARK-24918] Executor Plugin API
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22192

**[Test build #95136 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95136/testReport)** for PR 22192 at commit [`44454dd`](https://github.com/apache/spark/commit/44454dd586e35bdf16492c4a8969494bd3b7f8f5).
[GitHub] spark issue #22192: [SPARK-24918] Executor Plugin API
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/22192

Jenkins, ok to test
[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api
Github user squito commented on the issue: https://github.com/apache/spark/pull/21923

this is being continued in https://github.com/apache/spark/pull/22192
[GitHub] spark pull request #21923: [SPARK-24918][Core] Executor Plugin api
Github user squito closed the pull request at: https://github.com/apache/spark/pull/21923
[GitHub] spark issue #22195: [SPARK-25205][CORE] Fix typo in spark.network.crypto.key...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22195

**[Test build #95135 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95135/testReport)** for PR 22195 at commit [`b927b94`](https://github.com/apache/spark/commit/b927b94ea1312ba74d73c203adb7683b2fb42fed).
[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r212171980

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch() // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator) // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
      val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
+    try {
+      // Create array so that we can safely close the file
+      val batches = getBatchesFromStream(fileStream.getChannel).toArray
+      // Parallelize the record batches to create an RDD
+      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length))
--- End diff --

so this is the length of the array of batches, not the number of records in the batch. The input is split according to the default parallelism config. So if that is 32, we will have an array of 32 batches and then parallelize those to 32 partitions. `parallelize` usually takes one big array of primitives as the first arg, which you then partition by the number in the second arg, but this is a little different since we are using batches. Does that answer your question?
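To make the explanation above concrete, here is a small sketch (not the PR's code) of the one-batch-per-partition layout: because the `numSlices` argument to `parallelize` equals the number of batches, each resulting partition carries exactly one serialized batch. The byte payloads below are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("one-batch-per-partition").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Stand-ins for 32 serialized ArrowRecordBatches, as in the example above.
val batches: Array[Array[Byte]] = Array.fill(32)(Array[Byte](1, 2, 3))

// numSlices == batches.length, so each partition holds exactly one batch.
val rdd = sc.parallelize(batches.toSeq, batches.length)
assert(rdd.getNumPartitions == 32)
assert(rdd.mapPartitions(it => Iterator(it.size)).collect().forall(_ == 1))
```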
[GitHub] spark issue #22195: [SPARK-25205][CORE] Fix typo in spark.network.crypto.key...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22195

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2471/
[GitHub] spark issue #22195: [SPARK-25205][CORE] Fix typo in spark.network.crypto.key...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22195

Merged build finished. Test PASSed.
[GitHub] spark issue #22161: [SPARK-25167][SPARKR][TEST][MINOR] Minor fixes for R sql...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22161

Ah, it's okie. Yes, please. Not a big deal.
[GitHub] spark pull request #22195: [CORE] Fix typo in spark.network.crypto.keyFactor...
GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/22195

    [CORE] Fix typo in spark.network.crypto.keyFactoryIterations

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-25205

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22195.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22195

commit b927b94ea1312ba74d73c203adb7683b2fb42fed
Author: Imran Rashid
Date: 2018-08-23T03:11:40Z

    [CORE] Fix typo in spark.network.crypto.keyFactoryIterations
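For context, this key tunes how many iterations the key factory performs when deriving the AES key used for RPC encryption; the PR only corrects a misspelling in the registered key name. A hedged illustration of setting it follows; the value `1024` is this sketch's assumption, not a statement of Spark's default.

```scala
import org.apache.spark.SparkConf

// Illustration only: enable RPC encryption and set the (correctly spelled)
// iteration-count key. 1024 is an assumed value, not necessarily the default.
val conf = new SparkConf()
  .set("spark.network.crypto.enabled", "true")
  .set("spark.network.crypto.keyFactoryIterations", "1024")
```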
[GitHub] spark issue #22194: [SPARK-23932][SQL][FOLLOW-UP] Fix an example of zip_with...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22194

Merged to master.
[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r212170997

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3268,13 +3268,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = toArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
--- End diff --

It's not necessary to buffer the first partition because it can be sent to Python right away, so we only need an array of size N-1.
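A simplified, single-threaded sketch of the buffering scheme described above; this is not the PR's code, and all names here are invented. Partition 0 is written to the stream as soon as it arrives, and any later partition that arrives early is parked in a buffer of size N - 1 until every earlier partition has been written.

```scala
// Writes partition payloads to `write` in index order even when results
// arrive out of order, buffering at most N - 1 partitions.
class OrderedPartitionWriter(numPartitions: Int, write: Array[Byte] => Unit) {
  // Slot i holds partition i + 1; partition 0 never needs buffering.
  private val buffered = new Array[Array[Byte]](numPartitions - 1)
  private var nextToWrite = 0

  def onResult(index: Int, batch: Array[Byte]): Unit = {
    if (index == nextToWrite) {
      write(batch)
      nextToWrite += 1
      // Drain any buffered partitions that are now next in line.
      while (nextToWrite < numPartitions && buffered(nextToWrite - 1) != null) {
        write(buffered(nextToWrite - 1))
        buffered(nextToWrite - 1) = null
        nextToWrite += 1
      }
    } else {
      buffered(index - 1) = batch // out of order: park it for later
    }
  }
}
```

With three partitions arriving in the order 2, 0, 1, the writer still emits 0, 1, 2, and only partition 2 ever sits in the buffer.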
[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r212171051

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
   /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch() // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator) // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
--- End diff --

yup, thanks for catching that
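The exchange above is about resource safety: `getBatchesFromStream` reads lazily from the file's channel, so the stream has to stay open until the iterator is fully consumed. A minimal sketch of the pattern, with `lazyBatches` as a hypothetical stand-in for the real method:

```scala
import java.io.FileInputStream
import java.nio.channels.SeekableByteChannel

// Hypothetical stand-in for ArrowConverters.getBatchesFromStream.
def lazyBatches(in: SeekableByteChannel): Iterator[Array[Byte]] = Iterator.empty

def readAllBatches(filename: String): Array[Array[Byte]] = {
  val fileStream = new FileInputStream(filename)
  try {
    // Materialize with .toArray while the channel is still open; returning
    // the lazy iterator itself would later read from a closed stream.
    lazyBatches(fileStream.getChannel).toArray
  } finally {
    fileStream.close()
  }
}
```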
[GitHub] spark pull request #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream forma...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r212170606

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
@@ -183,34 +178,106 @@ private[sql] object ArrowConverters {
   }
 
  /**
-   * Convert a byte array to an ArrowRecordBatch.
+   * Load a serialized ArrowRecordBatch.
    */
-  private[arrow] def byteArrayToBatch(
+  private[arrow] def loadBatch(
       batchBytes: Array[Byte],
       allocator: BufferAllocator): ArrowRecordBatch = {
-    val in = new ByteArrayReadableSeekableByteChannel(batchBytes)
-    val reader = new ArrowFileReader(in, allocator)
-
-    // Read a batch from a byte stream, ensure the reader is closed
-    Utils.tryWithSafeFinally {
-      val root = reader.getVectorSchemaRoot // throws IOException
-      val unloader = new VectorUnloader(root)
-      reader.loadNextBatch() // throws IOException
-      unloader.getRecordBatch
-    } {
-      reader.close()
-    }
+    val in = new ByteArrayInputStream(batchBytes)
+    MessageSerializer.deserializeRecordBatch(
+      new ReadChannel(Channels.newChannel(in)), allocator) // throws IOException
   }
 
+  /**
+   * Create a DataFrame from a JavaRDD of serialized ArrowRecordBatches.
+   */
   private[sql] def toDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+      arrowBatchRDD: JavaRDD[Array[Byte]],
       schemaString: String,
       sqlContext: SQLContext): DataFrame = {
-    val rdd = payloadRDD.rdd.mapPartitions { iter =>
+    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val timeZoneId = sqlContext.sessionState.conf.sessionLocalTimeZone
+    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
       val context = TaskContext.get()
-      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
+      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, context)
     }
-    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
     sqlContext.internalCreateDataFrame(rdd, schema)
   }
+
+  /**
+   * Read a file as an Arrow stream and parallelize as an RDD of serialized ArrowRecordBatches.
+   */
+  private[sql] def readArrowStreamFromFile(
+      sqlContext: SQLContext,
+      filename: String): JavaRDD[Array[Byte]] = {
+    val fileStream = new FileInputStream(filename)
+    try {
+      // Create array so that we can safely close the file
+      val batches = getBatchesFromStream(fileStream.getChannel).toArray
+      // Parallelize the record batches to create an RDD
+      JavaRDD.fromRDD(sqlContext.sparkContext.parallelize(batches, batches.length))
+    } finally {
+      fileStream.close()
+    }
+  }
+
+  /**
+   * Read an Arrow stream input and return an iterator of serialized ArrowRecordBatches.
+   */
+  private[sql] def getBatchesFromStream(in: SeekableByteChannel): Iterator[Array[Byte]] = {
+
+    // Create an iterator to get each serialized ArrowRecordBatch from a stream
+    new Iterator[Array[Byte]] {
+      var batch: Array[Byte] = readNextBatch()
+
+      override def hasNext: Boolean = batch != null
+
+      override def next(): Array[Byte] = {
+        val prevBatch = batch
+        batch = readNextBatch()
+        prevBatch
+      }
+
+      def readNextBatch(): Array[Byte] = {
+        val msgMetadata = MessageSerializer.readMessage(new ReadChannel(in))
+        if (msgMetadata == null) {
+          return null
+        }
+
+        // Get the length of the body, which has not been read at this point
+        val bodyLength = msgMetadata.getMessageBodyLength.toInt
+
+        // Only care about RecordBatch data, skip Schema and unsupported Dictionary messages
+        if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) {
+
+          // Create output backed by buffer to hold msg length (int32), msg metadata, msg body
+          val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength)
--- End diff --

I'll add some more details about what this is doing
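The iterator in the diff above uses a one-element-lookahead pattern: the next batch is read eagerly, so `hasNext` reduces to a null check. A generic sketch of the same pattern, assuming the supplied `readNext` function returns null at end of stream:

```scala
import java.util.NoSuchElementException

// One-element-lookahead iterator; readNext() returning null marks the end.
class LookaheadIterator[T >: Null <: AnyRef](readNext: () => T) extends Iterator[T] {
  private var nextItem: T = readNext()

  override def hasNext: Boolean = nextItem != null

  override def next(): T = {
    if (nextItem == null) throw new NoSuchElementException("end of stream")
    val current = nextItem
    nextItem = readNext() // prefetch so hasNext stays a simple null check
    current
  }
}

// Usage sketch, mirroring the diff's readNextBatch:
// val batches = new LookaheadIterator[Array[Byte]](() => readNextBatch())
```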