[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21560


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-27 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198571824
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread local, so each writer thread
+  // can properly increment its own epoch without affecting the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
--- End diff --

Yeah, it could be 

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-27 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198571496
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
   _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
   _: TypedFilter) =>
 case node if node.nodeName == "StreamingRelationV2" =>
+case Repartition(1, false, _) =>
+case node: Aggregate =>
+  val aboveSinglePartitionCoalesce = node.find {
+case Repartition(1, false, _) => true
+case _ => false
+  }.isDefined
+
+  if (!aboveSinglePartitionCoalesce) {
--- End diff --

(same comment as above applies here - we don't have partitioning 
information in analysis)
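
For context, the check in the hunk above only looks at the shape of the logical plan, since partitioning is not known at analysis time. A minimal sketch of that idea follows. The node classes and the simplified `find` are toy stand-ins invented for illustration, not the Catalyst classes:

```scala
// Toy stand-ins for logical plan nodes; these are NOT the Catalyst classes.
sealed trait Plan {
  def children: Seq[Plan]

  // Pre-order search over the tree, similar in spirit to a plan-level `find`.
  def find(p: Plan => Boolean): Option[Plan] =
    if (p(this)) Some(this)
    else children.foldLeft(Option.empty[Plan])((acc, c) => acc.orElse(c.find(p)))
}

case class Source(name: String) extends Plan {
  def children: Seq[Plan] = Nil
}

case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

case class Filter(condition: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

case class Aggregate(function: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

object CoalesceCheck {
  // True when a non-shuffle repartition to exactly one partition sits
  // somewhere underneath the aggregate.
  def aboveSinglePartitionCoalesce(agg: Aggregate): Boolean =
    agg.find {
      case Repartition(1, false, _) => true
      case _ => false
    }.isDefined
}
```

Because this is purely structural, it accepts any plan with a `coalesce(1)` somewhere below the aggregate, regardless of what the operators in between would do to the partitioning.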





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198380164
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
 ---
@@ -50,6 +51,42 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
 }
   }
 
+  test("multiple partitions with coalesce") {
+val input = ContinuousMemoryStream[Int]
+
+val df = input.toDF().coalesce(1).agg(max('value))
+
+testStream(df, OutputMode.Complete)(
+  AddData(input, 0, 1, 2),
+  CheckAnswer(2),
+  StopStream,
+  AddData(input, 3, 4, 5),
+  StartStream(),
+  CheckAnswer(5),
+  AddData(input, -1, -2, -3),
+  CheckAnswer(5))
+  }
+
+  test("multiple partitions with coalesce - multiple transformations") {
+val input = ContinuousMemoryStream[Int]
+
+val df = input.toDF()
+  .coalesce(1)
+  .select('value as 'copy, 'value)
+  .where('copy =!= 2)
+  .agg(max('value))
--- End diff --

You missed this comment. 





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198337615
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
 sc: SparkContext,
 dataQueueSize: Int,
 epochPollIntervalMs: Long,
-@transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

We list the partitions when computing the coalesce RDD. Should we instead 
be packing the partitions into the partitions of the coalesce RDD? I'd assumed 
it was valid to expect that rdd.partitions would work on executors, but maybe 
it's not.
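
A minimal sketch of the concern, using hypothetical stand-in classes rather than Spark's: a `@transient` field is dropped by Java serialization, so code that lists partitions from that field on the receiving side finds nothing there.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-ins for an RDD that derives its partitions from a field.
class WithTransient(@transient private val inputPartitions: Seq[String]) extends Serializable {
  // Option(...) turns the null left behind by deserialization into None.
  def listPartitions: Option[Seq[String]] = Option(inputPartitions)
}

class WithoutTransient(private val inputPartitions: Seq[String]) extends Serializable {
  def listPartitions: Option[Seq[String]] = Option(inputPartitions)
}

object TransientDemo {
  // Serialize and deserialize a value, roughly what shipping it to another JVM does.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In this toy model, the copy of `WithTransient` that comes out of `roundTrip` cannot list its partitions, while the copy of `WithoutTransient` can.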





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198222671
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread local, so each writer thread
+  // can properly increment its own epoch without affecting the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
--- End diff --

Agree. Also the 

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198053471
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.UUID
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] =
+(0 until numPartitions).map(ContinuousCoalesceRDDPartition).toArray
+
+  // When we support more than 1 target partition, we'll need to figure out how to pass in the
+  // required partitioner.
+  private val outputPartitioner = new HashPartitioner(1)
+
+  private val readerEndpointNames = (0 until numPartitions).map { i =>
+s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}"
+  }
+
+  val readerRDD = new ContinuousShuffleReadRDD(
--- End diff --

private





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198052028
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
 sc: SparkContext,
 dataQueueSize: Int,
 epochPollIntervalMs: Long,
-@transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

Wait. I don't see the `readerFactories` object being used anywhere other 
than in `getPartitions`, where they are saved as part of the 
`ContinuousDataSourceRDDPartition` objects. And `RDD.compute()` seems to be 
picking it up from the `ContinuousDataSourceRDDPartition` objects, not from 
`readerFactories`. So I don't think `readerFactories` needs to be serialized.

At the very least, rename `readerFactories` to `readerInputPartitions` for 
consistency. 
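
To illustrate the point about `compute()`, here is a minimal sketch with hypothetical stand-in classes (`ToyRDD` and `ToyPartition` are invented for illustration and are not Spark's). Each partition object captures its own input partition on the driver side, so the RDD-level field can stay `@transient` as long as `compute` reads only from the partition it is handed:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical partition type that carries its own input partition.
case class ToyPartition(index: Int, inputPartition: String)

// Hypothetical RDD-like class; the field is transient and is only read in getPartitions.
class ToyRDD(@transient private val inputPartitions: Seq[String]) extends Serializable {
  // Driver side: pack each input partition into its partition object.
  def getPartitions: Array[ToyPartition] =
    inputPartitions.zipWithIndex.map { case (p, i) => ToyPartition(i, p) }.toArray

  // Executor side: uses only the partition passed in, never the transient field.
  def compute(split: ToyPartition): String = s"reading ${split.inputPartition}"
}

object ShipDemo {
  // Serialize and deserialize a value, roughly what shipping it to another JVM does.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

This only holds while nothing on the receiving side calls `getPartitions` again; in this toy model that call would fail after the round trip because the transient field is gone.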
 





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198052377
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.UUID
+
+import org.apache.spark.{HashPartitioner, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD}
+
+/**
+ * Physical plan for coalescing a continuous processing plan.
+ *
+ * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1.
+ */
+case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
+  override def output: Seq[Attribute] = child.output
+
+  override def children: Seq[SparkPlan] = child :: Nil
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override def doExecute(): RDD[InternalRow] = {
+assert(numPartitions == 1)
+
+val childRdd = child.execute()
--- End diff --

nit: Don't need this variable. And remove excess empty lines.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198055297
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.UUID
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] =
+(0 until numPartitions).map(ContinuousCoalesceRDDPartition).toArray
+
+  // When we support more than 1 target partition, we'll need to figure out how to pass in the
+  // required partitioner.
+  private val outputPartitioner = new HashPartitioner(1)
+
+  private val readerEndpointNames = (0 until numPartitions).map { i =>
+s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}"
+  }
+
+  val readerRDD = new ContinuousShuffleReadRDD(
--- End diff --

Also, honestly, you don't need the RDD here. You only need the shuffle 
reading code, which is the `ContinuousShuffleReader` and the endpoint, so you 
can just instantiate that in the compute function. It's very confusing to have 
an RDD inside another RDD that is not hooked into the dependency chain.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198055537
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread local, so each writer thread
+  // can properly increment its own epoch without affecting the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
--- End diff --

There is a queue inside 

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198056760
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
 ---
@@ -50,6 +51,42 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
 }
   }
 
+  test("multiple partitions with coalesce") {
+val input = ContinuousMemoryStream[Int]
+
+val df = input.toDF().coalesce(1).agg(max('value))
+
+testStream(df, OutputMode.Complete)(
+  AddData(input, 0, 1, 2),
+  CheckAnswer(2),
+  StopStream,
+  AddData(input, 3, 4, 5),
+  StartStream(),
+  CheckAnswer(5),
+  AddData(input, -1, -2, -3),
+  CheckAnswer(5))
+  }
+
+  test("multiple partitions with coalesce - multiple transformations") {
+val input = ContinuousMemoryStream[Int]
+
+val df = input.toDF()
+  .coalesce(1)
+  .select('value as 'copy, 'value)
+  .where('copy =!= 2)
+  .agg(max('value))
--- End diff --

test transformations both before and after coalesce. 





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198050994
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
   override def getPreferredLocations(split: Partition): Seq[String] = {
 split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
+
+  override def clearDependencies(): Unit = {
+throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
--- End diff --

@HeartSaVioR No, this method is not intended to be called under normal 
circumstances, and there is even less reason to call it on an internally 
generated RDD.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197985638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread local, so each writer thread
+  // can properly increment its own epoch without affecting the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
--- End diff --

If it's the same 

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197936350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
               _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
--- End diff --

Oh wait, I see what you mean. Repartition(5, ...) would never be matched by 
this rule, since it only applies to Aggregate.
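
To make the point concrete, here is a rough sketch of the Aggregate-only check using toy plan nodes. `Plan`, `Source`, and `CoalesceCheckSketch` are hypothetical stand-ins, and the toy `Repartition` and `Aggregate` only borrow the names of the Catalyst classes; none of this is code from the PR.

```scala
// Toy stand-ins for logical plan nodes (assumptions, not Spark classes).
sealed trait Plan { def children: Seq[Plan] }
case class Source(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}
case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

object CoalesceCheckSketch {
  // Pre-order search over the node and its descendants.
  def find(plan: Plan)(p: Plan => Boolean): Option[Plan] =
    if (p(plan)) Some(plan)
    else plan.children.foldLeft(Option.empty[Plan]) { (found, c) => found.orElse(find(c)(p)) }

  // The check is only ever invoked on an Aggregate, so a Repartition(5, ...)
  // elsewhere in the plan is never examined on its own by this function.
  def aboveSinglePartitionCoalesce(agg: Aggregate): Boolean =
    find(agg) {
      case Repartition(1, false, _) => true
      case _ => false
    }.isDefined
}
```

In this toy, an aggregate passes as soon as any descendant is a single-partition coalesce, even if other repartitions sit between them.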


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197935989
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
               _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
--- End diff --

I don't think there's any particular reason we need to. There's no reason 
we couldn't execute multiple repartitions if the optimizer isn't smart enough 
to combine them.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197933535
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
     numPartitions: Int,
     queueSize: Int = 1024,
     numShuffleWriters: Int = 1,
-    epochIntervalMs: Long = 1000)
+    epochIntervalMs: Long = 1000,
+    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
--- End diff --

This is just a default argument to make tests less wordy. I can remove it 
if you think that's best, but it doesn't impose a restriction.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197933175
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

I've made some changes to try to restrict the assumption that the number of 
partitions is 1 to two places:

* ContinuousCoalesceExec
* The output partitioner in ContinuousCoalesceRDD, since it's not obvious 
to me what the right strategy to get this would be in the general case. If you 
have ideas I'm open to removing this too.
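
For context on why the output partitioner is the harder case, a minimal sketch of hash partitioning follows. `PartitionerSketch.partitionFor` is a hypothetical helper written for illustration, not Spark's `HashPartitioner`; it only shows that with one output partition every key trivially lands in partition 0, while the general case needs a real strategy.

```scala
object PartitionerSketch {
  // Non-negative (hashCode mod numPartitions); null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = if (key == null) 0 else key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}
```

With `numPartitions = 1` the key is irrelevant, which is why hardcoding a single-partition partitioner is safe only while the coalesce target is 1.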





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197930245
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
--- End diff --

Repartition would normally imply distributed execution, which isn't 
happening here.
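
A simplified sketch of the local (non-distributed) pattern, assuming finite in-memory partitions instead of continuous epochs: one background thread per upstream partition writes into a bounded queue, and the calling thread reads everything back. `LocalCoalesceSketch` and its parameters are hypothetical names; the PR itself uses an RPC-based continuous shuffle writer and reader, which this does not model.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}

object LocalCoalesceSketch {
  // Drains each upstream partition on its own thread into a single bounded
  // queue, then collects all rows on the calling ("task") thread.
  def coalesceLocally(upstream: Seq[Seq[Int]], queueSize: Int): Seq[Int] = {
    val queue = new LinkedBlockingQueue[Int](queueSize)
    val pool = Executors.newFixedThreadPool(math.max(1, upstream.size))
    try {
      upstream.foreach { part =>
        pool.execute(new Runnable {
          // Writers block when the queue is full, giving simple backpressure.
          override def run(): Unit = part.foreach(row => queue.put(row))
        })
      }
      val total = upstream.map(_.size).sum
      // The reader blocks until each expected row has arrived.
      Seq.fill(total)(queue.take())
    } finally {
      pool.shutdownNow()
    }
  }
}
```

Everything happens inside one process, so rows from different partitions interleave in arrival order; nothing is redistributed across executors.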





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197929943
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val runnables = prev.partitions.map { prevSplit =>
+        new Runnable() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threadPool.shutdownNow()
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+      runnables.foreach(threadPool.execute)
+    }
+
+    readerRDD.compute(readerRDD.partitions(split.index), context)
--- End diff --

No, they'll be in 

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197929262
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
               _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
+            case _ => false
+          }.isDefined
+
+          if (!aboveSinglePartitionCoalesce) {
--- End diff --

I agree that it wouldn't be needed, but partitioning information is not 
always available during analysis. So I don't think we can write the more 
granular check suggested here.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197928934
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
   override def getPreferredLocations(split: Partition): Seq[String] = {
     split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
+
+  override def clearDependencies(): Unit = {
+    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
--- End diff --

I don't know, I'm unfamiliar with this method. @tdas 





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197526872
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

Agreed. And since there's an `assert (numpartitions == 1)` in 
`ContinuousCoalesceExec`, we can probably create an array of `numPartitions` 
partitions here.
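
A minimal sketch of what that could look like, using a hypothetical `CoalescePartitionSketch` class in place of `ContinuousCoalesceRDDPartition` so the snippet stands alone (it is not code from the PR):

```scala
// Toy stand-in for the partition class in the diff.
case class CoalescePartitionSketch(index: Int)

object PartitionArraySketch {
  // Builds one partition per index instead of hardcoding a single partition 0.
  def partitions(numPartitions: Int): Array[CoalescePartitionSketch] =
    Array.tabulate(numPartitions)(i => CoalescePartitionSketch(i))
}
```

With `numPartitions == 1` this yields the same single partition with index 0 as the hardcoded array.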





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197564080
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val runnables = prev.partitions.map { prevSplit =>
+        new Runnable() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threadPool.shutdownNow()
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+      runnables.foreach(threadPool.execute)
+    }
+
+    readerRDD.compute(readerRDD.partitions(split.index), context)
--- End diff --

The 

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197561226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  val readerRDD = new ContinuousShuffleReadRDD(
+    sparkContext,
+    numPartitions,
+    readerQueueSize,
+    prev.getNumPartitions,
+    epochIntervalMs,
+    Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+    prev.getNumPartitions,
+    this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
--- End diff --

Maybe I'm missing something. Is this more like a repartition (which just 
shuffles) than a coalesce?





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197523482
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
               _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
+            case _ => false
+          }.isDefined
+
+          if (!aboveSinglePartitionCoalesce) {
--- End diff --

Also, if there's a single parent partition and there's a `Repartition(1)`, 
that node should probably be removed. Not sure if this is already being done.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197520939
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
               _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
+            case _ => false
+          }.isDefined
+
+          if (!aboveSinglePartitionCoalesce) {
--- End diff --

What if there was only a single partition to begin with? Then there's no 
need for Repartition(1) and this check should be skipped.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197004935
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
   override def getPreferredLocations(split: Partition): Seq[String] = {
     split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
+
+  override def clearDependencies(): Unit = {
+    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
--- End diff --

I'm wondering whether this method can be called in a normal situation, e.g. 
when a continuous query is terminated.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197000483
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
               _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
+        case Repartition(1, false, _) =>
+        case node: Aggregate =>
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
--- End diff --

What if we have multiple repartitions, where one matches this case and the 
others do not? I'm not sure we are restricting repartition operations to occur 
only once.





[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196999896
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
     numPartitions: Int,
     queueSize: Int = 1024,
     numShuffleWriters: Int = 1,
-    epochIntervalMs: Long = 1000)
+    epochIntervalMs: Long = 1000,
+    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
   extends RDD[UnsafeRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     (0 until numPartitions).map { partIndex =>
-      ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs)
+      ContinuousShuffleReadPartition(
+        partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs)
--- End diff --

This effectively asserts that numPartitions is 1; otherwise it will throw an
exception.
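
A minimal sketch of the failure mode, in plain Scala with a hypothetical `ReadPartition` standing in for `ContinuousShuffleReadPartition`: with the single default name, indexing by partition works for one partition and fails on the lookup for two. The `checked` variant shows one possible explicit guard; it is an assumption for illustration, not something taken from the PR.

```scala
import java.util.UUID
import scala.util.Try

object EndpointNamesSketch {
  // Hypothetical stand-in for ContinuousShuffleReadPartition, reduced to two fields.
  case class ReadPartition(index: Int, endpointName: String)

  // Same default as in the diff: a single generated name.
  def defaultEndpointNames: Seq[String] =
    Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}")

  // Mirrors the indexing in getPartitions: one name is looked up per partition index.
  def unchecked(numPartitions: Int, endpointNames: Seq[String]): Seq[ReadPartition] =
    (0 until numPartitions).map(i => ReadPartition(i, endpointNames(i)))

  // Assumed guard, not part of the PR: fail early with a descriptive message.
  def checked(numPartitions: Int, endpointNames: Seq[String]): Seq[ReadPartition] = {
    require(
      endpointNames.length == numPartitions,
      s"Expected $numPartitions endpoint names but got ${endpointNames.length}")
    unchecked(numPartitions, endpointNames)
  }

  // With the default names, one partition works and two partitions fail on the lookup.
  val onePartition: Try[Seq[ReadPartition]] = Try(unchecked(1, defaultEndpointNames))
  val twoPartitions: Try[Seq[ReadPartition]] = Try(unchecked(2, defaultEndpointNames))
}
```

The guard does not lift the restriction; it only turns an index error deep inside `getPartitions` into a message that names the mismatch.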


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196999687
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
     numPartitions: Int,
     queueSize: Int = 1024,
     numShuffleWriters: Int = 1,
-    epochIntervalMs: Long = 1000)
+    epochIntervalMs: Long = 1000,
+    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
--- End diff --

Same here: if possible, it might be better to write complete code rather than
rely on such an assumption.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196999745
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the threads processing
+  // partitions of the write-side RDD have been started. These will run indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(
+    context: SparkContext,
+    numPartitions: Int,
+    readerQueueSize: Int,
+    epochIntervalMs: Long,
+    readerEndpointName: String,
+    prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

We are addressing only the specific case where the number of partitions is 1,
but we could add an assertion for that and try to write complete code so that
we don't need to modify it again.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196930368
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+    private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

We need to be able to generate the full list of partitions from within a 
single task in order for coalesce to work.
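
A small illustration of why the `@transient` modifier gets in the way here, using plain Java serialization and hypothetical holder classes rather than the real RDD: a `@transient` field is not written out, so it comes back as `null` on the deserializing side (the executor, in Spark's case), whereas the unannotated field survives the round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientFieldSketch {
  // Hypothetical holders standing in for the RDD before and after the change.
  class WithTransient(@transient val readerFactories: Seq[String]) extends Serializable
  class WithoutTransient(val readerFactories: Seq[String]) extends Serializable

  // Serialize and deserialize a value, roughly what happens when a task is shipped.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  val factories: Seq[String] = Seq("partition-0", "partition-1")
  // The transient field is dropped during serialization and is null afterwards.
  val afterTransient: Seq[String] = roundTrip(new WithTransient(factories)).readerFactories
  // The plain field is carried along and is available inside the task.
  val afterPlain: Seq[String] = roundTrip(new WithoutTransient(factories)).readerFactories
}
```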


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196924994
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threads.foreach(_.interrupt())
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+      threads.foreach(_.start())
+    }
+
+    reader.compute(reader.partitions(split.index), context)
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] = Seq(0)
--- End diff --

Yeah, I confused myself when looking at the normal coalesce RDD. The 
default dependency handling is correct here.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-20 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196921230
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -350,7 +350,14 @@ object UnsupportedOperationChecker {
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
-          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
+          val aboveSinglePartitionCoalesce = node.find {
--- End diff --

It will allow the first one, and I've added a test to verify.

It ought to allow the second one, but for some reason streaming deduplicate 
insists on inserting a shuffle above the coalesce(1). I will address this in a 
separate PR, since this seems like suboptimal behavior that isn't only 
restricted to continuous processing. For now I tweaked the condition to only 
allow aggregates.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196586217
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
--- End diff --

Maybe use a named thread pool (via `org...spark.util.ThreadUtils`) to track
the threads. The cached threads in the pool can then be reused across epochs.
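
A minimal sketch of the idea, assuming only `java.util.concurrent` rather than Spark's internal `ThreadUtils` helpers: a pool with a naming thread factory gives the writer threads recognizable names and a single shutdown point. The `continuous-coalesce-writer` prefix used in the usage line below and the placeholder task body are hypothetical.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object WriterPoolSketch {
  // Builds named daemon threads so they are easy to spot in a thread dump.
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  // Runs one placeholder "writer" per upstream partition and returns the thread names used.
  def runWriters(numPartitions: Int, prefix: String): Seq[String] = {
    val pool = Executors.newFixedThreadPool(numPartitions, namedDaemonFactory(prefix))
    val names = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(numPartitions)
    try {
      (0 until numPartitions).foreach { _ =>
        pool.execute(new Runnable {
          override def run(): Unit = {
            // A real writer would loop over epochs here.
            names.add(Thread.currentThread().getName)
            done.countDown()
          }
        })
      }
      done.await(10, TimeUnit.SECONDS)
    } finally {
      // One cleanup call interrupts every worker, instead of tracking threads by hand.
      pool.shutdownNow()
    }
    names.toArray(new Array[String](0)).toSeq
  }
}
```

For example, `WriterPoolSketch.runWriters(3, "continuous-coalesce-writer")` returns three names that all start with the given prefix.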


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196238635
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -21,22 +21,25 @@ import java.util.UUID
 
 import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.NextIterator
 
 case class ContinuousShuffleReadPartition(
   index: Int,
+  endpointName: String,
   queueSize: Int,
   numShuffleWriters: Int,
   epochIntervalMs: Long)
 extends Partition {
+
--- End diff --

Unnecessary


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196607009
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
+    private val readerFactories: Seq[InputPartition[UnsafeRow]])
--- End diff --

Since no partition needs all of the factories, the right thing to do is to
put each partition's factory in its partition object, so that all of the
factories are not serialized for every task.
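
A minimal sketch of that layout in plain Scala. `InputPartitionLike`, `FixedLocation`, and `DataSourcePartition` are hypothetical stand-ins for `InputPartition[UnsafeRow]` and the RDD's partition class; the point is only that each partition object holds its own factory, so task-side code never needs the full list.

```scala
object PartitionFactorySketch {
  // Hypothetical stand-in for InputPartition[UnsafeRow].
  trait InputPartitionLike extends Serializable {
    def preferredLocations: Seq[String]
  }
  case class FixedLocation(host: String) extends InputPartitionLike {
    def preferredLocations: Seq[String] = Seq(host)
  }

  // Each partition object carries only its own factory.
  case class DataSourcePartition(index: Int, inputPartition: InputPartitionLike)

  // Built once from the full list; afterwards the list itself is no longer needed.
  def buildPartitions(factories: Seq[InputPartitionLike]): Array[DataSourcePartition] =
    factories.zipWithIndex.map { case (factory, i) => DataSourcePartition(i, factory) }.toArray

  // Task-side lookup only touches the partition it was handed.
  def preferredLocations(split: DataSourcePartition): Seq[String] =
    split.inputPartition.preferredLocations
}
```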


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196582424
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -350,7 +350,14 @@ object UnsupportedOperationChecker {
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
-          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
+          val aboveSinglePartitionCoalesce = node.find {
+            case Repartition(1, false, _) => true
+            case _ => false
+          }.isDefined
+
+          if (!aboveSinglePartitionCoalesce) {
+            throwError(s"Continuous processing does not support ${node.nodeName} operations.")
--- End diff --

It would be nice if this error message said what is supported, i.e. that you
can rewrite the query with coalesce(1).
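
As a rough sketch of what such a message could look like (the wording and the `unsupportedMessage` helper are hypothetical, not taken from the PR):

```scala
object UnsupportedMessageSketch {
  // Hypothetical wording; the message text actually adopted may differ.
  def unsupportedMessage(nodeName: String): String =
    s"Continuous processing does not support $nodeName operations. " +
      "If possible, rewrite the query so that the operation runs after coalesce(1)."
}
```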


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196586798
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threads.foreach(_.interrupt())
--- End diff --

Clean up using the thread pool.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196606185
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+          rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threads.foreach(_.interrupt())
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+      threads.foreach(_.start())
+    }
+
+    reader.compute(reader.partitions(split.index), context)
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] = Seq(0)
+    })
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
--- End diff --

As commented above, this should actually throw an exception so that this RDD
is never checkpointed.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196584760
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.UUID
+
+import org.apache.spark.{HashPartitioner, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD}
+
+/**
+ * Physical plan for coalescing a continuous processing plan.
+ *
+ * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1.
+ */
+case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
+  override def output: Seq[Attribute] = child.output
+
+  override def children: Seq[SparkPlan] = child :: Nil
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override def doExecute(): RDD[InternalRow] = {
+    assert(numPartitions == 1)
+
+    val childRdd = child.execute()
+    val endpointName = s"RPCContinuousShuffleReader-${UUID.randomUUID()}"
+    val reader = new ContinuousShuffleReadRDD(
--- End diff --

super nit: rename to readerRDD to avoid confusion with other v2 reader 
classes.


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196580603
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -350,7 +350,14 @@ object UnsupportedOperationChecker {
               _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
-          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
+          val aboveSinglePartitionCoalesce = node.find {
--- End diff --

Will this allow `kafkaStreamDF.coalesce(1).select(...).filter(...).agg(...)`?



---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196589311
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
--- End diff --

They are there to make this RDD checkpointable, by making these fields
clearable. This raises a good point: I don't think we should make this
checkpointable. Instead, I suggest making these simple vals (well, just remove
the modifier), and in clearDependencies, just throw an error saying
"Checkpointing this RDD is not supported".


We should do this for all the continuous shuffle RDDs.
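
A minimal sketch of the suggested pattern, in plain Scala. `MiniRDD` is a hypothetical base class whose `markCheckpointed` stands in for the point where checkpointing would ask an RDD to drop its lineage; the real `RDD` class is not used, and the exception message is only one possible wording.

```scala
import scala.util.Try

object NoCheckpointSketch {
  // Hypothetical minimal base class; `markCheckpointed` stands in for the point at which
  // checkpointing would ask an RDD to clear its dependencies.
  abstract class MiniRDD {
    protected def clearDependencies(): Unit = ()
    def markCheckpointed(): Unit = clearDependencies()
  }

  // Plain constructor parameters (no `var`), and checkpointing is rejected outright.
  class ContinuousLikeRDD(reader: String, prev: String) extends MiniRDD {
    override protected def clearDependencies(): Unit =
      throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
    def describe: String = s"reads $prev through $reader"
  }

  // Any attempt to checkpoint fails instead of silently clearing the fields.
  val attempt: Try[Unit] = Try(new ContinuousLikeRDD("reader", "prev").markCheckpointed())
}
```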


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196609584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
+  extends RDD[InternalRow](reader.context, Nil) {
+
+  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    assert(split.index == 0)
+    // lazy initialize endpoint so writer can send to it
+    reader.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+      val rpcEnv = SparkEnv.get.rpcEnv
+      val outputPartitioner = new HashPartitioner(1)
+      val endpointRefs = reader.endpointNames.map { endpointName =>
+        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+      }
+
+      val threads = prev.partitions.map { prevSplit =>
+        new Thread() {
+          override def run(): Unit = {
+            TaskContext.setTaskContext(context)
+
+            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
+              prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+            EpochTracker.initializeCurrentEpoch(
+              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+            while (!context.isInterrupted() && !context.isCompleted()) {
+              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
+              // Note that current epoch is a non-inheritable thread local, so each writer thread
+              // can properly increment its own epoch without affecting the main task thread.
+              EpochTracker.incrementCurrentEpoch()
+            }
+          }
+        }
+      }
+
+      context.addTaskCompletionListener { ctx =>
+        threads.foreach(_.interrupt())
+      }
+
+      split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+      threads.foreach(_.start())
+    }
+
+    reader.compute(reader.partitions(split.index), context)
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] = Seq(0)
--- End diff --

Shouldn't the single partition of this RDD depend on all parent RDD
partitions, and not just partition 0?
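
To illustrate the question, here is a rough sketch of a dependency whose only output partition lists every parent partition. It uses a stand-in class rather than Spark's `NarrowDependency` so that it is self-contained; `SketchNarrowDependency` and `CoalesceToOneDependency` are hypothetical names, not part of the PR.

```scala
// Stand-in for Spark's NarrowDependency, keeping only getParents.
// (Hypothetical simplification for illustration.)
abstract class SketchNarrowDependency(val numParentPartitions: Int) {
  def getParents(partitionId: Int): Seq[Int]
}

// coalesce(1): the single output partition reads every parent partition.
class CoalesceToOneDependency(n: Int) extends SketchNarrowDependency(n) {
  override def getParents(partitionId: Int): Seq[Int] = {
    require(partitionId == 0, s"coalesce(1) has only partition 0, got $partitionId")
    0 until n
  }
}
```

For a parent with four partitions, `getParents(0)` yields indices 0 through 3 instead of only 0.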



---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196589618
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
--- End diff --

Add docs on what this means. 


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r196581248
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -350,7 +350,14 @@ object UnsupportedOperationChecker {
   _: TypedFilter) =>
 case node if node.nodeName == "StreamingRelationV2" =>
 case node =>
-  throwError(s"Continuous processing does not support ${node.nodeName} operations.")
+  val aboveSinglePartitionCoalesce = node.find {
--- End diff --

And are we allowing all stateful operations after this?


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r195420149
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -36,6 +37,17 @@ object DataSourceV2Strategy extends Strategy {
 case WriteToContinuousDataSource(writer, query) =>
   WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
 
+case Repartition(1, false, child) =>
+  val isContinuous = child.collectFirst {
+case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
+  }.isDefined
--- End diff --

Could the check for whether the plan is continuous be extracted into a
separate method, so that other places can use it?
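
One possible shape for such a helper, sketched over a toy plan tree so it runs without Catalyst. `PlanNode`, `ContinuousSource`, `BatchSource`, `Coalesce`, and `ContinuousPlans.isContinuous` are all hypothetical names; in the PR the check would match on `StreamingDataSourceV2Relation` with a `ContinuousReader` instead.

```scala
// Hypothetical, simplified plan nodes standing in for Catalyst's LogicalPlan.
sealed trait PlanNode { def children: Seq[PlanNode] }

case class ContinuousSource(name: String) extends PlanNode {
  val children: Seq[PlanNode] = Nil
}

case class BatchSource(name: String) extends PlanNode {
  val children: Seq[PlanNode] = Nil
}

case class Coalesce(numPartitions: Int, child: PlanNode) extends PlanNode {
  val children: Seq[PlanNode] = Seq(child)
}

object ContinuousPlans {
  // True if any node in the tree is a continuous source.
  def isContinuous(plan: PlanNode): Boolean = plan match {
    case _: ContinuousSource => true
    case other => other.children.exists(isContinuous)
  }
}
```

Callers such as the planner strategy and the unsupported-operation check could then share the one predicate, e.g. `ContinuousPlans.isContinuous(child)`.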


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-14 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r195415777
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
+ * continuous shuffle, and then reads them in the task thread using `reader`.
+ */
+class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
--- End diff --

Why are `reader` and `prev` both `var` here?


---




[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-13 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/21560

[SPARK-24386][SS] coalesce(1) aggregates in continuous processing

## What changes were proposed in this pull request?

Provide a continuous processing implementation of coalesce(1), and allow
aggregates on top of it.

The changes in ContinuousQueuedDataReader and related classes use split.index
(the ID of the partition within the RDD currently being compute()d) rather than
context.partitionId() (the partition ID of the scheduled task within the Spark
job, that is, the post-coalesce writer). In the absence of a narrow
dependency, these values were previously always the same, so there was no need
to distinguish them.

## How was this patch tested?

new unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark coalesce

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21560


commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38
Author: Jose Torres 
Date:   2018-05-15T18:07:54Z

continuous shuffle read RDD

commit b5d100875932bdfcb645c8f6b2cdb7b815d84c80
Author: Jose Torres 
Date:   2018-05-17T03:11:11Z

docs

commit af407694a5f13c18568da4a63848f82374a44377
Author: Jose Torres 
Date:   2018-05-17T03:19:37Z

Merge remote-tracking branch 'apache/master' into readerRddMaster

commit 46456dc75a6aec9659b18523c421999debd060eb
Author: Jose Torres 
Date:   2018-05-17T03:22:49Z

fix ctor

commit 2ea8a6f94216e8b184e5780ec3e6ffb2838de382
Author: Jose Torres 
Date:   2018-05-17T03:43:10Z

multiple partition test

commit 955ac79eb05dc389e632d1aaa6c59396835c6ed5
Author: Jose Torres 
Date:   2018-05-17T13:33:51Z

unset task context after test

commit 8cefb724512b51f2aa1fdd81fa8a2d4560e60ce3
Author: Jose Torres 
Date:   2018-05-18T00:00:05Z

conf from RDD

commit f91bfe7e3fc174202d7d5c7cde5a8fb7ce86bfd3
Author: Jose Torres 
Date:   2018-05-18T00:00:44Z

endpoint name

commit 259029298fc42a65e8ebb4d2effe49b7fafa96f1
Author: Jose Torres 
Date:   2018-05-18T00:02:08Z

testing bool

commit 859e6e4dd4dd90ffd70fc9cbd243c94090d72506
Author: Jose Torres 
Date:   2018-05-18T00:22:10Z

tests

commit b23b7bb17abe3cbc873a3144c56d08c88bc0c963
Author: Jose Torres 
Date:   2018-05-18T00:40:55Z

take instead of poll

commit 97f7e8ff865e6054d0d70914ce9bb51880b161f6
Author: Jose Torres 
Date:   2018-05-18T00:58:44Z

add interface

commit de21b1c25a333d44c0521fe151b468e51f0bdc47
Author: Jose Torres 
Date:   2018-05-18T01:02:37Z

clarify comment

commit 7dcf51a13e92a0bb2998e2a12e67d351e1c1a4fc
Author: Jose Torres 
Date:   2018-05-18T22:39:28Z

multiple

commit ad0b5aab320413891f7c21ea6115b6da8d49ccf9
Author: Jose Torres 
Date:   2018-05-25T00:06:15Z

writer with 1 reader partition

commit c9adee5423c2e8a030911008d2e6942045d484bb
Author: Jose Torres 
Date:   2018-05-25T00:15:39Z

docs and iface

commit 63d38d849107eed226449cec8d24c2241cd583c9
Author: Jose Torres 
Date:   2018-05-25T00:27:26Z

Merge remote-tracking branch 'apache/master' into writerTask

commit 331f437423262a1aa76754a8079d7c017e4ea28a
Author: Jose Torres 
Date:   2018-05-25T00:37:14Z

increment epoch

commit f3ce67529372f72370a1e6028dc71a751acf26f2
Author: Jose Torres 
Date:   2018-05-25T00:40:39Z

undo oop

commit e0108d7bc164b9e5eeb757c13c80bc1d11671188
Author: Jose Torres 
Date:   2018-05-25T00:54:01Z

make rdd loop

commit 024f92d6bd471e207e1625dc6cdca31e1067deb8
Author: Jose Torres 
Date:   2018-05-25T22:56:59Z

basic

commit 8f1939b91dbef76879d5e5f2077dea35e5343e89
Author: Jose Torres 
Date:   2018-06-11T21:48:21Z

coalesce working

commit c99d9524d4778b973df34378e98d53a152e0a42c
Author: Jose Torres 
Date:   2018-06-13T21:34:38Z

Merge remote-tracking branch 'apache/master' into coalesce

commit aaac0af0ddebffe64338a69a5a16dcfab9432a51
Author: Jose Torres 
Date:   2018-06-13T22:04:19Z

fix merge

commit 80d60db4c99e52e624dcbd19cc7c5ba519ff4e1c
Author: Jose Torres 
Date:   2018-06-13T23:09:29Z

rm spurious diffs

commit 26b74f016033f582a61694133b82df6a40295c0b
Author: Jose Torres 
Date:   2018-06-14T04:43:00Z

unsupported check

commit 03cc20d73dd547e476fad90d47225ef9e96a8cbc
Author: Jose Torres 
Date:   2018-06-14T04:54:30Z

change back timeout




---
