[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21560 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198571824 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. 
+ */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- End diff -- Yeah, it could be ma
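The pattern under discussion — one long-running writer thread per upstream partition pushing rows to a local endpoint, with the single coalesced task reading them back on its own thread — can be sketched in plain Scala. The queue and integer rows below are stand-ins for the RPC shuffle endpoint and `UnsafeRow`s, not the actual Spark internals:

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object LocalCoalesceSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the reader endpoint's queue (readerQueueSize plays the same role).
    val queue = new LinkedBlockingQueue[Int](1024)
    val upstreamPartitions = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6, 7, 8, 9))

    // Write side: one task per upstream partition, as in ContinuousCoalesceRDD.compute.
    val pool = Executors.newFixedThreadPool(upstreamPartitions.size)
    upstreamPartitions.foreach { rows =>
      pool.execute(() => rows.foreach(queue.put))
    }

    // Read side: the single coalesced partition drains every writer's output in the task thread.
    val expected = upstreamPartitions.map(_.size).sum
    val coalesced = (1 to expected).map(_ => queue.take())
    println(s"coalesced ${coalesced.size} rows: ${coalesced.sorted.mkString(", ")}")

    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```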
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198571496 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- (same comment as above applies here - we don't have partitioning information in analysis) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
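Because the checker runs on the logical plan, where partition counts are not known, it can only look for a `Repartition(1, shuffle = false, _)` (i.e. a `coalesce(1)`) somewhere below the aggregate. Here is a toy, self-contained model of that `find` traversal; the node classes are simplified stand-ins for Catalyst's, used only to show the shape of the check:

```scala
sealed trait PlanNode {
  def children: Seq[PlanNode]
  // Mirrors TreeNode.find: the first node (including this one) satisfying the predicate.
  def find(p: PlanNode => Boolean): Option[PlanNode] =
    if (p(this)) Some(this) else children.flatMap(_.find(p)).headOption
}
case class Relation(name: String) extends PlanNode { val children = Nil }
case class Repartition(numPartitions: Int, shuffle: Boolean, child: PlanNode) extends PlanNode {
  val children = Seq(child)
}
case class Aggregate(child: PlanNode) extends PlanNode { val children = Seq(child) }

object ContinuousAggregateCheck {
  // The aggregate is allowed only if a coalesce(1) sits somewhere beneath it.
  def aboveSinglePartitionCoalesce(agg: Aggregate): Boolean =
    agg.find {
      case Repartition(1, false, _) => true
      case _ => false
    }.isDefined

  def main(args: Array[String]): Unit = {
    val ok  = Aggregate(Repartition(1, shuffle = false, Relation("source")))
    val bad = Aggregate(Relation("source"))
    println(aboveSinglePartitionCoalesce(ok))   // true
    println(aboveSinglePartitionCoalesce(bad))  // false
  }
}
```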
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198380164 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala --- @@ -50,6 +51,42 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase { } } + test("multiple partitions with coalesce") { +val input = ContinuousMemoryStream[Int] + +val df = input.toDF().coalesce(1).agg(max('value)) + +testStream(df, OutputMode.Complete)( + AddData(input, 0, 1, 2), + CheckAnswer(2), + StopStream, + AddData(input, 3, 4, 5), + StartStream(), + CheckAnswer(5), + AddData(input, -1, -2, -3), + CheckAnswer(5)) + } + + test("multiple partitions with coalesce - multiple transformations") { +val input = ContinuousMemoryStream[Int] + +val df = input.toDF() + .coalesce(1) + .select('value as 'copy, 'value) + .where('copy =!= 2) + .agg(max('value)) --- End diff -- You missed this comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198337615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -51,7 +51,7 @@ class ContinuousDataSourceRDD( sc: SparkContext, dataQueueSize: Int, epochPollIntervalMs: Long, -@transient private val readerFactories: Seq[InputPartition[UnsafeRow]]) +private val readerFactories: Seq[InputPartition[UnsafeRow]]) --- End diff -- We list the partitions when computing the coalesce RDD. Should we instead be packing the partitions into the partitions of the coalesce RDD? I'd assumed it was valid to expect that rdd.partitions would work on executors, but maybe it's not. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198222671 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. 
+ */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- End diff -- Agree. Also the 2*
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198053471 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.UUID + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = +(0 until numPartitions).map(ContinuousCoalesceRDDPartition).toArray + + // When we support more than 1 target partition, we'll need to figure out how to pass in the + // required partitioner. + private val outputPartitioner = new HashPartitioner(1) + + private val readerEndpointNames = (0 until numPartitions).map { i => +s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}" + } + + val readerRDD = new ContinuousShuffleReadRDD( --- End diff -- private --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198052028 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -51,7 +51,7 @@ class ContinuousDataSourceRDD( sc: SparkContext, dataQueueSize: Int, epochPollIntervalMs: Long, -@transient private val readerFactories: Seq[InputPartition[UnsafeRow]]) +private val readerFactories: Seq[InputPartition[UnsafeRow]]) --- End diff -- Wait. I don't see the `readerFactories` object used anywhere other than in getPartitions, where they are saved as part of `ContinuousDataSourceRDDPartition` objects. And RDD.compute() seems to pick it up from the `ContinuousDataSourceRDDPartition` objects, not from `readerFactories`. So I don't think `readerFactories` needs to be serialized. At the very least, rename `readerFactories` to `readerInputPartitions` for consistency. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
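The general pattern tdas is describing — pack whatever `compute` needs into the `Partition` objects so the driver-only field can stay `@transient` — looks roughly like the sketch below. It uses only the public RDD API; the class and field names are illustrative, not the PR's code:

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Each Partition carries its own slice of the driver-side sequence, so it is what gets serialized.
case class PackedPartition(index: Int, payload: String) extends Partition

class PackedPartitionsRDD(
    sc: SparkContext,
    // Only read in getPartitions, which runs on the driver, so it never needs to ship to executors.
    @transient private val readerInputPartitions: Seq[String])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    readerInputPartitions.zipWithIndex
      .map { case (p, i) => PackedPartition(i, p) }
      .toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    // compute reads from the Partition object it was handed, never from the transient field.
    Iterator(split.asInstanceOf[PackedPartition].payload)
}
```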
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198052377 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.UUID + +import org.apache.spark.{HashPartitioner, SparkEnv} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD} + +/** + * Physical plan for coalescing a continuous processing plan. + * + * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1. + */ +case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan { + override def output: Seq[Attribute] = child.output + + override def children: Seq[SparkPlan] = child :: Nil + + override def outputPartitioning: Partitioning = SinglePartition + + override def doExecute(): RDD[InternalRow] = { +assert(numPartitions == 1) + +val childRdd = child.execute() --- End diff -- nit: Dont need this variable. And merge remove excess empty lines. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198055297 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.UUID + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = +(0 until numPartitions).map(ContinuousCoalesceRDDPartition).toArray + + // When we support more than 1 target partition, we'll need to figure out how to pass in the + // required partitioner. + private val outputPartitioner = new HashPartitioner(1) + + private val readerEndpointNames = (0 until numPartitions).map { i => +s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}" + } + + val readerRDD = new ContinuousShuffleReadRDD( --- End diff -- Also, honestly, you dont need the RDD here. You only need the shuffle reading code, which is the `ContinuousShuffleReader` and endpoint. So you can just instantiate that in the compute function. Its very confusing to an RDD inside another RDD which is not hooked to the dependency chain. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198056760 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala --- @@ -50,6 +51,42 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase { } } + test("multiple partitions with coalesce") { +val input = ContinuousMemoryStream[Int] + +val df = input.toDF().coalesce(1).agg(max('value)) + +testStream(df, OutputMode.Complete)( + AddData(input, 0, 1, 2), + CheckAnswer(2), + StopStream, + AddData(input, 3, 4, 5), + StartStream(), + CheckAnswer(5), + AddData(input, -1, -2, -3), + CheckAnswer(5)) + } + + test("multiple partitions with coalesce - multiple transformations") { +val input = ContinuousMemoryStream[Int] + +val df = input.toDF() + .coalesce(1) + .select('value as 'copy, 'value) + .where('copy =!= 2) + .agg(max('value)) --- End diff -- test transformations both before and after coalesce. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
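A sketch of the extra test being asked for, in the same style as the quoted ones, with one transformation before the coalesce and one after it (the expected answers are this sketch's assumption: with `'copy =!= 2` filtering out 2, the first max is 1 rather than 2):

```scala
  test("multiple partitions with coalesce - transformations before and after") {
    val input = ContinuousMemoryStream[Int]

    val df = input.toDF()
      .select('value as 'copy, 'value)   // transformation before the coalesce
      .coalesce(1)
      .where('copy =!= 2)                // transformation after the coalesce
      .agg(max('value))

    testStream(df, OutputMode.Complete)(
      AddData(input, 0, 1, 2),
      CheckAnswer(1),
      StopStream,
      AddData(input, 3, 4, 5),
      StartStream(),
      CheckAnswer(5))
  }
```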
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198055537 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. 
+ */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- End diff -- There is a queue inside the
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198050994 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -98,6 +98,10 @@ class ContinuousDataSourceRDD( override def getPreferredLocations(split: Partition): Seq[String] = { split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations() } + + override def clearDependencies(): Unit = { +throw new IllegalStateException("Continuous RDDs cannot be checkpointed") --- End diff -- @HeartSaVioR No, this method is not intended to be called under normal circumstances. And there is even less reason to call it on an internally generated RDD. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197985638 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. 
+ */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- End diff -- If its the same ta
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197936350 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true --- End diff -- Oh wait, I see what you mean. Repartition(5, ...) would never be matched by this rule, since it only applies to Aggregate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197935989 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true --- End diff -- I don't think there's any particular reason we need to. There's no reason we couldn't execute multiple repartitions if the optimizer isn't smart enough to combine them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197933535 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD( numPartitions: Int, queueSize: Int = 1024, numShuffleWriters: Int = 1, -epochIntervalMs: Long = 1000) +epochIntervalMs: Long = 1000, +val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) --- End diff -- This is just a default argument to make tests less wordy. I can remove it if you think that's best, but it doesn't impose a restriction. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197933175 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) --- End diff -- I've made some changes to try to restrict the assumption that the number of partitions is 1 to two places: * ContinuousCoalesceExec * The output partitioner in ContinuousCoalesceRDD, since it's not obvious to me what the right strategy to get this would be in the general case. If you have ideas I'm open to removing this too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197930245 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) --- End diff -- Repartition would normally imply distributed execution, which isn't happening here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197929943 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. 
+ */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- End diff -- No, they'll be in th
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197929262 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- I agree that it wouldn't be needed, but partitioning information is not always available during analysis. So I don't think we can write the more granular check suggested here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197928934 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -98,6 +98,10 @@ class ContinuousDataSourceRDD( override def getPreferredLocations(split: Partition): Seq[String] = { split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations() } + + override def clearDependencies(): Unit = { +throw new IllegalStateException("Continuous RDDs cannot be checkpointed") --- End diff -- I don't know, I'm unfamiliar with this method. @tdas --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197526872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) --- End diff -- Agree. And since theres an `assert (numpartitions == 1)` in `ContinuousCoalesceExec`, we can probably create any array of `numPartitions` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197523482 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- Also, if there's a single parent partition and there's a `Repartition(1)`, that node should probably be removed. Not sure if this is already being done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
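Since the analyzer cannot see partition counts, one place a redundant `Repartition(1)` over an already-single-partition child could be dropped is at physical planning, where `outputPartitioning` is available. A hypothetical simplification, not something this PR implements:

```scala
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.ContinuousCoalesceExec

object ContinuousCoalescePlanning {
  // Hypothetical helper: skip the coalesce node when the child already has a single partition.
  def planContinuousCoalesce(child: SparkPlan): SparkPlan =
    if (child.outputPartitioning == SinglePartition) child
    else ContinuousCoalesceExec(1, child)
}
```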
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197564080 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. 
+ */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- End diff -- The `writer.write`
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197561226 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) --- End diff -- Maybe I am missing. Is this more like a re-partition (just shuffles) than coalesce? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
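On the partitioner question: with a single target partition, `HashPartitioner(1)` sends every row, whatever its key, to partition 0, so the write side behaves like a funnel into one reader rather than a general repartition. A small demonstration using Spark's public `HashPartitioner`:

```scala
import org.apache.spark.HashPartitioner

object SinglePartitionerDemo {
  def main(args: Array[String]): Unit = {
    val partitioner = new HashPartitioner(1)
    // Every key, including null, maps to partition 0 when there is only one partition.
    val targets = Seq("a", 42, null, (1, 2)).map(partitioner.getPartition)
    println(targets)  // List(0, 0, 0, 0)
  }
}
```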
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197520939 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- What if there was only a single partition to begin with? Then there's no need for Repartition(1), and this check should be skipped. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197004935 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -98,6 +98,10 @@ class ContinuousDataSourceRDD( override def getPreferredLocations(split: Partition): Seq[String] = { split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations() } + + override def clearDependencies(): Unit = { +throw new IllegalStateException("Continuous RDDs cannot be checkpointed") --- End diff -- I'm wondering whether the method can be called in a normal situation: when the continuous query is terminated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197000483 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true --- End diff -- What if we have multiple repartitions, where one meets the case and the others don't? I'm not sure we restrict repartition operations to occur only once. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
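To make the concern above concrete, here is an illustrative plan shape (not from the PR's tests) in which `node.find` would still report a single-partition coalesce even though a later repartition widens the data again, assuming `input` is a continuous streaming DataFrame as in the earlier sketch:

```scala
// Illustrative only. find(...) answers "is there at least one Repartition(1, false, _)
// anywhere below the aggregate", so the check passes here even though the coalesce(4)
// closer to the aggregate means the aggregate would not actually see one partition.
val df = input
  .coalesce(1)              // Repartition(1, shuffle = false) -- satisfies the check
  .filter("value % 2 = 0")
  .coalesce(4)              // Repartition(4, shuffle = false) -- widens again
  .groupBy()
  .count()
```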
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196999896 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD( numPartitions: Int, queueSize: Int = 1024, numShuffleWriters: Int = 1, -epochIntervalMs: Long = 1000) +epochIntervalMs: Long = 1000, +val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) extends RDD[UnsafeRow](sc, Nil) { override protected def getPartitions: Array[Partition] = { (0 until numPartitions).map { partIndex => - ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs) + ContinuousShuffleReadPartition( +partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs) --- End diff -- This effectively asserts that numPartitions is 1; otherwise it will throw an exception. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
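One way to surface that constraint explicitly, rather than through an IndexOutOfBoundsException from `endpointNames(partIndex)` — just a sketch, not code from the PR:

```scala
// Sketch: fail fast, with a message, when fewer endpoint names are supplied than
// there are partitions, instead of failing lazily inside getPartitions.
require(
  endpointNames.size == numPartitions,
  s"Expected one reader endpoint name per partition, but got ${endpointNames.size} " +
    s"name(s) for $numPartitions partition(s)")
```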
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196999687 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD( numPartitions: Int, queueSize: Int = 1024, numShuffleWriters: Int = 1, -epochIntervalMs: Long = 1000) +epochIntervalMs: Long = 1000, +val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")) --- End diff -- Same here: if possible, it might be better to have complete code rather than relying on such an assumption. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196999745 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) --- End diff -- We are addressing only the specific case that number of partitions is 1, but we could have some assertion for that and try to write complete code so that we don't modify it again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
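A sketch of the assertion being suggested: keep the coalesce(1)-only restriction, but state it explicitly and derive the partitions from `numPartitions` so the method needs no edit if the restriction is later lifted (illustrative, not the PR's final code):

```scala
// Sketch only: make the single-partition assumption a checked invariant.
require(numPartitions == 1,
  "ContinuousCoalesceRDD currently only supports coalescing to a single partition")

override def getPartitions: Array[Partition] =
  (0 until numPartitions).map(i => ContinuousCoalesceRDDPartition(i)).toArray[Partition]
```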
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196930368 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -51,7 +51,7 @@ class ContinuousDataSourceRDD( sc: SparkContext, dataQueueSize: Int, epochPollIntervalMs: Long, -@transient private val readerFactories: Seq[InputPartition[UnsafeRow]]) +private val readerFactories: Seq[InputPartition[UnsafeRow]]) --- End diff -- We need to be able to generate the full list of partitions from within a single task in order for coalesce to work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196924994 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) + extends RDD[InternalRow](reader.context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = reader.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val threads = prev.partitions.map { prevSplit => +new Thread() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threads.foreach(_.interrupt()) + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + threads.foreach(_.start()) +} + +reader.compute(reader.partitions(split.index), context) + } + + override def getDependencies: Seq[Dependency[_]] = { +Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = Seq(0) --- End diff -- Yeah, I confused myself when looking at the normal coalesce RDD. The default dependency handling is correct here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196921230 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -350,7 +350,14 @@ object UnsupportedOperationChecker { _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => case node => - throwError(s"Continuous processing does not support ${node.nodeName} operations.") + val aboveSinglePartitionCoalesce = node.find { --- End diff -- It will allow the first one, and I've added a test to verify. It ought to allow the second one, but for some reason streaming deduplicate insists on inserting a shuffle above the coalesce(1). I will address this in a separate PR, since this seems like suboptimal behavior that isn't only restricted to continuous processing. For now I tweaked the condition to only allow aggregates. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
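Concretely, the two query shapes referred to above look roughly like this (illustrative; `input` as in the earlier sketch). After this change the aggregate form is accepted, while the deduplication form is still rejected because a shuffle gets planned above the coalesce:

```scala
// Accepted: aggregate sitting on top of a single-partition coalesce.
val aggQuery = input.coalesce(1).groupBy().count()

// Still rejected for now: streaming deduplication inserts a shuffle above the
// coalesce(1), so the stateful operator no longer sees a single-partition child.
val dedupQuery = input.coalesce(1).dropDuplicates("value")
```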
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196586217 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) + extends RDD[InternalRow](reader.context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = reader.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val threads = prev.partitions.map { prevSplit => +new Thread() { --- End diff -- maybe use a thread pool (using `org...spark.util.ThreadUtils`) with a name to track threads. Then the cached threads in threadpool can be reused across epochs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196238635 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -21,22 +21,25 @@ import java.util.UUID import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.rpc.RpcAddress import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.NextIterator case class ContinuousShuffleReadPartition( index: Int, + endpointName: String, queueSize: Int, numShuffleWriters: Int, epochIntervalMs: Long) extends Partition { + --- End diff -- Unnecessary --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196607009 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -51,7 +51,7 @@ class ContinuousDataSourceRDD( sc: SparkContext, dataQueueSize: Int, epochPollIntervalMs: Long, -@transient private val readerFactories: Seq[InputPartition[UnsafeRow]]) +private val readerFactories: Seq[InputPartition[UnsafeRow]]) --- End diff -- Since all the partitions do not need all the factories, the right thing to do is to put each partition's factory in its partition object. This is so that all the factories are not serialized for all tasks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
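The pattern being suggested would look roughly like the following, assuming the partition case class carries its own factory (the existing partition class already exposes `inputPartition`; the names below are illustrative, not the actual classes in this PR). As noted elsewhere in the thread, the trade-off is that continuous coalesce needs the full factory list inside one task, which is why `@transient` was dropped here.

```scala
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.InputPartition

// Each partition object carries only the one factory it needs, so a task deserializes
// a single InputPartition rather than the RDD's whole Seq of factories.
case class SketchDataSourceRDDPartition(
    index: Int,
    inputPartition: InputPartition[UnsafeRow]) extends Partition

// getPartitions would then be built as:
//   readerFactories.zipWithIndex.map { case (factory, i) =>
//     SketchDataSourceRDDPartition(i, factory)
//   }.toArray[Partition]
```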
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196582424 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -350,7 +350,14 @@ object UnsupportedOperationChecker { _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => case node => - throwError(s"Continuous processing does not support ${node.nodeName} operations.") + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { +throwError(s"Continuous processing does not support ${node.nodeName} operations.") --- End diff -- It would be nice if this error statement said what is supported: that you can rewrite the query with coalesce(1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
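For example, the message could name the supported rewrite directly (wording is only a suggestion):

```scala
throwError(
  s"Continuous processing does not support ${node.nodeName} operations. " +
    "Only aggregations on top of a single-partition coalesce are supported, " +
    "e.g. df.coalesce(1).groupBy(...).agg(...).")
```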
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196586798 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) + extends RDD[InternalRow](reader.context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = reader.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val threads = prev.partitions.map { prevSplit => +new Thread() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. + EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threads.foreach(_.interrupt()) --- End diff -- clean up using threadpool --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196606185 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) + extends RDD[InternalRow](reader.context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = reader.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val threads = prev.partitions.map { prevSplit => +new Thread() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threads.foreach(_.interrupt()) + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + threads.foreach(_.start()) +} + +reader.compute(reader.partitions(split.index), context) + } + + override def getDependencies: Seq[Dependency[_]] = { +Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = Seq(0) +}) + } + + override def clearDependencies() { +super.clearDependencies() --- End diff -- As commented above, this should actually throw exception so that this is never checkpointed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196584760 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.UUID + +import org.apache.spark.{HashPartitioner, SparkEnv} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD} + +/** + * Physical plan for coalescing a continuous processing plan. + * + * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1. + */ +case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan { + override def output: Seq[Attribute] = child.output + + override def children: Seq[SparkPlan] = child :: Nil + + override def outputPartitioning: Partitioning = SinglePartition + + override def doExecute(): RDD[InternalRow] = { +assert(numPartitions == 1) + +val childRdd = child.execute() +val endpointName = s"RPCContinuousShuffleReader-${UUID.randomUUID()}" +val reader = new ContinuousShuffleReadRDD( --- End diff -- super nit: rename to readerRDD to avoid confusion with other v2 reader classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196580603 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -350,7 +350,14 @@ object UnsupportedOperationChecker { _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => case node => - throwError(s"Continuous processing does not support ${node.nodeName} operations.") + val aboveSinglePartitionCoalesce = node.find { --- End diff -- Will this allow `kafkaStreamDF.coalesce(1).select(...).filter(...).agg(...)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196589311 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) --- End diff -- They are to make it this RDD checkpointable by make this clearable. This raises a good point, I dont think we should make this checkpointable. Rather I suggest this, make these simple vals (well, just remove modifier), and in clearDependencies, just throw an error saying "Checkpoint this RDD is not supported". We should do this for all the continuous shuffle RDDs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
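A sketch of the change being described: plain (non-`var`) constructor parameters plus a `clearDependencies` override that refuses checkpointing, mirroring what the thread asks for across the continuous shuffle RDDs (illustrative shape only, not the PR's final code):

```scala
// Sketch: keep the parent references immutable and fail loudly if Spark ever tries
// to clear them as part of checkpointing this RDD.
class ContinuousCoalesceRDD(reader: ContinuousShuffleReadRDD, prev: RDD[InternalRow])
  extends RDD[InternalRow](reader.context, Nil) {

  override def clearDependencies(): Unit = {
    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
  }

  // getPartitions / compute as in the diff above ...
}
```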
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196609584 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) + extends RDD[InternalRow](reader.context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = reader.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val threads = prev.partitions.map { prevSplit => +new Thread() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threads.foreach(_.interrupt()) + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + threads.foreach(_.start()) +} + +reader.compute(reader.partitions(split.index), context) + } + + override def getDependencies: Seq[Dependency[_]] = { +Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = Seq(0) --- End diff -- Should 1 partition of this class depend on all parant RDD partitions, and not just the 0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
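If the single output partition were to declare its parents explicitly, the dependency would have to map partition 0 to every partition of `prev`, roughly as below; as noted elsewhere in the thread, the default dependency handling ends up being used instead (sketch only):

```scala
// Sketch: the lone coalesced partition reads all of prev's partitions, so a faithful
// NarrowDependency maps output partition 0 to every parent index rather than just 0.
override def getDependencies: Seq[Dependency[_]] = {
  Seq(new NarrowDependency(prev) {
    override def getParents(partitionId: Int): Seq[Int] = prev.partitions.indices
  })
}
```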
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196589618 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false --- End diff -- Add docs on what this means. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r196581248 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -350,7 +350,14 @@ object UnsupportedOperationChecker { _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => case node => - throwError(s"Continuous processing does not support ${node.nodeName} operations.") + val aboveSinglePartitionCoalesce = node.find { --- End diff -- And are we allowing all stateful operations after this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r195420149 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -36,6 +37,17 @@ object DataSourceV2Strategy extends Strategy { case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil +case Repartition(1, false, child) => + val isContinuous = child.collectFirst { +case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r + }.isDefined --- End diff -- Could the judgment of whether the plan is continuous or not be a separate method, so that other places can use it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
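The refactor being asked about might look something like this (helper name and placement are assumptions, not code from the PR):

```scala
// Sketch: factor the "does this subtree read from a continuous source?" test into a
// helper that other strategy or checker cases could reuse.
private def hasContinuousSource(plan: LogicalPlan): Boolean = plan.collectFirst {
  case StreamingDataSourceV2Relation(_, _, _, _: ContinuousReader) => true
}.isDefined

// The Repartition case above would then read roughly:
//   case Repartition(1, false, child) if hasContinuousSource(child) => ...
```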
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r195415777 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow]) --- End diff -- why the `reader` and `prev` both is var here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
GitHub user jose-torres opened a pull request: https://github.com/apache/spark/pull/21560 [SPARK-24386][SS] coalesce(1) aggregates in continuous processing ## What changes were proposed in this pull request? Provide a continuous processing implementation of coalesce(1), as well as allowing aggregates on top of it. The changes in ContinuousQueuedDataReader and such are to use split.index (the ID of the partition within the RDD currently being compute()d) rather than context.partitionId() (the partition ID of the scheduled task within the Spark job - that is, the post coalesce writer). In the absence of a narrow dependency, these values were previously always the same, so there was no need to distinguish. ## How was this patch tested? new unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/jose-torres/spark coalesce Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21560.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21560 commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38 Author: Jose Torres Date: 2018-05-15T18:07:54Z continuous shuffle read RDD commit b5d100875932bdfcb645c8f6b2cdb7b815d84c80 Author: Jose Torres Date: 2018-05-17T03:11:11Z docs commit af407694a5f13c18568da4a63848f82374a44377 Author: Jose Torres Date: 2018-05-17T03:19:37Z Merge remote-tracking branch 'apache/master' into readerRddMaster commit 46456dc75a6aec9659b18523c421999debd060eb Author: Jose Torres Date: 2018-05-17T03:22:49Z fix ctor commit 2ea8a6f94216e8b184e5780ec3e6ffb2838de382 Author: Jose Torres Date: 2018-05-17T03:43:10Z multiple partition test commit 955ac79eb05dc389e632d1aaa6c59396835c6ed5 Author: Jose Torres Date: 2018-05-17T13:33:51Z unset task context after test commit 8cefb724512b51f2aa1fdd81fa8a2d4560e60ce3 Author: Jose Torres Date: 2018-05-18T00:00:05Z conf from RDD commit f91bfe7e3fc174202d7d5c7cde5a8fb7ce86bfd3 Author: Jose Torres Date: 2018-05-18T00:00:44Z endpoint name commit 259029298fc42a65e8ebb4d2effe49b7fafa96f1 Author: Jose Torres Date: 2018-05-18T00:02:08Z testing bool commit 859e6e4dd4dd90ffd70fc9cbd243c94090d72506 Author: Jose Torres Date: 2018-05-18T00:22:10Z tests commit b23b7bb17abe3cbc873a3144c56d08c88bc0c963 Author: Jose Torres Date: 2018-05-18T00:40:55Z take instead of poll commit 97f7e8ff865e6054d0d70914ce9bb51880b161f6 Author: Jose Torres Date: 2018-05-18T00:58:44Z add interface commit de21b1c25a333d44c0521fe151b468e51f0bdc47 Author: Jose Torres Date: 2018-05-18T01:02:37Z clarify comment commit 7dcf51a13e92a0bb2998e2a12e67d351e1c1a4fc Author: Jose Torres Date: 2018-05-18T22:39:28Z multiple commit ad0b5aab320413891f7c21ea6115b6da8d49ccf9 Author: Jose Torres Date: 2018-05-25T00:06:15Z writer with 1 reader partition commit c9adee5423c2e8a030911008d2e6942045d484bb Author: Jose Torres Date: 2018-05-25T00:15:39Z docs and iface commit 63d38d849107eed226449cec8d24c2241cd583c9 Author: Jose Torres Date: 2018-05-25T00:27:26Z Merge remote-tracking branch 'apache/master' into writerTask commit 331f437423262a1aa76754a8079d7c017e4ea28a Author: Jose Torres Date: 2018-05-25T00:37:14Z increment epoch commit f3ce67529372f72370a1e6028dc71a751acf26f2 Author: Jose Torres Date: 2018-05-25T00:40:39Z undo oop commit e0108d7bc164b9e5eeb757c13c80bc1d11671188 Author: Jose Torres Date: 2018-05-25T00:54:01Z make rdd loop commit 024f92d6bd471e207e1625dc6cdca31e1067deb8 Author: Jose Torres Date: 2018-05-25T22:56:59Z 
basic commit 8f1939b91dbef76879d5e5f2077dea35e5343e89 Author: Jose Torres Date: 2018-06-11T21:48:21Z coalesce working commit c99d9524d4778b973df34378e98d53a152e0a42c Author: Jose Torres Date: 2018-06-13T21:34:38Z Merge remote-tracking branch 'apache/master' into coalesce commit aaac0af0ddebffe64338a69a5a16dcfab9432a51 Author: Jose Torres Date: 2018-06-13T22:04:19Z fix merge commit 80d60db4c99e52e624dcbd19cc7c5ba519ff4e1c Author: Jose Torres Date: 2018-06-13T23:09:29Z rm spurious diffs commit 26b74f016033f582a61694133b82df6a40295c0b Author: Jose Torres Date: 2018-06-14T04:43:00Z unsupported check commit 03cc20d73dd547e476fad90d47225ef9e96a8cbc Author: Jose Torres Date: 2018-06-14T04:54:30Z change back timeout --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org