[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21337
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189410027 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...]
+  test("multiple epochs") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
--- End diff -- simply `def send(end: ThreadSafeRpcEndpoint, messages: UnsafeRowReceiverMessage*) { ... }`
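A minimal sketch of such a helper, assuming it lives in the test suite and that `endpoint` is the `RpcEndpointRef` returned by `env.setupEndpoint` (only the name `send` and the vararg shape come from the review comments; the body is an assumption):

```scala
import org.apache.spark.rpc.RpcEndpointRef

// Test-only convenience: synchronously deliver each message, in order.
private def send(endpoint: RpcEndpointRef, messages: UnsafeRowReceiverMessage*): Unit = {
  messages.foreach(m => endpoint.askSync[Unit](m))
}
```

The setup in the quoted test would then collapse to:

```scala
send(endpoint,
  ReceiverRow(unsafeRow(111)),
  ReceiverEpochMarker(),
  ReceiverRow(unsafeRow(222)),
  ReceiverRow(unsafeRow(333)),
  ReceiverEpochMarker())
```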
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189409872 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...] --- End diff -- Would be super NICE if there was a function that allowed this to be `send(endpoint, ReceiverRow(unsafeRow(111)), ReceiverEpochMarker(), ReceiverRow(unsafeRow(222)), ReceiverRow(unsafeRow(333)), ...)` ---
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189409608 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- [...]
+case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (reader: ContinuousShuffleReader, endpoint) = {
+    val env = SparkEnv.get.rpcEnv
+    val receiver = new UnsafeRowReceiver(queueSize, env)
+    val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID().toString}", receiver)
--- End diff -- nit: don't need `toString` when it's interpolated in the string.
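String interpolation already calls `toString`, so the line can shrink to:

```scala
val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver)
```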
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189409482 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...]
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189409455 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...]
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189409378 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...]
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189133964 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- [...]
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  var stopped = new AtomicBoolean(false)
--- End diff -- Restricted visibility and made it a proper val.
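The resulting declaration presumably reads as follows (the exact `private[shuffle]` scope is an assumption, based on the test suite living in the same package):

```scala
// Flipped once when the task completes; the val holds a mutable AtomicBoolean.
private[shuffle] val stopped = new AtomicBoolean(false)
```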
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189127139 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -0,0 +1,64 @@ [...]
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle task. Continuous shuffle
+ * writers will send rows here, with continuous shuffle readers polling for new rows as needed.
+ *
+ * TODO: Support multiple source tasks. We need to output a single epoch marker once all
+ * source tasks have sent one.
+ */
+private[shuffle] class UnsafeRowReceiver(
+    queueSize: Int,
+    override val rpcEnv: RpcEnv)
+  extends ThreadSafeRpcEndpoint with Logging {
+  // Note that this queue will be drained from the main task thread and populated in the RPC
+  // response thread.
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
+  var stopped = new AtomicBoolean(false)
--- End diff -- why is this a var? and public?
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189125545 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- [...]
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+    val env = SparkEnv.get.rpcEnv
+    val receiver = new UnsafeRowReceiver(SQLConf.get.continuousStreamingExecutorQueueSize, env)
--- End diff -- It might be easier to understand the information flow if the queue size value were passed directly through the RDD, as opposed to magically getting it through `SQLConf.get` (hard to debug issues like "why is the conf from my session not being used?").
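A sketch of that refactor. The two-argument partition matches the later revision quoted above; the RDD constructor shape (a default argument resolved on the driver) is an assumption:

```scala
// Resolve the conf once, where the RDD is built, and thread the value
// through to each partition instead of calling SQLConf.get on executors.
class ContinuousShuffleReadRDD(
    sc: SparkContext,
    numPartitions: Int,
    queueSize: Int = SQLConf.get.continuousStreamingExecutorQueueSize)
  extends RDD[UnsafeRow](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    (0 until numPartitions).map(i => ContinuousShuffleReadPartition(i, queueSize)).toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    // Same NextIterator-based body as in the quoted diff; elided here.
    ???
  }
}
```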
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189129070 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- [...]
+    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
--- End diff -- It would be better for debugging if the endpoint name had a prefix like "UnsafeRowReceiver" that distinguishes it from other endpoints.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189129893 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -0,0 +1,137 @@ [...]
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+    UnsafeProjection.create(Array(IntegerType : DataType))(
+      new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    ctx = TaskContext.empty()
+    TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+    ctx.markTaskCompleted(None)
+    TaskContext.unset()
+    ctx = null
+    super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+    ctx.markTaskCompleted(None)
+    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+    eventually(timeout(streamingTimeout)) {
+      assert(receiver.stopped.get())
+    }
+  }
+
+  test("receiver stopped with marker last") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    ctx.markTaskCompleted(None)
+    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+    eventually(timeout(streamingTimeout)) {
+      assert(receiver.stopped.get())
+    }
+  }
+
+  test("one epoch") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    val iter = rdd.compute(rdd.partitions(0), ctx)
+    assert(iter.next().getInt(0) == 111)
+    assert(iter.next().getInt(0) == 222)
+    assert(iter.next().getInt(0) == 333)
+    assert(!iter.hasNext)
--- End diff -- can be compressed to `assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))`
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r189130600 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...]
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188831887 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- [...]
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188829894 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -0,0 +1,64 @@ [...]
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call compute() multiple times.
+  lazy val (receiver, endpoint) = {
+    val env = SparkEnv.get.rpcEnv
+    val receiver = new UnsafeRowReceiver(env)
+    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+    TaskContext.get().addTaskCompletionListener { ctx =>
+      env.stop(endpoint)
+    }
+    (receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+    new NextIterator[UnsafeRow] {
+      override def getNext(): UnsafeRow = receiver.poll() match {
+        case ReceiverRow(r) => r
+        case ReceiverEpochMarker() =>
--- End diff -- The way I see it is that the current implementation only supports one UnsafeRow source, and the followup PR will extend it to multiple.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188829815 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- [...]
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
+ */
--- End diff -- Clarified and completed the comment.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188829780 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- [...]
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+  extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff -- There's actually an existing config I forgot to pull in. (I think it makes sense to use the same config for all continuous read buffers unless and until we see a need to split it.)
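The later revision (quoted earlier in this thread) picks the size up from that existing conf instead of hard-coding 1024:

```scala
// Buffer size comes from the shared continuous-processing conf; the
// receiver takes it as a constructor argument.
val queueSize = SQLConf.get.continuousStreamingExecutorQueueSize
val receiver = new UnsafeRowReceiver(queueSize, env)
```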
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188682238 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- [...]
+    new NextIterator[UnsafeRow] {
+      override def getNext(): UnsafeRow = receiver.poll() match {
+        case ReceiverRow(r) => r
+        case ReceiverEpochMarker() =>
--- End diff -- If the task does not wait for markers from all its children, it would not guarantee at-least-once.
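A hedged sketch of what waiting on all children could look like, building on the quoted `NextIterator` body (`numWriterTasks` is a stand-in for state the real reader would hold; the PR's TODO defers multi-source support to a follow-up):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.NextIterator

// Hypothetical multi-writer variant: an epoch ends only after every writer
// task has delivered its marker, so no writer's rows can be skipped.
def epochIterator(receiver: UnsafeRowReceiver, numWriterTasks: Int): Iterator[UnsafeRow] =
  new NextIterator[UnsafeRow] {
    private var markersReceived = 0

    override def getNext(): UnsafeRow = receiver.poll() match {
      case ReceiverRow(r) => r
      case ReceiverEpochMarker() =>
        markersReceived += 1
        if (markersReceived == numWriterTasks) {
          finished = true // all children have reached the epoch boundary
          null
        } else {
          getNext() // keep draining rows from the slower writers
        }
    }

    override def close(): Unit = {}
  }
```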
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188638980

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
@@ -0,0 +1,122 @@
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+    UnsafeProjection.create(Array(IntegerType : DataType))(
+      new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    ctx = TaskContext.empty()
+    TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+    ctx.markTaskCompleted(None)
+    ctx = null
+    super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+    ctx.markTaskCompleted(None)
+    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+    eventually(timeout(streamingTimeout)) {
+      assert(receiver.stopped.get())
+    }
+  }
+
+  test("receiver stopped with marker last") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    ctx.markTaskCompleted(None)
+    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+    eventually(timeout(streamingTimeout)) {
+      assert(receiver.stopped.get())
+    }
+  }
+
+  test("one epoch") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    val iter = rdd.compute(rdd.partitions(0), ctx)
+    assert(iter.next().getInt(0) == 111)
+    assert(iter.next().getInt(0) == 222)
+    assert(iter.next().getInt(0) == 333)
+    assert(!iter.hasNext)
+  }
+
+  test("multiple epochs") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+    endpoint.askSync[Unit](ReceiverEpochMarker())
+
+    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+    assert(firstEpoch.next().getInt(0) == 111)
+    assert(!firstEpoch.hasNext)
+
+    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+    assert(secondEpoch.next().getInt(0) == 222)
+    assert(secondEpoch.next().getInt(0) == 333)
+    assert(!secondEpoch.hasNext)
+  }
+
+  test("empty epochs") {
+    val rdd = new ContinuousShuffleReadRDD(sparkContext,
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188632188

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+    new NextIterator[UnsafeRow] {
+      override def getNext(): UnsafeRow = receiver.poll() match {
+        case ReceiverRow(r) => r
+        case ReceiverEpochMarker() =>
--- End diff --

Does this ensure at-least-once? If so, we could start from this and improve it in another PR, as @jose-torres stated.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188628202

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

If my understanding is right, the bottom will be the RDD injected just before shuffling, so it would be neither reader nor writer. `first` and `last` would be good alternatives for me if `bottom` looks ambiguous. As @arunmahadevan stated, the comment looks incomplete.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188636306

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

I guess we can handle 2 as a TODO if we would like to keep the focus on the proposed patch.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188604001

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

+1 for "Bottom is a bit ambiguous."
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188601016

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
--- End diff --

override val rpcEnv here?
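For context on this suggestion: Spark's RpcEndpoint trait declares an abstract `val rpcEnv: RpcEnv`, so the constructor parameter can be marked as implementing it explicitly. A minimal sketch of the suggested form:

    import org.apache.spark.internal.Logging
    import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

    // `override` documents that the constructor parameter satisfies the
    // abstract `rpcEnv` member of the RpcEndpoint trait; behavior is unchanged.
    private[shuffle] class UnsafeRowReceiver(override val rpcEnv: RpcEnv)
        extends ThreadSafeRpcEndpoint with Logging {
      // body as in the diff above
    }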
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188456856

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+    new NextIterator[UnsafeRow] {
+      override def getNext(): UnsafeRow = receiver.poll() match {
+        case ReceiverRow(r) => r
+        case ReceiverEpochMarker() =>
--- End diff --

It should, but I think that's significant enough to justify its own PR. Added an explicit TODO to be safe.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188456692

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

Well, ContinuousShuffleReadRDD is a bit self-documenting as a reader. Added that it's receiving shuffle data from upstream tasks.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188392937

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading from the
--- End diff --

Incomplete comment? "Bottom" is a bit ambiguous. Can we explicitly state what's at the bottom (reader or writer), and whether this is receiving the shuffle data from downstream?
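One way the truncated comment could be completed along the lines requested here (a sketch, not the wording that was eventually merged):

    /**
     * RDD at the reading end of a continuous processing shuffle: the first RDD
     * of each reduce-side task. It receives rows pushed over RPC by upstream
     * map-side writers and hands one epoch of rows per compute() call to the
     * operators above it.
     */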
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188397958

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

1. It may be good to state the assumption that the queue will be continuously drained irrespective of the markers, etc.
2. Can the queue size be made configurable?
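A sketch of how point 1 might read in the code, assuming the blocking enqueue is intended to provide back-pressure (that intent is inferred, not stated in the thread):

    // Invariant: the task-side iterator drains this queue continuously,
    // epoch markers included. Because the queue is bounded, a stalled reader
    // eventually blocks the enqueueing RPC thread and back-pressures senders.
    private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)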
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188397203

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle task.
--- End diff --

It may be good to add who the senders and receivers are, to give an idea of where this fits in.
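A possible expansion of that doc comment, based on the design sketched in this PR (assumed wording):

    /**
     * RPC endpoint for receiving rows into a continuous processing shuffle task.
     *
     * Senders: the map-side writers of the upstream continuous stage, one per
     * upstream partition, each pushing ReceiverRow messages and then a
     * ReceiverEpochMarker at every epoch boundary.
     * Receiver: the reduce-side task hosting this endpoint, whose
     * ContinuousShuffleReadRDD.compute() drains the queued messages.
     */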
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188396282

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
+    new NextIterator[UnsafeRow] {
+      override def getNext(): UnsafeRow = receiver.poll() match {
+        case ReceiverRow(r) => r
+        case ReceiverEpochMarker() =>
--- End diff --

Shouldn't this wait for epoch markers from all child tasks?
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/21337

    [SPARK-24234][SS] Reader for continuous processing shuffle

## What changes were proposed in this pull request?

Read RDD for continuous processing shuffle, as well as the initial RPC-based row receiver.

https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii

## How was this patch tested?

new unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark readerRddMaster

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21337.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21337

commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38
Author: Jose Torres
Date: 2018-05-15T18:07:54Z

    continuous shuffle read RDD
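Condensing the diffs in this thread, the pieces fit together roughly like this (a usage sketch mirroring the unit tests; `someUnsafeRow` and the direct endpoint calls stand in for the not-yet-written map-side writer):

    import org.apache.spark.TaskContext

    // Reduce side: forcing the partition's lazy val registers an RPC endpoint
    // under a random UUID name on the executor's RpcEnv.
    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    val part = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition]

    // Map side (driven directly here, as the tests do): push rows, then mark
    // the epoch boundary.
    part.endpoint.askSync[Unit](ReceiverRow(someUnsafeRow))
    part.endpoint.askSync[Unit](ReceiverEpochMarker())

    // Each compute() call on the same partition drains exactly one epoch.
    val epoch = rdd.compute(rdd.partitions(0), TaskContext.get())
    epoch.foreach(row => println(row.getInt(0)))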