[GitHub] spark pull request #18055: [Core][WIP] Make the object in TorrentBroadcast a...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/18055#discussion_r117666471

--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -54,7 +54,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea
 * @param obj object to broadcast
 * @param id A unique identifier for the broadcast variable.
 */
-private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
+private[spark] class TorrentBroadcast[T: ClassTag](@transient val obj: T, id: Long)
--- End diff --

In the old code, `obj` is not a field; it is just a constructor argument.
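A minimal sketch (plain Java serialization, not Spark's TorrentBroadcast) of the distinction being made here: an unreferenced constructor argument never becomes a field, while `@transient val` makes it an explicit field that serialization skips.

```scala
import java.io._

// Toy illustration only. `obj` below is just a constructor argument: it is not
// referenced in the class body, so the compiler keeps no field for it at all.
class PlainArg(obj: Any) extends Serializable

// Here `obj` is a real field, but `@transient` tells Java serialization to skip it.
class TransientField(@transient val obj: Any) extends Serializable

object TransientDemo {
  private def roundTrip[T <: Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject()
      .asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new TransientField("payload"))
    println(copy.obj) // prints "null": the @transient field was not serialized
  }
}
```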
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r97783672 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,25 +95,101 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) + + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = LocalPartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchange.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().numberOfOutput.toSeq +} else { + Nil +} - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil +// Try to keep child plan's original data parallelism or not. It is enabled by default. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit - override def outputPartitioning: Partitioning = child.outputPartitioning +val shuffled = new ShuffledRowRDD(shuffleDependency) - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + shuffled +} else if (!respectChildParallelism) { + // This is mainly for tests. + // We take the rows of each partition until we reach the required limit number. --- End diff -- Hmm, i think the old single partition is not like this, why we need this branch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r97701247 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = FakePartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchange.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().numberOfOutput.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// Try to keep child plan's original data parallelism or not. It is enabled by default. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + childRDD +} else if (!respectChildParallelism) { + // This is mainly for tests. + // We take the rows of each partition until we reach the required limit number. + var countForRows = 0 + val takeAmounts = new mutable.HashMap[Int, Int]() + numberOfOutput.zipWithIndex.foreach { case (num, index) => +if (countForRows + num < limit) { + countForRows += num + takeAmounts += ((index, num)) +} else { + val toTake = limit - countForRows + countForRows += toTake + takeAmounts += ((index, toTake)) +} + } + val shuffled = new ShuffledRowRDD(shuffleDependency) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +takeAmounts.get(index).map { size => + iter.take(size) +}.getOrElse(iter) + } +} else { + // We try to distribute the required limit number of rows across all child rdd's partitions. 
+ var numToReduce = (sumOfOutput - limit) + val reduceAmounts = new mutable.HashMap[Int, Int]() + val nonEmptyParts = numberOfOutput.filter(_ > 0).size + val reducePerPart = numToReduce / nonEmptyParts + numberOfOutput.zipWithIndex.foreach { case (num, index) => +if (num >= reducePerPart) { + numToReduce -= reducePerPart + reduceAmounts += ((index, reducePerPart)) +} else { + numToReduce -= num + reduceAmounts += ((index, num)) +} + } + while (numToReduce > 0) { +numberOfOutput.zipWithIndex.foreach { case (num, index) => + val toReduce = if (numToReduce / nonEmptyParts > 0) { +numToReduce / nonEmptyParts + } else { +numToReduce + } + if (num - reduceAmounts(index) >= toReduce) { +reduceAmounts(index) = reduceAmounts(index) + toReduce +numToReduce -= toReduce + } else if (num - reduceAmounts(index) > 0) { +reduceAmounts(index) = reduceAmounts(index) + 1 +numToReduce -= 1 + } +} + } + + val shuffled = new ShuffledRowRDD(shuffleDependency) + shuff
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r97700863 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = FakePartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchange.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().numberOfOutput.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// Try to keep child plan's original data parallelism or not. It is enabled by default. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + childRDD +} else if (!respectChildParallelism) { + // This is mainly for tests. + // We take the rows of each partition until we reach the required limit number. + var countForRows = 0 + val takeAmounts = new mutable.HashMap[Int, Int]() + numberOfOutput.zipWithIndex.foreach { case (num, index) => +if (countForRows + num < limit) { + countForRows += num + takeAmounts += ((index, num)) +} else { + val toTake = limit - countForRows + countForRows += toTake + takeAmounts += ((index, toTake)) +} + } + val shuffled = new ShuffledRowRDD(shuffleDependency) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +takeAmounts.get(index).map { size => + iter.take(size) +}.getOrElse(iter) + } +} else { + // We try to distribute the required limit number of rows across all child rdd's partitions. + var numToReduce = (sumOfOutput - limit) + val reduceAmounts = new mutable.HashMap[Int, Int]() --- End diff -- its better to broadcast `reduceAmounts` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
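A minimal sketch of the broadcast suggestion; `dropPerPartition` and its parameters are assumed names for illustration, not the PR's code. The small index-to-count map is shipped once per executor as a broadcast variable instead of being captured in every task closure.

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch only: broadcast the per-partition drop counts and look them up per partition.
def dropPerPartition[T: ClassTag](sc: SparkContext,
                                  rows: RDD[T],
                                  reduceAmounts: Map[Int, Int]): RDD[T] = {
  val reduceBc = sc.broadcast(reduceAmounts) // one copy per executor, not one per task
  rows.mapPartitionsWithIndex { case (index, iter) =>
    iter.drop(reduceBc.value.getOrElse(index, 0))
  }
}
```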
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r97700723 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = FakePartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchange.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().numberOfOutput.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// Try to keep child plan's original data parallelism or not. It is enabled by default. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + childRDD +} else if (!respectChildParallelism) { + // This is mainly for tests. + // We take the rows of each partition until we reach the required limit number. + var countForRows = 0 + val takeAmounts = new mutable.HashMap[Int, Int]() + numberOfOutput.zipWithIndex.foreach { case (num, index) => +if (countForRows + num < limit) { + countForRows += num + takeAmounts += ((index, num)) +} else { + val toTake = limit - countForRows + countForRows += toTake + takeAmounts += ((index, toTake)) +} + } + val shuffled = new ShuffledRowRDD(shuffleDependency) + shuffled.mapPartitionsWithIndexInternal { case (index, iter) => +takeAmounts.get(index).map { size => + iter.take(size) +}.getOrElse(iter) --- End diff -- getOrElse(empty iter)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
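A small sketch of the two fallbacks being compared, with hypothetical values: should a partition with no entry in `takeAmounts` emit all of its rows (the current `getOrElse(iter)`) or none of them (`Iterator.empty`)?

```scala
// Hypothetical quota map: partition 0 keeps 3 rows, partition 2 keeps 1 row.
val takeAmounts = Map(0 -> 3, 2 -> 1)

def limitPartition[T](index: Int, iter: Iterator[T]): Iterator[T] =
  takeAmounts.get(index) match {
    case Some(n) => iter.take(n)     // partition has an assigned quota
    case None    => iter             // current code: emit every row of the partition
    // case None => Iterator.empty   // the alternative being asked about: emit no rows
  }
```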
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r97700670 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,25 +95,100 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the `limit` elements of the child output. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning -} -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) - override def outputPartitioning: Partitioning = child.outputPartitioning + protected override def doExecute(): RDD[InternalRow] = { +val childRDD = child.execute() +val partitioner = FakePartitioning(child.outputPartitioning, + childRDD.getNumPartitions) +val shuffleDependency = ShuffleExchange.prepareShuffleDependency( + childRDD, child.output, partitioner, serializer) +val numberOfOutput: Seq[Int] = if (shuffleDependency.rdd.getNumPartitions != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + val submittedStageFuture = sparkContext.submitMapStage(shuffleDependency) + submittedStageFuture.get().numberOfOutput.toSeq +} else { + Nil +} - override def outputOrdering: Seq[SortOrder] = child.outputOrdering +// Try to keep child plan's original data parallelism or not. It is enabled by default. +val respectChildParallelism = sqlContext.conf.enableParallelGlobalLimit + +val sumOfOutput = numberOfOutput.sum +if (sumOfOutput <= limit) { + childRDD --- End diff -- i think here we should use the shuffle rdd to directly read the data from disk. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16677#discussion_r97700568

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
@@ -230,6 +230,21 @@ case object SinglePartition extends Partitioning {
 }

 /**
+ * Represents a partitioning where rows are only serialized/deserialized locally. The number
+ * of partitions are not changed and also the distribution of rows. This is mainly used to
+ * obtain some statistics of map tasks such as number of outputs.
+ */
+case class FakePartitioning(orgPartition: Partitioning, numPartitions: Int) extends Partitioning {
--- End diff --

How about `LocalPartitioning`?
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

@viirya I suggest fixing issue 2 in this PR; let's wait for some comments on issue 1. /cc @rxin and @wzhfy, who may comment on the first case.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

For issue 1, my view is that we should not use the proposal in this PR:
1. How do you determine that `total rows in all partitions are (much) more than limit number` before entering this code path, and how do you decide what counts as "much more than"? We cannot use CBO-estimated stats here, because the plan under the local limit may be complex and we cannot guarantee the accuracy of the estimated row count.
2. As @rxin suggested, this breaks the RDD chain.

So for issue 1, I think it needs some improvement in Spark core and the scheduler, as I mentioned above. Issue 2 is fine with me; the solution is the same as I described above (still a shuffle, but shuffling to multiple partitions plus modified map output statistics), right?
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

All partitions after the local limit together hold about/nearly 100,000,000 rows.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

Again, to be clear, I am against the performance regression in the following case:
0. The limit number is 100,000,000.
1. The original table is very big, much larger than 100,000,000 rows.
2. After the local limit stage, the output row count is about/nearly 100,000,000 rows.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

I think the shuffle is OK, but shuffling to one partition leads to the performance issue.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

Assume the local limit outputs 100,000,000 rows; the global limit will then take them in a single partition, so it is very slow and cannot use other free cores to improve parallelism.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

@viirya my team member posted to the mailing list, and we actually mean the case I listed above. The main issue is the single-partition problem in the global limit; if you fall back to the old global limit in that case, it is still unresolved.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

I think the local limit cost is important. Say the number of recomputed partitions is m and the total number of partitions is n. m = 1, n = 100 is a favorable case, but there are also cases where m is very close to n (even m = n). Our customers have this scenario, so I care a lot about this issue.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

Your proposal avoids the cost of computing and shuffling all partitions for the local limit, but it introduces recomputation of some partitions in the local limit stage. We cannot tell which cost is cheaper (in most cases); note that the computation feeding the local limit stage may be very complex and costly.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

I think before comparing our proposals, we should first make sure a proposal does not introduce a performance regression.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

I don't quite follow, but let me explain more. If we use map output statistics to decide how many elements each global limit partition should take:
1. The local limit shuffles with the mailing-list partitioner and returns the map output statistics.
2. In the global limit, each partition takes or drops some rows (just like what you do in this PR) based on the statistics.

Then:
1. The shuffle cost is almost the same as now.
2. The global limit has no single-partition issue when the limit number is big.
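A minimal sketch of step 2 above, under assumed names: given the per-partition row counts reported by the map output statistics, compute how many rows each reduce-side partition should keep so that the total equals the limit. This greedy version fills partitions in order; the PR instead tries to spread the drops evenly.

```scala
// Sketch only: decide per-partition take amounts from map output statistics.
def takePerPartition(numberOfOutput: Seq[Long], limit: Long): Seq[Long] = {
  var remaining = limit
  numberOfOutput.map { rows =>
    val take = math.min(rows, remaining) // keep at most what is still needed
    remaining -= take
    take
  }
}

// Example: takePerPartition(Seq(40L, 30L, 50L), limit = 100L) == Seq(40, 30, 30)
```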
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

We would need to define a new kind of map output statistics to do this.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

Yes, you are right; we cannot ensure a uniform distribution for the global limit. One idea is not to use a special partitioner: after the shuffle we get the map output statistics with the row count of each bucket, and then decide how many elements each global limit partition should take.
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

Referring to the mailing list:

> One issue left is how to decide the shuffle partition number. We can have a config for the maximum number of elements each GlobalLimit task should process, then do a factorization to get a number closest to that config. E.g. if the config is 2000: if limit = 2000 * 5 = 10000, we shuffle to 5 partitions; if limit = 2000 * 9 = 18000, we shuffle to 9 partitions; if the limit is a prime number, we just fall back to a single partition.

You mean for the prime-number case?
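A rough sketch of the heuristic quoted above, with an assumed `maxRowsPerTask` parameter standing in for the config (the "2000" in the example); this is illustrative, not an existing Spark conf. It picks a partition count that divides the limit evenly and keeps each GlobalLimit task at or under the config, falling back to a single partition when no nearby divisor exists (e.g. a prime limit).

```scala
// Sketch only: choose a shuffle partition count for a global limit.
def limitPartitions(limit: Long, maxRowsPerTask: Long): Int = {
  val minParts = math.max(1L, math.ceil(limit.toDouble / maxRowsPerTask).toLong)
  (minParts to math.min(limit, minParts * 4))   // search a small window of candidates
    .find(n => limit % n == 0)                  // require an exact division of the limit
    .map(_.toInt)
    .getOrElse(1)                               // prime or awkward limits: single partition
}

// limitPartitions(10000, 2000) == 5; limitPartitions(7919, 2000) == 1 (7919 is prime)
```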
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

To be clear, we now have these issues:
1. The local limit computes all partitions, which means it launches many tasks even though a few small tasks may be enough.
2. The global limit has a single-partition issue: it currently shuffles all the data to one partition, so if the limit number is very big, it causes a performance bottleneck.

It would be ideal if we could combine the global limit and the local limit into one stage and avoid the shuffle, but for now I cannot find a very good solution (with no performance regression) that does this without changing Spark core/scheduler. Your solution is trying to do that, but as I suggested, there are cases where performance may be worse. @wzhfy's idea only resolves the single-partition issue: it still shuffles and still runs the local limit on all partitions, but it does not degrade performance in those cases compared with the current code path.

> Another issue is, how do you make sure you create a uniform distribution of the result of local limit. Each local limit can produce different number of rows.

It uses a special partitioner to do this. The partitioner is like `row_number` in SQL: it gives each row a uniformly spread partition id, so on the reduce side each task handles a very similar number of rows.
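A minimal sketch of the "row_number-like" partitioner idea described above; `RoundRobinPartitioner` and the keying scheme are assumptions for illustration, not the mailing-list implementation. If each row is keyed by its sequence number within its map task, assigning partitions round-robin spreads rows almost evenly across the reduce side.

```scala
import org.apache.spark.Partitioner

// Sketch only: round-robin assignment of rows to reduce partitions.
class RoundRobinPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val rowNumber = key.asInstanceOf[Long] // assumed non-negative per-task sequence number
    (rowNumber % numPartitions).toInt
  }
}
```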
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96784321 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- can you explain more, i think without the job of ``` val res = sc.runJob(childRDD, (it: Iterator[InternalRow]) => Array[Int](it.size), p) ``` the job chain is not broken. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark issue #16633: [SPARK-19274][SQL] Make GlobalLimit without shuffling da...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/16633

@viirya @rxin I support @wzhfy's idea from the mailing list (http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-td20570.html): it solves the single-partition issue in the global limit without breaking the job chain. For the local limit it still computes all partitions; I think we can consider resolving the local limit issue with some changes in the core scheduler in the future. We could provide a mechanism to stop computing the remaining tasks of a stage once some condition is satisfied for that stage. What do you think?
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96782626 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- It will be consumed by the parent plan. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96782094 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- In that case there is no rows to drop, so it is a full scan, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96781278 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- In that case, fist scan: ``` val res = sc.runJob(childRDD, (it: Iterator[InternalRow]) => Array[Int](it.size), p) ``` second scan: ``` mapPartitionsWithIndexInternal { case (index, iter) => if (index < partsScanned) { ... ``` correct me if i am wrong. 
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96780810 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) --- End diff -- Yes, i know, but for the selected partitions in each iteration, it will trigger the compute for these partitions, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96780571 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- Actually we have this scenario, i don't think the case of output number after a filter is less than the limit num is a corner case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96779648 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) --- End diff -- @viirya `it.size` will trigger the scan for the big table, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96773557 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- An example: select xxx from table where xxx > 99 limit 1000 if the table is a big table and real num of xxx which > 99 is less than 100, you still need compute the all the partitions and you will do the filter, scan the big table twice. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
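A hedged illustration of the scenario described above (table and column names are invented): when the predicate keeps fewer rows than the LIMIT, an iterative executeTake-style GlobalLimit has to visit every partition before it can learn there are not enough rows, and the counting pass plus the collecting pass read the base data twice.

```scala
import org.apache.spark.sql.SparkSession

object FilterBelowLimit {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("limit-vs-filter").master("local[4]").getOrCreate()

    // A "big table" stand-in where only values 0..99 exist, so nothing satisfies xxx > 99.
    spark.range(0, 100000000L).selectExpr("id % 100 AS xxx").createOrReplaceTempView("big_table")

    // Fewer than 1000 rows satisfy the predicate (here, none), so a take-like GlobalLimit
    // must scan every partition once to count rows and then again to produce them,
    // i.e. the filtered scan of the big table effectively runs twice.
    val limited = spark.sql("SELECT xxx FROM big_table WHERE xxx > 99 LIMIT 1000")
    println(limited.count())

    spark.stop()
  }
}
```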
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96773174 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- Previously: compute all partitions + shuffle all partitions of child rdd Now: compute some partitions of child rdd twice, one to get the `partsScanned ` and totalNum, one to get the limit num elements from these partitions. I mean second one is not always better than the first if the recompute cost is high. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16633: [SPARK-19274][SQL] Make GlobalLimit without shuff...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/16633#discussion_r96673145 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala --- @@ -90,21 +94,74 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } /** - * Take the first `limit` elements of each child partition, but do not collect or shuffle them. + * Take the first `limit` elements of the child's partitions. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output -/** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { + protected override def doExecute(): RDD[InternalRow] = { +// This logic is mainly copyed from `SparkPlan.executeTake`. +// TODO: combine this with `SparkPlan.executeTake`, if possible. +val childRDD = child.execute() +val totalParts = childRDD.partitions.length +var partsScanned = 0 +var totalNum = 0 +var resultRDD: RDD[InternalRow] = null +while (totalNum < limit && partsScanned < totalParts) { + // The number of partitions to try in this iteration. It is ok for this number to be + // greater than totalParts because we actually cap it at totalParts in runJob. + var numPartsToTry = 1L + if (partsScanned > 0) { +// If we didn't find any rows after the previous iteration, quadruple and retry. +// Otherwise, interpolate the number of partitions we need to try, but overestimate +// it by 50%. We also cap the estimation in the end. +val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2) +if (totalNum == 0) { + numPartsToTry = partsScanned * limitScaleUpFactor +} else { + // the left side of max is >=1 whenever partsScanned >= 2 + numPartsToTry = Math.max((1.5 * limit * partsScanned / totalNum).toInt - partsScanned, 1) + numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor) +} + } - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil + val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) + val sc = sqlContext.sparkContext + val res = sc.runJob(childRDD, +(it: Iterator[InternalRow]) => Array[Int](it.size), p) + + totalNum += res.map(_.head).sum + partsScanned += p.size + + if (totalNum >= limit) { +// If we scan more rows than the limit number, we need to reduce that from scanned. +// We calculate how many rows need to be reduced for each partition, +// until all redunant rows are reduced. +var numToReduce = (totalNum - limit) +val reduceAmounts = new HashMap[Int, Int]() +val partitionsToReduce = p.zip(res.map(_.head)).foreach { case (part, size) => + val toReduce = if (size > numToReduce) numToReduce else size + reduceAmounts += ((part, toReduce)) + numToReduce -= toReduce +} +resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) => + if (index < partsScanned) { --- End diff -- Actually for the partitions which `index < partsScanned` of `childRdd` we computed twice, its hard to say this must be better than the old version. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15240 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15240 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15240 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15297: [SPARK-9862]Handling data skew
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15297 @YuhuWang2002 We should limit the use cases for outer joins: for a left outer join such as A left join B, this implementation currently cannot handle skew in table B. That's because the join result depends on all the data in the same reduce partition of B, so that partition cannot be split across multiple tasks (see the sketch below). Similarly, for a right outer join such as A right join B, this implementation cannot handle skew in table A. And for a full outer join, we cannot use the optimization at all. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
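A small sketch of the applicability rule described above (toy types, not the PR's code): the skewed side of a join can only be split into multiple tasks when it is not the null-supplying side of an outer join, because deciding whether a preserved row is unmatched requires seeing the whole partition of the other side.

```scala
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

sealed trait Side
case object LeftSide extends Side   // table A
case object RightSide extends Side  // table B

object SkewSplitRule {
  // Returns true when the skewed side's reduce partition may be split across tasks
  // (replicating the other side), without changing the join result.
  def canSplitSkewedSide(joinType: JoinType, skewedSide: Side): Boolean =
    (joinType, skewedSide) match {
      case (Inner, _)              => true   // either side may be split
      case (LeftOuter, LeftSide)   => true   // A's skewed keys can be split, B is replicated
      case (LeftOuter, RightSide)  => false  // splitting B would emit spurious null-extended A rows
      case (RightOuter, RightSide) => true
      case (RightOuter, LeftSide)  => false
      case (FullOuter, _)          => false  // neither side may be split
    }
}
```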
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15481 `CoarseGrainedSchedulerBackend.removeExecutor` also uses `ask`, but that does not matter, right? It just sends the message once and logs the error on failure (see the sketch below). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
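A sketch of the fire-and-forget pattern being referred to (simplified; `askOnce` merely stands in for `driverEndpoint.ask[Boolean](RemoveExecutor(...))` and this is not the actual backend code): the request is sent once and its outcome is only logged, so there is no retry loop blocking while a lock is held.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FireAndForgetAsk {
  // Send the request once and only log the outcome; never block or retry.
  def removeExecutorOnce(executorId: String)(askOnce: String => Future[Boolean]): Unit =
    askOnce(executorId).onComplete {
      case Success(_) => println(s"removed executor $executorId")
      case Failure(e) => println(s"failed to remove executor $executorId: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    removeExecutorOnce("exec-1")(_ => Future.successful(true))
    Thread.sleep(100) // let the callback run before the JVM exits
  }
}
```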
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15481 Updated, can you review again? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15481 OK, I will revert to the initial commit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15481 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15481 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15481: [SPARK-17929] [CORE] Fix deadlock when CoarseGrai...
GitHub user scwf opened a pull request: https://github.com/apache/spark/pull/15481 [SPARK-17929] [CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-17929 Now `CoarseGrainedSchedulerBackend` reset will get the lock, ``` protected def reset(): Unit = synchronized { numPendingExecutors = 0 executorsPendingToRemove.clear() // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. executorDataMap.toMap.foreach { case (eid, _) => driverEndpoint.askWithRetry[Boolean]( RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } } ``` but on removeExecutor also need the lock "CoarseGrainedSchedulerBackend.this.synchronized", this will cause deadlock. ``` private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { logDebug(s"Asked to remove executor $executorId with reason $reason") executorDataMap.get(executorId) match { case Some(executorInfo) => // This must be synchronized because variables mutated // in this block are read when requesting executors val killed = CoarseGrainedSchedulerBackend.this.synchronized { addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId executorsPendingToRemove.remove(executorId).getOrElse(false) } ... ## How was this patch tested? manual test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/KirinKing/spark spark-17929 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15481 commit 3681fae6b5364a5cf55700e1510473d8d9b77cd3 Author: w00228970 Date: 2016-10-14T09:24:30Z use send --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
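A stripped-down sketch of the deadlock shape the description outlines (toy classes, not the Spark code): a synchronized method blocks waiting for a reply that can only be produced by a handler needing the same lock. A timed wait is used here so the example terminates; the real blocking `askWithRetry` simply hangs. The "use send" commit amounts to dropping the blocking wait while the lock is held.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object ResetDeadlockSketch {
  private val pool = Executors.newFixedThreadPool(2)
  private val replyReady = new CountDownLatch(1)

  // Models removeExecutor: the handler needs the same lock as reset().
  def handleRemoveExecutor(): Unit = this.synchronized {
    replyReady.countDown()
  }

  // Models reset(): holds the lock, then blocks waiting for the handler's reply.
  def resetBlocking(): Boolean = this.synchronized {
    pool.submit(new Runnable { def run(): Unit = handleRemoveExecutor() })
    // Deadlock: the handler cannot acquire the lock we are holding, so this wait times out.
    replyReady.await(2, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    println(s"got reply while holding the lock: ${resetBlocking()}") // prints false
    pool.shutdown()
  }
}
```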
[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15297 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15297: [WIP][SPARK-9862]Handling data skew
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15297 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15240: [SPARK-17556] [CORE] [SQL] Executor side broadcast for b...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15240 /cc @rxin can you help review this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15213 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15213 @kayousterhout Thanks for your comments; I have updated the PR based on all of them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15213: [SPARK-17644] [CORE] Do not add failedStages when...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/15213#discussion_r80865465 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1256,11 +1257,13 @@ class DAGScheduler( if (disallowStageRetryForTest) { abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) +abortedStage = true } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { abortStage(failedStage, s"$failedStage (${failedStage.name}) " + s"has failed the maximum allowable number of " + s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + s"Most recent failure reason: ${failureMessage}", None) +abortedStage = true } else if (failedStages.isEmpty) { --- End diff -- it make sense to me, updated --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15213 @markhamstra In my fix I just want to make minimal changes to the DAGScheduler, but your fix is also OK with me; I can update this according to your comment. Thanks :) /cc @zsxwing, who may also have comments on this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15240: [SPARK-17556] Executor side broadcast for broadca...
GitHub user scwf opened a pull request: https://github.com/apache/spark/pull/15240 [SPARK-17556] Executor side broadcast for broadcast joins ## What changes were proposed in this pull request? Design doc : https://issues.apache.org/jira/secure/attachment/12830286/executor%20broadcast.pdf ## How was this patch tested? added unit test and manual test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/KirinKing/spark SPARK-17556 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15240.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15240 commit df723bb138f4c01b507aed7f3d0a80613b2b2f30 Author: w00228970 Date: 2016-09-26T04:10:11Z draft commit cf4adbe8754cf56dceb2c2ad750a24520ef49d4a Author: w00228970 Date: 2016-09-26T04:19:19Z ununsed changes commit 77057366e69cb659626b4039d35c87fe91a6d668 Author: w00228970 Date: 2016-09-26T04:59:41Z comment improvement commit 81da4e33314015eb044c54e4ef7e834519c15f12 Author: w00228970 Date: 2016-09-26T06:06:44Z fix tests commit 63a5dbf5123b458a52a22e1e03aea152995e55cb Author: w00228970 Date: 2016-09-26T06:37:55Z fix npe commit c284c2cfbf3c3094db8b95ecbae331f8586128a2 Author: w00228970 Date: 2016-09-26T07:33:00Z new endpoint for broadcast commit cc535ed775efcd228c22c3b1dbbc7e10201f3da6 Author: w00228970 Date: 2016-09-26T07:43:14Z unused changes commit 2caafbf8eea238791335ff4a30c1d797a400b540 Author: w00228970 Date: 2016-09-26T10:45:36Z use broadcast in sql commit 89336e7f0cf535484798536c27e3d98d5ff0a358 Author: w00228970 Date: 2016-09-26T11:09:43Z key not found issue --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15213 > actual problem is not in abortStage but rather in improper additions to failedStages Correct. I think a more accurate description for this issue is "do not add to `failedStages` when aborting a stage for a fetch failure". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15213 Actually, this is the only place in Spark where stages are added to `failedStages`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/15213 Thanks @zsxwing for explaining this. @markhamstra the issue happens in the case described in my PR description. It usually depends on multiple threads submitting jobs and on the order of the fetch failures, which is why I called it a race condition. If you think that is confusing, how about changing the title to "do not add to failedStages when aborting a stage"? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15213: [SPARK-17644] [CORE] Fix the race condition when ...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/15213#discussion_r80274817 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2105,6 +2109,54 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } + test("The failed stage never resubmitted due to abort stage in another thread") { +implicit val executorContext = ExecutionContext + .fromExecutorService(Executors.newFixedThreadPool(5)) +val duration = 60.seconds + +val f1 = Future { + try { +val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey() +val shuffleHandle = + rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle +rdd1.map { x => --- End diff -- ok, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15213: [SPARK-17644] [CORE] Fix the race condition when ...
GitHub user scwf opened a pull request: https://github.com/apache/spark/pull/15213 [SPARK-17644] [CORE] Fix the race condition when DAGScheduler handle the FetchFailed event ## What changes were proposed in this pull request?

| Time | Thread 1, Job 1 | Thread 2, Job 2 |
|:-:|:-:|:-:|
| 1 | abort stage due to FetchFailed | |
| 2 | failedStages += failedStage | |
| 3 | | task failed due to FetchFailed |

Then Job 2 of Thread 2 never resubmits the failed stage and hangs. We should not add to failedStages when aborting a stage. ## How was this patch tested? add unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/KirinKing/spark dag-resubmit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15213.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15213 commit 8e667f532fa4509386ff6a6173b75a8e24cab40a Author: w00228970 Date: 2016-09-23T07:17:46Z test case commit 2bfa05b172c68d6aa52e66359e83e2e7c6033662 Author: w00228970 Date: 2016-09-23T07:30:15Z The failed stage never resubmitted --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
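A condensed sketch of the fix the description proposes (simplified types, not the actual DAGScheduler): record the failed stage and schedule a resubmit only when the stage is not being aborted, so a later fetch failure from another job still finds `failedStages` empty and schedules its own resubmit instead of hanging.

```scala
import scala.collection.mutable

object FetchFailureHandling {
  case class Stage(id: Int)

  private val failedStages = mutable.HashSet[Stage]()
  private var resubmitScheduled = false

  private def scheduleResubmit(): Unit = { resubmitScheduled = true }

  def handleFetchFailure(failedStage: Stage, shouldAbort: Boolean): Unit = {
    if (shouldAbort) {
      // Abort the stage but do NOT touch failedStages: nothing will ever resubmit it,
      // and leaving it in the set blocks resubmission for other jobs' failures.
      println(s"aborting stage ${failedStage.id}")
    } else {
      // Only the first failure while the set is empty schedules the resubmit message.
      if (failedStages.isEmpty) scheduleResubmit()
      failedStages += failedStage
    }
  }

  def main(args: Array[String]): Unit = {
    handleFetchFailure(Stage(1), shouldAbort = true)   // job 1 aborted, set stays empty
    handleFetchFailure(Stage(2), shouldAbort = false)  // job 2's failure still schedules a resubmit
    println(s"resubmitScheduled = $resubmitScheduled, failedStages = $failedStages")
  }
}
```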
[GitHub] spark pull request #14712: [SPARK-17072] [SQL] support table-level statistic...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/14712#discussion_r77014850 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -168,6 +169,107 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false) } + private def checkMetastoreRelationStats( + tableName: String, + expectedTotalSize: Long, + expectedRowCount: Option[BigInt]): Unit = { +val df = sql(s"SELECT * FROM $tableName") +val relations = df.queryExecution.analyzed.collect { case rel: MetastoreRelation => + assert(rel.statistics.sizeInBytes === expectedTotalSize) + assert(rel.statistics.rowCount === expectedRowCount) + rel +} +assert(relations.size === 1) + } + + test("test table-level statistics for hive tables created in HiveExternalCatalog") { +val textTable = "textTable" +withTable(textTable) { + sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") + checkMetastoreRelationStats(textTable, +expectedTotalSize = spark.sessionState.conf.defaultSizeInBytes, expectedRowCount = None) + + sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") + // don't have our statistics, MetastoreRelation uses hive's `totalSize` + checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, expectedRowCount = None) + + // noscan won't count the number of rows + sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") + checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, expectedRowCount = None) + + // without noscan, we count the number of rows + sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") + checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, expectedRowCount = Some(500)) +} + } + + test("test whether the old stats are removed") { +val textTable = "textTable" +withTable(textTable) { + sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE") + sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") + sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS") + checkMetastoreRelationStats(textTable, expectedTotalSize = 5812, expectedRowCount = Some(500)) + + sql(s"INSERT INTO TABLE $textTable SELECT * FROM src") + sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan") + // update total size and remove the old and invalid row count + checkMetastoreRelationStats(textTable, expectedTotalSize = 11624, expectedRowCount = None) +} + } + + private def checkLogicalRelationStats( + tableName: String, + expectedRowCount: Option[BigInt]): Unit = { +val df = sql(s"SELECT * FROM $tableName") +val relations = df.queryExecution.analyzed.collect { case rel: LogicalRelation => + // TODO: We don't have an expected value here because parquet size is different on Windows + // and Linux, we need to find the reason and fix this. 
+ assert(rel.statistics.sizeInBytes === rel.relation.sizeInBytes) + assert(rel.statistics.rowCount === expectedRowCount) + rel +} +assert(relations.size === 1) + } + + test("test statistics of LogicalRelation inherited from MetastoreRelation") { +val parquetTable = "parquetTable" +val orcTable = "orcTable" +withTable(parquetTable, orcTable) { + sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) STORED AS PARQUET") + sql(s"CREATE TABLE $orcTable (key STRING, value STRING) STORED AS ORC") + sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src") + sql(s"INSERT INTO TABLE $orcTable SELECT * FROM src") + sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS") + + checkLogicalRelationStats(parquetTable, expectedRowCount = Some(500)) --- End diff -- for parquet table we'd better also set the `spark.sql.hive.convertMetastoreParquet` like orc table below, this is to avoid failure because of other test case changing this config. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enab
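A sketch of the suggestion (suite and test names are invented; it assumes the same test harness as the suite in the diff, i.e. `QueryTest with TestHiveSingleton with SQLTestUtils`, and that the ORC case pins its own conversion flag): wrap the Parquet assertions in `withSQLConf` so another suite toggling `spark.sql.hive.convertMetastoreParquet` cannot break them.

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils

class ParquetStatsConfigSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
  test("stats assertions survive other suites toggling convertMetastoreParquet") {
    val parquetTable = "parquetTable"
    withTable(parquetTable) {
      sql(s"CREATE TABLE $parquetTable (key STRING, value STRING) STORED AS PARQUET")
      sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
      // Pin the conversion flag for this block only; it is restored afterwards.
      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
        sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
        assert(sql(s"SELECT * FROM $parquetTable").count() === 500)
      }
    }
  }
}
```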
[GitHub] spark issue #14712: [SPARK-17072] [SQL] support table-level statistics gener...
Github user scwf commented on the issue: https://github.com/apache/spark/pull/14712 /cc @cloud-fan @rxin --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-173435459 @yhuai thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-173241662 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/7336#discussion_r50269169 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala --- @@ -198,33 +241,99 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker) } - override def getLocalFileWriter(row: InternalRow, schema: StructType) -: FileSinkOperator.RecordWriter = { -def convertToHiveRawString(col: String, value: Any): String = { - val raw = String.valueOf(value) - schema(col).dataType match { -case DateType => DateTimeUtils.dateToString(raw.toInt) -case _: DecimalType => BigDecimal(raw).toString() -case _ => raw - } + // this function is executed on executor side + override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = { +val serializer = newSerializer(fileSinkConf.getTableInfo) +val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( +fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, +ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + +val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray +val dataTypes = inputSchema.map(_.dataType) +val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) } +val outputData = new Array[Any](fieldOIs.length) --- End diff -- yes, extracted a common method for it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/5827#issuecomment-172151467 Actually, we were trying to contribute these improvements, but unfortunately the community did not want them in the past, for maintenance (or compatibility with Hive QL) reasons :). I am glad that Spark SQL now uses a single parser, so that people can contribute to it and make it more and more powerful. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-172144917 Ping @rxin --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/5827#issuecomment-172144781 @rxin Our parser is an extended version of the `SqlParser`; the main difference is that we add support for subqueries (both correlated and uncorrelated), EXISTS, IN, and some minor improvements such as grouping, top, and cube/rollup. It supports the TPC-DS generated ANSI SQL syntax without any change (see the illustrative queries below). I noticed that there are some PRs for these features; I will take a look at those PRs when I have time and see what I can do. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
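For illustration only (queries are made up and loosely TPC-DS-flavored, not taken from the benchmark kit), the kinds of ANSI constructs mentioned above look like this:

```scala
object AnsiSyntaxExamples {
  // Correlated subquery via EXISTS.
  val correlatedExists: String =
    """SELECT c.c_customer_id
      |FROM customer c
      |WHERE EXISTS (SELECT 1 FROM store_sales s WHERE s.ss_customer_sk = c.c_customer_sk)
      |""".stripMargin

  // Uncorrelated subquery via IN.
  val uncorrelatedIn: String =
    """SELECT i_item_id FROM item
      |WHERE i_category IN (SELECT i_category FROM item WHERE i_current_price > 50)
      |""".stripMargin

  // ROLLUP-style grouping.
  val rollup: String =
    """SELECT i_category, i_brand, SUM(ss_net_paid)
      |FROM store_sales JOIN item ON ss_item_sk = i_item_sk
      |GROUP BY ROLLUP(i_category, i_brand)
      |""".stripMargin
}
```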
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/5827#issuecomment-171667688 @rxin, yes, we used this, and we implemented a new SQL parser based on this interface to support ANSI TPC-DS SQL. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12742] [SQL] org.apache.spark.sql.hive....
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/10682#discussion_r49404076 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala --- @@ -24,6 +24,9 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { import testImplicits._ protected override def beforeAll(): Unit = { +sql("DROP TABLE IF EXISTS t0") +sql("DROP TABLE IF EXISTS t1") +sql("DROP TABLE IF EXISTS t2") --- End diff -- @liancheng The t1 table must already have been created in some other test suite, so here I just drop these tables before running this suite. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
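The same defensive cleanup expressed as one loop — a sketch assuming the suite's own `sql` helper, equivalent to the three drops shown in the diff:

```scala
protected override def beforeAll(): Unit = {
  // Drop anything a previous suite may have leaked before this suite
  // creates its own t0/t1/t2 fixtures.
  Seq("t0", "t1", "t2").foreach(name => sql(s"DROP TABLE IF EXISTS $name"))
}
```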
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-170412209 @rxin, yes, this PR tries to fix the same issue on the Hive support side. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12742] [SQL] org.apache.spark.sql.hive....
GitHub user scwf opened a pull request: https://github.com/apache/spark/pull/10682 [SPARK-12742] [SQL] org.apache.spark.sql.hive.LogicalPlanToSQLSuite failure due to Table already exists exception ``` [info] Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.LogicalPlanToSQLSuite *** ABORTED *** (325 milliseconds) [info] org.apache.spark.sql.AnalysisException: Table `t1` already exists.; [info] at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:296) [info] at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:285) [info] at org.apache.spark.sql.hive.LogicalPlanToSQLSuite.beforeAll(LogicalPlanToSQLSuite.scala:33) [info] at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187) [info] at org.apache.spark.sql.hive.LogicalPlanToSQLSuite.beforeAll(LogicalPlanToSQLSuite.scala:23) [info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253) [info] at org.apache.spark.sql.hive.LogicalPlanToSQLSuite.run(LogicalPlanToSQLSuite.scala:23) [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:462) [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:671) [info] at sbt.ForkMain$Run$2.call(ForkMain.java:296) [info] at sbt.ForkMain$Run$2.call(ForkMain.java:286) [info] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [info] at java.lang.Thread.run(Thread.java:745) ``` /cc @liancheng You can merge this pull request into a Git repository by running: $ git pull https://github.com/scwf/spark fix-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/10682.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #10682 commit 62dd2f4e46c9fff010fd231fdc2e8c5c1a63c86b Author: wangfei Date: 2016-01-10T16:30:58Z fix table already exists error --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-170359121 Back to update, @marmbrus @rxin please help review this when you have time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6190][core] create LargeByteBuffer for ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/5400#issuecomment-170037893 >> The cached size cannot be greater than 2GB. @rxin how should we understand the `cached size`? Is it the partition size of a cached RDD? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-6190][core] create LargeByteBuffer for ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/5400#issuecomment-169517123 Hi @squito, can you explain in which situations users will hit the 2 GB limit? Will a job processing very large data (such as PB-level data) reach this limit? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
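For context on the question, a hedged sketch of how the 2 GB limit is usually hit: it is a per-partition / per-block limit (blocks are backed by Int-indexed buffers), so total job size alone — even PB scale — does not trigger it, but a single cached or shuffled partition larger than 2 GB does. Illustrative only; this is not meant to be run as-is on a small machine.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object TwoGBLimitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("2g-limit").setMaster("local[2]"))

    // Roughly 3 GB squeezed into a single partition: ~3 million records of ~1 KB each.
    // Caching it as one serialized block is where the Int-indexed buffer limit shows up
    // (errors along the lines of "Size exceeds Integer.MAX_VALUE").
    val oneFatPartition = sc.parallelize(1 to 3000000, numSlices = 1)
      .map(i => Array.fill(1024)(i.toByte))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Repartitioning so each partition stays well under 2 GB avoids the limit; total
    // dataset size is not the problem, individual partition/block size is.
    println(oneFatPartition.count())
    sc.stop()
  }
}
```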
[GitHub] spark pull request: [SPARK-12321][SQL] JSON format for TreeNode (u...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/10311#issuecomment-166772051 Got it, thanks @marmbrus :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12321][SQL] JSON format for TreeNode (u...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/10311#issuecomment-166611132 Hi @cloud-fan can you explain in which cases we can use this feature or the motivation for this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12222] [Core] Deserialize RoaringBitmap...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/10253#issuecomment-163590407 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12222] [Core] Deserialize RoaringBitmap...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/10213#issuecomment-163089761 /cc @davies --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12222] [Core] Deserialize RoaringBitmap...
GitHub user scwf opened a pull request: https://github.com/apache/spark/pull/10213 [SPARK-12222] [Core] Deserialize RoaringBitmap using Kryo serializer throws Buffer underflow exception

Deserializing RoaringBitmap with the Kryo serializer throws a Buffer underflow exception:

```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
    at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
    at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
    at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
    at org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
```

This is caused by a bug in Kryo's `Input.skip(long count)` (https://github.com/EsotericSoftware/kryo/issues/119), which we call in `KryoInputDataInputBridge`. Instead of upgrading the Kryo version, this PR bypasses Kryo's `Input.skip(long count)` by directly calling the other `skip` method in Kryo's Input.java (https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. it writes the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method. More detail: https://github.com/apache/spark/pull/9748#issuecomment-162860246

You can merge this pull request into a Git repository by running: $ git pull https://github.com/scwf/spark patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/10213.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #10213

commit edf438443b35e131490e8652d6382143180b9e26 Author: Fei Wang Date: 2015-12-09T02:41:42Z fix skipBytes
commit 01d37b0e1833dd17aedd27427102edef3ffdba79 Author: Fei Wang Date: 2015-12-09T02:45:19Z added test suite

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
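[Editorial note: a minimal sketch of the workaround the PR describes, assuming Kryo 2.21's `Input` API; `skipFully` is a hypothetical helper name, and the exact code merged in the PR may differ slightly:]

```scala
import com.esotericsoftware.kryo.io.{Input => KryoInput}

// Bug-fixed equivalent of Input.skip(long count): loop over Input.skip(int),
// which is not affected by the Kryo 2.21 Input.skip(long) underflow bug,
// until all n bytes have been skipped.
def skipFully(input: KryoInput, n: Int): Int = {
  var remaining: Long = n
  while (remaining > 0) {
    val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
    input.skip(skip)
    remaining -= skip
  }
  n
}
```

The same loop, placed inside `KryoInputDataInputBridge.skipBytes`, is what the linked comment below spells out in full.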
[GitHub] spark pull request: [SPARK-11016] Move RoaringBitmap to explicit K...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/9748#issuecomment-163074120 OK, should I send PRs to both master and branch-1.6? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11016] Move RoaringBitmap to explicit K...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/9748#issuecomment-162860246 @davies there are some problems when deserializing RoaringBitmap. See the example below; run this piece of code:

```
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import java.io.{DataInput, DataOutput, FileInputStream, FileOutputStream}
import org.roaringbitmap.RoaringBitmap

class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
  override def readLong(): Long = input.readLong()
  override def readChar(): Char = input.readChar()
  override def readFloat(): Float = input.readFloat()
  override def readByte(): Byte = input.readByte()
  override def readShort(): Short = input.readShort()
  override def readUTF(): String = input.readString() // readString in kryo does utf8
  override def readInt(): Int = input.readInt()
  override def readUnsignedShort(): Int = input.readShortUnsigned()
  override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
  override def readFully(b: Array[Byte]): Unit = input.read(b)
  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
  override def readLine(): String = throw new UnsupportedOperationException("readLine")
  override def readBoolean(): Boolean = input.readBoolean()
  override def readUnsignedByte(): Int = input.readByteUnsigned()
  override def readDouble(): Double = input.readDouble()
}

class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
  override def writeFloat(v: Float): Unit = output.writeFloat(v)
  // There is no "readChars" counterpart, except maybe "readLine", which is not supported
  override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
  override def writeDouble(v: Double): Unit = output.writeDouble(v)
  override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
  override def writeShort(v: Int): Unit = output.writeShort(v)
  override def writeInt(v: Int): Unit = output.writeInt(v)
  override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
  override def write(b: Int): Unit = output.write(b)
  override def write(b: Array[Byte]): Unit = output.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
  override def writeBytes(s: String): Unit = output.writeString(s)
  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
  override def writeLong(v: Long): Unit = output.writeLong(v)
  override def writeByte(v: Int): Unit = output.writeByte(v)
}

// serialize a small bitmap to a file through the output bridge
val outStream = new FileOutputStream("D:\\wfserde")
val output = new KryoOutput(outStream)
val bitmap = new RoaringBitmap
bitmap.add(1)
bitmap.add(3)
bitmap.add(5)
bitmap.serialize(new KryoOutputDataOutputBridge(output))
output.flush()
output.close()

// read it back through the input bridge
val inStream = new FileInputStream("D:\\wfserde")
val input = new KryoInput(inStream)
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))
println(ret)
```

This will throw a `Buffer underflow` error:

```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
    at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
    at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
    at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
    at org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
```

After some investigation, I found this is caused by a bug in Kryo's `Input.skip(long count)` (https://github.com/EsotericSoftware/kryo/issues/119), and we call this method in `KryoInputDataInputBridge`.
So I think we can fix this issue in two ways:

1) Upgrade the Kryo version to 2.23.0 or 2.24.0, which has fixed this bug (I am not sure the upgrade is safe in Spark, can you check it? @davies).

2) Bypass Kryo's `Input.skip(long count)` by directly calling the other `skip` method in Kryo's Input.java (https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. write the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method:

```
class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
  ...
  override def skipBytes(n: Int): Int = {
    var remaining: Long = n
    while (remaining > 0) {
      // skip in chunks that fit in an Int, avoiding the buggy Input.skip(long)
      val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
      input.skip(skip)
      remaining -= skip
    }
    n
  }
}
```
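[Editorial note: with the loop-based `skipBytes`, the round trip above should succeed. A minimal in-memory check, assuming the two bridge classes from the example above (with the fixed `skipBytes`) are in scope and Kryo 2.21 plus RoaringBitmap are on the classpath:]

```scala
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.roaringbitmap.RoaringBitmap

// serialize to a growable in-memory buffer instead of a file
val output = new KryoOutput(4096, -1)
val bitmap = new RoaringBitmap
bitmap.add(1); bitmap.add(3); bitmap.add(5)
bitmap.serialize(new KryoOutputDataOutputBridge(output))
output.flush()

// deserialize with the fixed bridge; no Buffer underflow is expected
val input = new KryoInput(output.toBytes)
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))
assert(ret.contains(1) && ret.contains(3) && ret.contains(5))
```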
[GitHub] spark pull request: [SPARK-11253][SQL] reset all accumulators in p...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/9215#issuecomment-151041137 should this merged to branch-1.5? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9281] [SQL] use decimal or double when ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7642#issuecomment-147906001 Hi @davies, it seems this is not compatible with HiveQl, which still parses float numbers as double: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala#L1670-L1689 Or did I miss something? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4226][SQL]Add subquery (not) in/exists ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/9055#issuecomment-147273499 OK, does this support multiple EXISTS and IN predicates in the WHERE clause? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
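[Editorial note: the kind of query being asked about might look like the following, with hypothetical tables t1, t2, t3 and an existing `sqlContext`; it is shown only to illustrate "multiple EXISTS and IN in one WHERE clause", not taken from the PR:]

```scala
// hypothetical query with more than one subquery predicate in the WHERE clause
sqlContext.sql("""
  SELECT *
  FROM t1
  WHERE EXISTS (SELECT 1 FROM t2 WHERE t2.a = t1.a)
    AND t1.b IN (SELECT b FROM t3)
    AND t1.c NOT IN (SELECT c FROM t2)
""")
```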
[GitHub] spark pull request: [SPARK-4226][SQL]Add subquery (not) in/exists ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/9055#issuecomment-147272550 what's the difference with #4812? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SQL] Add toString to DataFrame/Column
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/4436#discussion_r40289134 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] { def childrenResolved = !children.exists(!_.resolved) /** + * Returns a string representation of this expression that does not have developer centric + * debugging information like the expression id. + */ + def prettyString: String = { +transform { --- End diff -- No, I am going through the code, and I think we'd better not use transform here. In which case would we need to change the expression? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SQL] Add toString to DataFrame/Column
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/4436#discussion_r40283225 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] { def childrenResolved = !children.exists(!_.resolved) /** + * Returns a string representation of this expression that does not have developer centric + * debugging information like the expression id. + */ + def prettyString: String = { +transform { --- End diff -- why use transform here? this will change the expression --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-141853176 @litao-buptsse, i will update this soon thanks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7417#issuecomment-138544481 @Sephiroth-Lin can you rebase this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9066][SQL] Improve cartesian performanc...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7417#issuecomment-138037270 @zsxwing putting the small table on the left side of `RDD.cartesian` definitely improves the performance. You can run a simple test that does a cartesian between a big data set and a small data set; the performance of putting the small data set on the right is much, much worse. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
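[Editorial note: a rough sketch of the comparison suggested above, assuming an existing SparkContext `sc`; the data sizes and the `time` helper are arbitrary and not from the thread, and the measured difference will depend on the environment:]

```scala
// crude wall-clock timer for a single action
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.1f s")
  result
}

val big   = sc.parallelize(1 to 2000000, 100)  // "large" data set
val small = sc.parallelize(1 to 50, 2)         // "small" data set

// same logical product, but with the small data set on each side in turn
time("small.cartesian(big)") { small.cartesian(big).count() }
time("big.cartesian(small)") { big.cartesian(small).count() }
```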
[GitHub] spark pull request: [SPARK-8813][SQL] Combine files when there're ...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/8125#issuecomment-131669839 @liancheng we have this case: the production system produces small text/csv files every five minutes, and we use Spark SQL to do some ETL work (such as aggregation) on these small files to produce aggregated parquet/orc files for analysis. In this case the small files only go through once, so we do not want to compact them periodically here; we hope Spark SQL can merge the small files when doing the ETL work. We have tested this case with this patch, and performance improved by about 20%. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
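[Editorial note: a simplified sketch of that kind of ETL job, assuming an existing `sqlContext`; paths, schema, and the use of JSON input are hypothetical stand-ins for the text/csv pipeline described above:]

```scala
import org.apache.spark.sql.functions._

// read a directory full of small five-minute files produced upstream
val events = sqlContext.read.json("hdfs:///ingest/events/2015-08-16/*/")

// small aggregation step of the ETL, then write a compact columnar output
val agg = events
  .groupBy("host", "metric")
  .agg(count("*").as("events"), avg("value").as("avg_value"))

agg.write.mode("append").parquet("hdfs:///warehouse/events_agg/")
```

Without some form of input combining, each tiny source file becomes its own partition/task, which is where the overhead this patch targets comes from.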
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-130316086 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-130312583 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-130185516 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-130153626 Retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-130152470 Yes, since we upgraded the Hive version to 1.2.1, we should adapt the token tree in HiveQl; the old one is not correct for 1.2.1. Updated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-130127052 /cc @marmbrus can you take a look at this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8968] [SQL] external sort by the partit...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7336#issuecomment-129266361 /cc @marmbrus --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7190] [SPARK-8804] [SPARK-7815] [SQL] u...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7197#issuecomment-128555369 @davies https://issues.apache.org/jira/browse/SPARK-9725 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7190] [SPARK-8804] [SPARK-7815] [SQL] u...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/7197#issuecomment-128352195 @davies there is a bug after this PR went in: when executor memory is set >= 32g, all queries on string fields have problems; they seem to return empty/garbled strings to the user. I am not familiar with the unsafe UTF8String code and have not found the root cause. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-125853720 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-125565512 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4131][SQL] support writing data into th...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4380#issuecomment-124285788 /cc @marmbrus --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org