[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21337


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189410027
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  // In this unit test, we emulate that we're in the task thread where
+  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context
+  // thread local to be set.
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
--- End diff --

Simply:
`def send(end: ThreadSafeRpcEndpoint, messages: UnsafeRowReceiverMessage*) { ... }`
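
A minimal sketch of such a test helper, assuming the `endpoint` handle the tests hold is an `org.apache.spark.rpc.RpcEndpointRef` (the type returned by `RpcEnv.setupEndpoint`):

    import org.apache.spark.rpc.RpcEndpointRef

    // Deliver each message to the receiver endpoint synchronously, in order.
    private def send(endpoint: RpcEndpointRef, messages: UnsafeRowReceiverMessage*): Unit = {
      messages.foreach(msg => endpoint.askSync[Unit](msg))
    }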


---


[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189409872
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  // In this unit test, we emulate that we're in the task thread where
+  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a 
task context
+  // thread local to be set.
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
--- End diff --

It would be super nice if there were a helper function that allowed this to be written as `send(endpoint, ReceiverRow(unsafeRow(111)), ReceiverEpochMarker(), ReceiverRow(unsafeRow(222)), ReceiverRow(unsafeRow(333)), ...)`.
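
With a varargs helper along those lines (a sketch, not code from this PR), the setup above could collapse to:

    send(endpoint,
      ReceiverRow(unsafeRow(111)),
      ReceiverEpochMarker(),
      ReceiverRow(unsafeRow(222)),
      ReceiverRow(unsafeRow(333)),
      ReceiverEpochMarker())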


---


[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189409608
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (reader: ContinuousShuffleReader, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(queueSize, env)
+val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID().toString}", receiver)
--- End diff --

Nit: don't need `toString` when it's interpolated into the string.
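
Applied to the quoted line, it would read, for example:

    val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)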


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189409482
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  // In this unit test, we emulate that we're in the task thread where
+  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a 
task context
+  // thread local to be set.
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
+  }
+
+  

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189409455
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  // In this unit test, we emulate that we're in the task thread where
+  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a 
task context
+  // thread local to be set.
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
+  }
+
+  

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-18 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189409378
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  // In this unit test, we emulate that we're in the task thread where
+  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a 
task context
+  // thread local to be set.
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
+  }
+
+  

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-17 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189133964
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle task. Continuous shuffle
+ * writers will send rows here, with continuous shuffle readers polling for new rows as needed.
+ *
+ * TODO: Support multiple source tasks. We need to output a single epoch marker once all
+ * source tasks have sent one.
+ */
+private[shuffle] class UnsafeRowReceiver(
+  queueSize: Int,
+  override val rpcEnv: RpcEnv)
+extends ThreadSafeRpcEndpoint with Logging {
+  // Note that this queue will be drained from the main task thread and populated in the RPC
+  // response thread.
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  var stopped = new AtomicBoolean(false)
--- End diff --

Restricted visibility and made it a proper val.
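
Presumably something along these lines, which the test suite (living in the same `shuffle` package) can still read:

    private[shuffle] val stopped = new AtomicBoolean(false)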


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189127139
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle 
task. Continuous shuffle
+ * writers will send rows here, with continuous shuffle readers polling 
for new rows as needed.
+ *
+ * TODO: Support multiple source tasks. We need to output a single epoch 
marker once all
+ * source tasks have sent one.
+ */
+private[shuffle] class UnsafeRowReceiver(
+  queueSize: Int,
+  override val rpcEnv: RpcEnv)
+extends ThreadSafeRpcEndpoint with Logging {
+  // Note that this queue will be drained from the main task thread and 
populated in the RPC
+  // response thread.
+  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  var stopped = new AtomicBoolean(false)
--- End diff --

Why is this a `var`? And why is it public?


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189125545
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(SQLConf.get.continuousStreamingExecutorQueueSize, env)
--- End diff --

It might be easier to understand the information flow if the queue size were passed directly through the RDD, as opposed to magically getting it through `SQLConf.get` (which makes issues like "why is the conf from my session not being used?" hard to debug).
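
The later revision quoted in this thread adds a `queueSize` field to the partition for exactly this reason; a rough sketch of the plumbing (the revised RDD constructor is not quoted here, so the default argument below is an assumption):

    case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Partition

    class ContinuousShuffleReadRDD(
        sc: SparkContext,
        numPartitions: Int,
        queueSize: Int = SQLConf.get.continuousStreamingExecutorQueueSize)
      extends RDD[UnsafeRow](sc, Nil) {

      // The conf is read once on the driver and carried in the partition objects,
      // so executors never have to consult SQLConf.get.
      override protected def getPartitions: Array[Partition] =
        (0 until numPartitions).map(ContinuousShuffleReadPartition(_, queueSize)).toArray

      override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] =
        ???  // unchanged from the PR: drain this partition's receiver
    }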


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189129070
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new 
UnsafeRowReceiver(SQLConf.get.continuousStreamingExecutorQueueSize, env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
--- End diff --

It would be better for debugging if the endpoint name had a prefix like "UnsafeRowReceiver" that distinguishes it from other endpoints.


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189129893
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.next().getInt(0) == 111)
+assert(iter.next().getInt(0) == 222)
+assert(iter.next().getInt(0) == 333)
+assert(!iter.hasNext)
--- End diff --

Can be compressed to `assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))`.


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r189130600
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+TaskContext.unset()
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.next().getInt(0) == 111)
+assert(iter.next().getInt(0) == 222)
+assert(iter.next().getInt(0) == 333)
+assert(!iter.hasNext)
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.next().getInt(0) == 111)
+assert(!firstEpoch.hasNext)
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.next().getInt(0) == 222)
+assert(secondEpoch.next().getInt(0) == 333)
+assert(!secondEpoch.hasNext)
+  }
+
+  test("empty epochs") {
+val rdd = new 

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188831887
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.next().getInt(0) == 111)
+assert(iter.next().getInt(0) == 222)
+assert(iter.next().getInt(0) == 333)
+assert(!iter.hasNext)
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.next().getInt(0) == 111)
+assert(!firstEpoch.hasNext)
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.next().getInt(0) == 222)
+assert(secondEpoch.next().getInt(0) == 333)
+assert(!secondEpoch.hasNext)
+  }
+
+  test("empty epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, 

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188829894
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+new NextIterator[UnsafeRow] {
+  override def getNext(): UnsafeRow = receiver.poll() match {
+case ReceiverRow(r) => r
+case ReceiverEpochMarker() =>
--- End diff --

The way I see it, the current implementation only supports one UnsafeRow source, and the follow-up PR will extend it to multiple sources.
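
For the single-source case, the marker presumably just ends the current epoch's iterator; a sketch of the case body that the excerpt above cuts off (an assumption, not the PR's actual code):

    new NextIterator[UnsafeRow] {
      override def getNext(): UnsafeRow = receiver.poll() match {
        case ReceiverRow(r) => r
        case ReceiverEpochMarker() =>
          finished = true  // one writer, so a single marker closes the epoch
          null
      }
      override def close(): Unit = {}
    }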


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188829815
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

Clarified and completed the comment.


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188829780
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle 
task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

There's actually an existing config I forgot to pull in. (I think it makes sense to use the same config for all continuous read buffers unless and until we see a need to split it.)
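
For reference, the revision quoted earlier in the thread picks the size up from that config where the receiver is constructed:

    val receiver = new UnsafeRowReceiver(SQLConf.get.continuousStreamingExecutorQueueSize, env)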


---




[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188682238
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+new NextIterator[UnsafeRow] {
+  override def getNext(): UnsafeRow = receiver.poll() match {
+case ReceiverRow(r) => r
+case ReceiverEpochMarker() =>
--- End diff --

If the task does not wait for markers from all of its children, it would not 
guarantee at-least-once.
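
For what it's worth, a rough sketch of what that waiting could look like in the `NextIterator` body inside `compute()`, assuming a hypothetical `numWriterPartitions` is known to the reader and that the single shared queue keeps receiving from all writers (attributing markers to specific writers would still need more plumbing):

```scala
new NextIterator[UnsafeRow] {
  // Count of epoch markers seen so far in the current epoch.
  private var markersReceived = 0

  override def getNext(): UnsafeRow = receiver.poll() match {
    case ReceiverRow(r) => r
    case ReceiverEpochMarker() =>
      markersReceived += 1
      if (markersReceived == numWriterPartitions) {
        // Every child has finished the epoch, so this reader's epoch ends too.
        finished = true
        null
      } else {
        // Other writers may still send rows for this epoch; keep draining.
        getNext()
      }
  }

  override def close(): Unit = {}
}
```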


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188638980
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.next().getInt(0) == 111)
+assert(iter.next().getInt(0) == 222)
+assert(iter.next().getInt(0) == 333)
+assert(!iter.hasNext)
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.next().getInt(0) == 111)
+assert(!firstEpoch.hasNext)
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.next().getInt(0) == 222)
+assert(secondEpoch.next().getInt(0) == 333)
+assert(!secondEpoch.hasNext)
+  }
+
+  test("empty epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, 

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188632188
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading 
from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val receiver = 
split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+new NextIterator[UnsafeRow] {
+  override def getNext(): UnsafeRow = receiver.poll() match {
+case ReceiverRow(r) => r
+case ReceiverEpochMarker() =>
--- End diff --

Does this ensure at-least-once? If so, we could start from this and improve 
it in another PR, as @jose-torres stated.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188628202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading 
from the
--- End diff --

If my understanding is right, the "bottom" will be the RDD injected just 
before shuffling, so it would be neither a reader nor a writer.

`first` and `last` would be good alternatives for me if "bottom" seems 
ambiguous.

As @arunmahadevan stated, the comment looks incomplete.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188636306
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle 
task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

I guess we can handle point 2 as a TODO if we would like to keep the focus on the proposed patch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188604001
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

"Bottom is a bit ambiguous" +1 for this. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188601016
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle 
task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
--- End diff --

override val rpcEnv here?
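
For reference, a small sketch of the suggestion. `RpcEndpoint` declares `rpcEnv` as an abstract `val`, so the current code compiles either way; marking the constructor parameter with `override` just makes the relationship explicit:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

private[shuffle] class UnsafeRowReceiver(override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint with Logging {
  // ... body unchanged ...
}
```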


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188456856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading 
from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val receiver = 
split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+new NextIterator[UnsafeRow] {
+  override def getNext(): UnsafeRow = receiver.poll() match {
+case ReceiverRow(r) => r
+case ReceiverEpochMarker() =>
--- End diff --

It should, but I think that's significant enough to justify its own PR. 
Added an explicit TODO to be safe.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188456692
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

Well, ContinuousShuffleReadRDD is a bit self-documenting as a reader. Added 
that it's receiving shuffle data from upstream tasks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188392937
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

Incomplete comment?

"Bottom" is a bit ambiguous. Can we explicitly state what's at the bottom 
(reader or writer) and whether this is receiving the shuffle data from downstream?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188397958
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle 
task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

1. It may be good to state the assumption that the queue will be continuously 
drained irrespective of the markers, etc.
2. Can the queue size be made configurable? (A sketch of one way to do that is below.)
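
On point 2, a rough sketch of how the size could be plumbed through instead of the hard-coded 1024, reusing the imports from the quoted file (`SparkEnv`, `TaskContext`, `Partition`, `UUID`); the extra `queueSize` parameters are illustrative, not the PR's final shape:

```scala
case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Partition {
  // Initialized only on the executor, and only once even as we call compute() multiple times.
  lazy val (receiver, endpoint) = {
    val env = SparkEnv.get.rpcEnv
    // Assumes UnsafeRowReceiver grows a queueSize constructor argument.
    val receiver = new UnsafeRowReceiver(queueSize, env)
    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    TaskContext.get().addTaskCompletionListener { ctx =>
      env.stop(endpoint)
    }
    (receiver, endpoint)
  }
}

// In ContinuousShuffleReadRDD the size would then be threaded through, e.g.:
//   (0 until numPartitions).map(ContinuousShuffleReadPartition(_, queueSize)).toArray
```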




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188397203
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle task.
--- End diff --

It may be good to add who the senders and the receivers are, to give an idea of 
where this fits in.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188396282
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading 
from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val receiver = 
split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+new NextIterator[UnsafeRow] {
+  override def getNext(): UnsafeRow = receiver.poll() match {
+case ReceiverRow(r) => r
+case ReceiverEpochMarker() =>
--- End diff --

Shouldn't this wait for epoch markers from all child tasks?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-15 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/21337

[SPARK-24234][SS] Reader for continuous processing shuffle

## What changes were proposed in this pull request?

Read RDD for continuous processing shuffle, as well as the initial 
RPC-based row receiver.


https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii

## How was this patch tested?

New unit tests.
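
Roughly how the new pieces fit together, paraphrasing the unit tests quoted above (`unsafeRow` is the suite's helper for building single-int `UnsafeRow`s): rows and epoch markers are pushed to a partition's RPC endpoint, and `compute()` returns an iterator that ends at the epoch marker.

```scala
val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint

// Writers (eventually, the shuffle write side) send rows followed by an epoch marker.
endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
endpoint.askSync[Unit](ReceiverEpochMarker())

// The reader side drains exactly one epoch per compute() call.
val epoch = rdd.compute(rdd.partitions(0), TaskContext.get())
assert(epoch.next().getInt(0) == 111)
assert(!epoch.hasNext)
```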


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark readerRddMaster

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21337


commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38
Author: Jose Torres 
Date:   2018-05-15T18:07:54Z

continuous shuffle read RDD




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org