[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r180933380 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -53,32 +53,24 @@ class ContinuousSuiteBase extends StreamTest { // A continuous trigger that will only fire the initial time for the duration of a test. // This allows clean testing with manual epoch advancement. protected val longContinuousTrigger = Trigger.Continuous("1 hour") + + override protected implicit val defaultTrigger = Trigger.Continuous(100) + override protected val defaultUseV2Sink = true } class ContinuousSuite extends ContinuousSuiteBase { import testImplicits._ - test("basic rate source") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) + test("basic") { +val input = MemoryStream[Int] --- End diff -- I think that's too much to hope for right now. We can do that later. For now, let's make everything explicitly ContinuousMemoryStream. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/21048

[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface

## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically so that no partial files are generated, because partial files would break fault-tolerance guarantees. Currently, 3 locations try to do this individually, and in some cases incorrectly:

1. HDFSMetadataLog - This uses a FileManager interface so that it can use any implementation of the `FileSystem` or `FileContext` APIs. It prefers the `FileContext` implementation because FileContext on HDFS provides atomic renames.
2. HDFSBackedStateStore (aka in-memory state store)
    - Writing a version.delta file - This uses FileSystem APIs only to perform a rename. This is incorrect because rename is not atomic in the HDFS FileSystem implementation.
    - Writing a snapshot file - Same as above.

Current problems:

1. State Store behavior is incorrect.
2. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file, and it becomes visible only when the entire file has been written and closed correctly. Any failure can terminate the write without making any partial file visible in S3. The current code does not abstract out this mechanism enough for it to be customized.

Solution:

1. Introduce a common interface that all 3 cases above can use to write checkpoint files atomically.
2. This interface must allow customization of the write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use it. Similar to the earlier `FileManager`, there are implementations based on the `FileSystem` and `FileContext` APIs, and the latter is preferred so that it works correctly with HDFS.

The key method of this interface is `createAtomic(path, overwrite)`, which returns a `CancellableFSDataOutputStream` that has a `cancel()` method. All users of this method must either call `close()` to successfully write the file, or `cancel()` in case of an error.

## How was this patch tested?

New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23966

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21048.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21048

commit df7b339d73097b8501fe0937f770b8b2ded1b63e Author: Tathagata Das <tathagata.das1565@...> Date: 2018-04-11T04:21:14Z CheckpointFileManager
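The `createAtomic` contract described in this PR (call `close()` to publish the file, `cancel()` to abandon it) can be illustrated with a minimal sketch for the local file system. This is not Spark's `CheckpointFileManager` API: `CancellableOutputStream`, `LocalAtomicFiles`, and `CheckpointWriter` are hypothetical names, and the sketch assumes java.nio's `ATOMIC_MOVE` works for a rename within one directory, which holds on common local file systems but is not guaranteed everywhere.

```scala
import java.io.{FilterOutputStream, OutputStream}
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardCopyOption}
import scala.util.control.NonFatal

// Hypothetical analogue of a cancellable stream: close() publishes, cancel() discards.
abstract class CancellableOutputStream(underlying: OutputStream)
    extends FilterOutputStream(underlying) {
  def cancel(): Unit
}

// Hypothetical local-disk manager using write-to-temp-file-and-rename (not Spark's API).
object LocalAtomicFiles {
  def createAtomic(path: Path, overwrite: Boolean): CancellableOutputStream = {
    // Check-then-act: racy under concurrent writers, acceptable for a sketch.
    if (!overwrite && Files.exists(path)) throw new FileAlreadyExistsException(path.toString)
    // Keep the temp file in the target directory so the rename stays on one file system.
    val dir = path.toAbsolutePath.getParent
    val tmp = Files.createTempFile(dir, s".${path.getFileName}", ".tmp")
    val raw = Files.newOutputStream(tmp)
    new CancellableOutputStream(raw) {
      private var finished = false

      override def close(): Unit = if (!finished) {
        finished = true
        super.close() // flush and close the temp file
        // Commit point: `path` only ever holds a complete file.
        Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE)
      }

      override def cancel(): Unit = if (!finished) {
        finished = true
        raw.close()
        Files.deleteIfExists(tmp) // discard the partial data; `path` is never touched
      }
    }
  }
}

// Usage pattern from the PR description: close() on success, cancel() on any error.
object CheckpointWriter {
  def write(path: Path, bytes: Array[Byte]): Unit = {
    val out = LocalAtomicFiles.createAtomic(path, overwrite = true)
    try {
      out.write(bytes)
      out.close()
    } catch {
      case NonFatal(e) =>
        out.cancel()
        throw e
    }
  }
}
```

Because the target path changes only at the final rename, a reader never sees a half-written file, and `cancel()` reduces to deleting the temp file. As the PR notes for S3, a store that hides files until they are closed could satisfy the same contract by writing to the final path directly.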
[GitHub] spark pull request #20983: [SPARK-23747][Structured Streaming] Add EpochCoor...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20983#discussion_r180553748 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming.continuous + +import org.mockito.InOrder +import org.mockito.Matchers.{any, eq => eqTo} +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset} +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.test.SharedSparkSession + +class EpochCoordinatorSuite + extends SparkFunSuite +with SharedSparkSession +with MockitoSugar +with BeforeAndAfterEach { + + private var epochCoordinator: RpcEndpointRef = _ + + private var writer: StreamWriter = _ + private var query: ContinuousExecution = _ + private var orderVerifier: InOrder = _ + + override def beforeEach(): Unit = { +val reader = mock[ContinuousReader] +writer = mock[StreamWriter] +query = mock[ContinuousExecution] +orderVerifier = inOrder(writer, query) + +epochCoordinator + = EpochCoordinatorRef.create(writer, reader, query, "test", 1, spark, SparkEnv.get) + } + + override def afterEach(): Unit = { +SparkEnv.get.rpcEnv.stop(epochCoordinator) + } + + test("single epoch") { +setWriterPartitions(3) +setReaderPartitions(2) + +commitPartitionEpoch(0, 1) +commitPartitionEpoch(1, 1) +commitPartitionEpoch(2, 1) +reportPartitionOffset(0, 1) +reportPartitionOffset(1, 1) + +// Here and in subsequent tests this is called to make a synchronous call to EpochCoordinator +// so that mocks would have been acted upon by the time verification happens +makeSynchronousCall() + +verifyCommit(1) + } + + test("single epoch, all but one writer partition has committed") { +setWriterPartitions(3) +setReaderPartitions(2) + +commitPartitionEpoch(0, 1) +commitPartitionEpoch(1, 1) +reportPartitionOffset(0, 1) 
+reportPartitionOffset(1, 1) + +makeSynchronousCall() + +verifyCommitHasntHappened(1) + } + + test("single epoch, all but one reader partition has reported an offset") { +setWriterPartitions(3) +setReaderPartitions(2) + +commitPartitionEpoch(0, 1) +commitPartitionEpoch(1, 1) +commitPartitionEpoch(2, 1) +reportPartitionOffset(0, 1) + +makeSynchronousCall() + +verifyCommitHasntHappened(1) --- End diff -- +1
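The three quoted tests pin down when an epoch may commit: only after all 3 writer partitions have called `commitPartitionEpoch` and both reader partitions have called `reportPartitionOffset`. Below is a minimal, Spark-free sketch of that rule. `EpochTracker` is a hypothetical name, and the sketch deliberately leaves out RPC, the mocked `StreamWriter`/`ContinuousExecution`, and any cross-epoch ordering (the suite's `InOrder` verifier suggests that ordering is checked elsewhere).

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of the commit rule the quoted tests exercise.
final class EpochTracker(numWriterPartitions: Int, numReaderPartitions: Int) {
  private val writerCommits = mutable.Map.empty[Long, mutable.Set[Int]]
  private val readerOffsets = mutable.Map.empty[Long, mutable.Set[Int]]
  private val committed = mutable.Set.empty[Long]

  def commitPartitionEpoch(partition: Int, epoch: Long): Unit = {
    writerCommits.getOrElseUpdate(epoch, mutable.Set.empty[Int]) += partition
    maybeCommit(epoch)
  }

  def reportPartitionOffset(partition: Int, epoch: Long): Unit = {
    readerOffsets.getOrElseUpdate(epoch, mutable.Set.empty[Int]) += partition
    maybeCommit(epoch)
  }

  def isCommitted(epoch: Long): Boolean = committed.contains(epoch)

  // An epoch commits only when every writer partition has committed AND every reader
  // partition has reported an offset for it; a gap on either side blocks the commit.
  private def maybeCommit(epoch: Long): Unit = {
    val allWritersDone = writerCommits.get(epoch).exists(_.size == numWriterPartitions)
    val allReadersDone = readerOffsets.get(epoch).exists(_.size == numReaderPartitions)
    if (allWritersDone && allReadersDone) committed += epoch
  }
}
```

The asynchrony that `makeSynchronousCall()` works around in the real suite does not arise here, because every call updates the tracker synchronously.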
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179596647 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* --- End diff -- Let's make all the memory streams consistently named (like Kafka has KafkaMicroBatchReader and KafkaContinuousReader)?
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179603916 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. + */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) --- End diff -- nit: I generally put such small-use, generically named classes inside a relevant object to avoid cluttering the general class namespace. For example, this can be inside an object ContinuousMemoryStreamRecordBuffer, as that is the only class using it.
Then someone searching for a different GetRecord-like class would be less confused.
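Here is a minimal sketch of the suggested scoping, with the RPC machinery removed so it runs standalone. `RecordBufferSketch` is a hypothetical, simplified stand-in for `ContinuousMemoryStreamRecordBuffer`, and its `GetRecord` takes plain `(partition, index)` integers instead of a `ContinuousMemoryStreamPartitionOffset`.

```scala
// Hypothetical names; the RPC endpoint is replaced by a plain method.
object RecordBufferSketch {
  // Only RecordBufferSketch handles this message, so it is scoped here
  // instead of at package level.
  final case class GetRecord(partition: Int, index: Int)
}

final class RecordBufferSketch[A](partitions: IndexedSeq[IndexedSeq[A]]) {
  import RecordBufferSketch.GetRecord

  // Mirrors the quoted receiveAndReply: Some(record) if present, None if not yet written.
  def receiveAndReply(msg: Any): Option[A] = msg match {
    case GetRecord(part, index) => partitions(part).lift(index)
    case other => throw new IllegalArgumentException(s"Unexpected message: $other")
  }
}
```

Call sites outside the class then write `RecordBufferSketch.GetRecord(...)`, so the owner of the message is visible wherever it is used.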
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179594450 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -43,8 +45,39 @@ object MemoryStream { protected val currentBlockId = new AtomicInteger(0) protected val memoryStreamId = new AtomicInteger(0) - def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] = -new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext) + def apply[A : Encoder]( + implicit sqlContext: SQLContext, + trigger: Trigger = Trigger.ProcessingTime(0)): MemoryStreamBase[A] = trigger match { +case _: ContinuousTrigger => + new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext) +case _ => + new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext) + } +} + +abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends BaseStreamingSource { --- End diff -- Add docs.
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179603105 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified --- End diff -- nit: to poll from what? (and also clarify what is at the executor and what is at the driver)
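Judging from the quoted diff, the per-partition record lists and the `ContinuousMemoryStreamRecordBuffer` endpoint live wherever the `ContinuousMemoryStream` object was created (presumably the driver). The `ContinuousMemoryStreamDataReader` instances run inside tasks, presumably on executors, and poll that endpoint over RPC. The sketch below models only the executor side. `PollingPartitionReader` is a hypothetical name, and the `fetch` function is an assumed stand-in for the RPC ask; it is not the PR's reader.

```scala
// Hypothetical executor-side reader. `fetch(partition, index)` stands in for an RPC
// ask to the driver-side buffer and returns None while that index has no record yet.
final class PollingPartitionReader[A](
    partition: Int,
    startIndex: Int,
    fetch: (Int, Int) => Option[A],
    pollIntervalMs: Long = 10L) {
  private var nextIndex = startIndex
  private var current: Option[A] = None

  // Blocks until the record at `nextIndex` exists, then advances past it.
  def next(): Boolean = {
    var record = fetch(partition, nextIndex)
    while (record.isEmpty) {
      Thread.sleep(pollIntervalMs)
      record = fetch(partition, nextIndex)
    }
    current = record
    nextIndex += 1
    true
  }

  def get(): A = current.getOrElse(throw new NoSuchElementException("next() not called"))

  // Index of the next record to fetch; this is what a partition offset would record.
  def offset: Int = nextIndex
}
```

In this shape, the driver owns the data and answers point lookups, while each executor-side reader owns only its position (`offset`) within one partition.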
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179604298 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. 
+ */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( +stream: ContinuousMemoryStream[A], +partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => stream.synchronized { + val buf = partitionBuffers(part) + + val record = +if (buf.size <= index) { + None +} else { + Some(buf(index)) +} + context.reply(record.map(Row(_))) +} + } +} + +class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) +extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport { + private implicit val formats = Serialization.formats(NoTypeHints) + val NUM_PARTITIONS = 2 + + // ContinuousReader implementation + + @GuardedBy("this") + private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A]) + + private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, records) + + def addData(data: TraversableOnce[A]): Offset = synchronized { +// Distribute data evenly among partition lists. +data.toSeq.zipWithIndex.map { + case (item, index) => records(index % NUM_PARTITIONS) += item +} + +// The new target offset is the offset where all records in all partitions have been processed. +ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, records(i).size)).toMap) + } + + private var startOffset: ContinuousMemoryStreamOffset = _ + + override def setStartOffset(start: Optional[Offset]): Unit = synch
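The quoted `addData()` and `GetRecord` handler can be modeled without Spark to show what offsets mean here. In the hypothetical `PartitionedBuffer` below, a partition's offset is the number of records buffered in it, and a lookup past the end yields `None`. Note that the quoted code replies with an `Option` (`record.map(Row(_))`), even though the class comment says it returns null.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Spark-free model of the quoted addData()/GetRecord logic.
final class PartitionedBuffer[A](numPartitions: Int) {
  private val records = Seq.fill(numPartitions)(new ListBuffer[A])

  // Round-robins one batch across partitions and returns partition -> records buffered so far,
  // the same shape as the map the quoted ContinuousMemoryStreamOffset is built from.
  def addData(data: Iterable[A]): Map[Int, Int] = synchronized {
    data.zipWithIndex.foreach { case (item, i) => records(i % numPartitions) += item }
    (0 until numPartitions).map(p => p -> records(p).size).toMap
  }

  // Some(record) if `index` exists in `partition`, None otherwise.
  def get(partition: Int, index: Int): Option[A] = synchronized {
    records(partition).lift(index)
  }
}
```

Because the index restarts at 0 on every call, partition 0 receives the extra record of any odd-sized batch, which is why the quoted comment says "evenly-ish". The sketch uses `foreach` where the quoted code calls `.map` only for its side effect.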
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179602477 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** --- End diff -- I would keep the main class (the MemoryStreamContinuousReader) at the top and make this comment part of that class's Scaladoc. Also add docs on each of the classes to explain their purpose. Right now it takes me a lot of scrolling back and forth to understand, even though I roughly know what the organization should be.
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179601495 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* --- End diff -- Also, all the sources are in streaming/sources/ (or should be), not in streaming/continuous.
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179617487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. 
+ */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( +stream: ContinuousMemoryStream[A], +partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => stream.synchronized { + val buf = partitionBuffers(part) + + val record = +if (buf.size <= index) { + None +} else { + Some(buf(index)) +} + context.reply(record.map(Row(_))) +} + } +} + +class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) +extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport { + private implicit val formats = Serialization.formats(NoTypeHints) + val NUM_PARTITIONS = 2 + + // ContinuousReader implementation + + @GuardedBy("this") + private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A]) + + private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, records) + + def addData(data: TraversableOnce[A]): Offset = synchronized { +// Distribute data evenly among partition lists. +data.toSeq.zipWithIndex.map { + case (item, index) => records(index % NUM_PARTITIONS) += item +} + +// The new target offset is the offset where all records in all partitions have been processed. +ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, records(i).size)).toMap) + } + + private var startOffset: ContinuousMemoryStreamOffset = _ + + override def setStartOffset(start: Optional[Offset]): Unit = synch
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179617590 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. 
+ */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( +stream: ContinuousMemoryStream[A], +partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => stream.synchronized { + val buf = partitionBuffers(part) + + val record = +if (buf.size <= index) { + None +} else { + Some(buf(index)) +} + context.reply(record.map(Row(_))) +} + } +} + +class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) +extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport { + private implicit val formats = Serialization.formats(NoTypeHints) + val NUM_PARTITIONS = 2 + + // ContinuousReader implementation + + @GuardedBy("this") + private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A]) + + private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, records) + + def addData(data: TraversableOnce[A]): Offset = synchronized { +// Distribute data evenly among partition lists. +data.toSeq.zipWithIndex.map { + case (item, index) => records(index % NUM_PARTITIONS) += item +} + +// The new target offset is the offset where all records in all partitions have been processed. +ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, records(i).size)).toMap) + } + + private var startOffset: ContinuousMemoryStreamOffset = _ + + override def setStartOffset(start: Optional[Offset]): Unit = synch
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179599245 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -53,32 +53,24 @@ class ContinuousSuiteBase extends StreamTest { // A continuous trigger that will only fire the initial time for the duration of a test. // This allows clean testing with manual epoch advancement. protected val longContinuousTrigger = Trigger.Continuous("1 hour") + + override protected implicit val defaultTrigger = Trigger.Continuous(100) + override protected val defaultUseV2Sink = true } class ContinuousSuite extends ContinuousSuiteBase { import testImplicits._ - test("basic rate source") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) + test("basic") { +val input = MemoryStream[Int] --- End diff -- Looking at this test code, it's very confusing that this is using a continuous memory stream and not an ordinary memory stream. The implicit ContinuousTrigger magic is not intuitive. I would rather have an explicit `ContinuousMemoryStream` than a `MemoryStream` that magically generates two different kinds of streams based on different implicit values. And because of this polymorphism, multiple test suites that do not have continuous stream tests had to be changed (functions returning MemoryStream had to return MemoryStreamBase). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
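The objection above contrasts an implicit-driven factory with an explicit constructor. A minimal sketch of the difference follows. Every type here (`SketchTrigger`, `SketchStreamBase`, `SketchContinuousStream`, ...) is hypothetical and deliberately simplified. None of them are Spark's real test classes.

```scala
// Hypothetical, simplified types; these are not Spark's test classes.
sealed trait SketchTrigger
case object MicroBatchTrigger extends SketchTrigger
case object ContinuousTrigger extends SketchTrigger

trait SketchStreamBase[A] { def addData(data: A*): Unit }
class SketchMicroBatchStream[A] extends SketchStreamBase[A] { def addData(data: A*): Unit = () }
class SketchContinuousStream[A] extends SketchStreamBase[A] { def addData(data: A*): Unit = () }

// Implicit style: which concrete stream you get depends on the trigger in implicit scope,
// and the call site only sees the common base type.
def memoryStream[A](implicit trigger: SketchTrigger): SketchStreamBase[A] = trigger match {
  case ContinuousTrigger => new SketchContinuousStream[A]
  case MicroBatchTrigger => new SketchMicroBatchStream[A]
}

// Explicit style: the stream kind is visible at the call site and in the static type.
def continuousMemoryStream[A]: SketchContinuousStream[A] = new SketchContinuousStream[A]
```

With the implicit style, every helper that returns a stream must widen to the base type. This is the ripple into unrelated suites that the comment describes. With the explicit style, only continuous tests mention the continuous type.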
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179617149 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. + */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( --- End diff -- This is not really a buffer, as it only refers to buffers that are externally managed. It only serves as an endpoint to fetch stuff from the buffers.
I would prefer one way or the other: either all the buffer management and endpoint management should be inside this class (adding and fetching, synchronized by this class), OR this should be an inner class of ContinuousMemoryStream whose only purpose is to be an endpoint relaying fetch requests.
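The second option in this comment puts the buffers and the lock in the outer stream and uses a stateless inner class that only relays fetches. That structure can be sketched in plain Scala. This is a minimal sketch that assumes no RPC layer. `SketchStream` and `Endpoint` are hypothetical names. In Spark, the inner class would be the `ThreadSafeRpcEndpoint` answering `GetRecord`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of the "inner class" option. The outer stream owns the
// per-partition buffers and the lock; the inner Endpoint holds no state and only relays
// fetch requests (in Spark it would be the RPC endpoint answering GetRecord).
final class SketchStream[A](numPartitions: Int) { outer =>
  private val records: Vector[ListBuffer[A]] = Vector.fill(numPartitions)(new ListBuffer[A])

  def addData(data: Seq[A]): Unit = synchronized {
    data.zipWithIndex.foreach { case (item, i) => records(i % numPartitions) += item }
  }

  final class Endpoint {
    // Locks the outer stream, so reads cannot interleave with addData().
    def getRecord(partition: Int, index: Int): Option[A] = outer.synchronized {
      records(partition).lift(index)
    }
  }

  val endpoint: Endpoint = new Endpoint
}
```

Because `records` is private to the outer class, nothing else can mutate the buffers behind the lock. The diff avoids that risk only by convention, since it passes both `this` and `records` into a separate class.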
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179605302 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. 
+ */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( +stream: ContinuousMemoryStream[A], +partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => stream.synchronized { + val buf = partitionBuffers(part) + + val record = +if (buf.size <= index) { --- End diff -- super nit: this can be in a single line.
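The nit is easy to see side by side. The sketch below shows the multi-line `if` from the diff, the single-line form the review asks for, and `Seq#lift` as an alternative that also returns `None` past the end. The function names are hypothetical. For non-negative offsets, which is what `GetRecord` carries, the three forms agree. A negative index would throw in the first two but yield `None` from `lift`.

```scala
import scala.collection.mutable.ListBuffer

// Multi-line form, as in the quoted diff.
def recordVerbose[A](buf: ListBuffer[A], index: Int): Option[A] = {
  val record =
    if (buf.size <= index) {
      None
    } else {
      Some(buf(index))
    }
  record
}

// Single-line form suggested in the review.
def recordOneLine[A](buf: ListBuffer[A], index: Int): Option[A] =
  if (buf.size <= index) None else Some(buf(index))

// Alternative: Seq#lift returns None for an index outside the buffer.
def recordLift[A](buf: ListBuffer[A], index: Int): Option[A] = buf.lift(index)
```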
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179618466 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. 
+ */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( +stream: ContinuousMemoryStream[A], +partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => stream.synchronized { + val buf = partitionBuffers(part) + + val record = +if (buf.size <= index) { + None +} else { + Some(buf(index)) +} + context.reply(record.map(Row(_))) +} + } +} + +class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) +extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport { + private implicit val formats = Serialization.formats(NoTypeHints) + val NUM_PARTITIONS = 2 + + // ContinuousReader implementation + + @GuardedBy("this") + private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A]) + + private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, records) + + def addData(data: TraversableOnce[A]): Offset = synchronized { +// Distribute data evenly among partition lists. +data.toSeq.zipWithIndex.map { + case (item, index) => records(index % NUM_PARTITIONS) += item +} + +// The new target offset is the offset where all records in all partitions have been processed. +ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, records(i).size)).toMap) + } + + private var startOffset: ContinuousMemoryStreamOffset = _ + + override def setStartOffset(start: Optional[Offset]): Unit = synch
[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20828#discussion_r179606525 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.{util => ju} +import java.util.Optional +import java.util.concurrent.ArrayBlockingQueue +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext} +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.RpcUtils + +/** + * The overall strategy here is: + * * ContinuousMemoryStream maintains a list of records for each partition. addData() will + *distribute records evenly-ish across partitions. + * * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for partition-level + *ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified + *offset within the list, or null if that offset doesn't yet have a record. 
+ */ + +private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset) + +private class ContinuousMemoryStreamRecordBuffer[A]( +stream: ContinuousMemoryStream[A], +partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => stream.synchronized { + val buf = partitionBuffers(part) + + val record = +if (buf.size <= index) { + None +} else { + Some(buf(index)) +} + context.reply(record.map(Row(_))) +} + } +} + +class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) +extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport { + private implicit val formats = Serialization.formats(NoTypeHints) + val NUM_PARTITIONS = 2 + + // ContinuousReader implementation + + @GuardedBy("this") + private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A]) + + private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, records) + + def addData(data: TraversableOnce[A]): Offset = synchronized { +// Distribute data evenly among partition lists. +data.toSeq.zipWithIndex.map { + case (item, index) => records(index % NUM_PARTITIONS) += item +} + +// The new target offset is the offset where all records in all partitions have been processed. +ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, records(i).size)).toMap) + } + + private var startOffset: ContinuousMemoryStreamOffset = _ + + override def setStartOffset(start: Optional[Offset]): Unit = synch
spark git commit: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Repository: spark Updated Branches: refs/heads/master 7cf9fab33 -> 66a3a5a2d [SPARK-23099][SS] Migrate foreach sink to DataSourceV2 ## What changes were proposed in this pull request? Migrate foreach sink to DataSourceV2. Since the previous attempt at this PR #20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works. ## How was this patch tested? existing tests Author: Jose Torres Closes #20951 from jose-torres/foreach. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66a3a5a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66a3a5a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66a3a5a2 Branch: refs/heads/master Commit: 66a3a5a2dc83e03dedcee9839415c1ddc1fb8125 Parents: 7cf9fab Author: Jose Torres Authored: Tue Apr 3 11:05:29 2018 -0700 Committer: Tathagata Das Committed: Tue Apr 3 11:05:29 2018 -0700 -- .../sql/execution/streaming/ForeachSink.scala | 68 - .../sources/ForeachWriterProvider.scala | 111 +++ .../spark/sql/streaming/DataStreamWriter.scala | 4 +- .../execution/streaming/ForeachSinkSuite.scala | 305 -- .../streaming/sources/ForeachWriterSuite.scala | 306 +++ .../sql/streaming/StreamingQuerySuite.scala | 1 + 6 files changed, 420 insertions(+), 375 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/66a3a5a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala deleted file mode 100644 index 2cc5410..000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} -import org.apache.spark.sql.catalyst.encoders.encoderFor - -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { - - override def addBatch(batchId: Long, data: DataFrame): Unit = { -// This logic should've been as simple as: -// ``` -// data.as[T].foreachPartition { iter => ... } -// ``` -// -// Unfortunately, doing that would just break the incremental planing. The reason is, -// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will -// create a new plan. Because StreamExecution uses the existing plan to collect metrics and -// update watermark, we should never create a new plan. Otherwise, metrics and watermark are -// updated in the new plan, and StreamExecution cannot retrieval them. -// -// Hence, we need to manually convert internal rows to objects using encoder. 
-val encoder = encoderFor[T].resolveAndBind( - data.logicalPlan.output, - data.sparkSession.sessionState.analyzer) -data.queryExecution.toRdd.foreachPartition { iter => - if (writer.open(TaskContext.getPartitionId(), batchId)) { -try { - while (iter.hasNext) { -writer.process(encoder.fromRow(iter.next())) - } -} catch { - case e: Throwable => -writer.close(e) -throw e -} -writer.close(null) - } else { -writer.close(null) - } -} - } - -
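The deleted `addBatch` body above drives a `ForeachWriter` through its open/process/close contract for each partition. The loop can be modelled without Spark. This is a minimal sketch under stated assumptions. `SketchForeachWriter` is a hypothetical trait that only mirrors the three callbacks. It is not `org.apache.spark.sql.ForeachWriter`. `runPartition` is likewise a hypothetical name.

```scala
// Hypothetical minimal mirror of the ForeachWriter callbacks used above;
// not the real org.apache.spark.sql.ForeachWriter class.
trait SketchForeachWriter[T] {
  def open(partitionId: Long, version: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Per-partition loop modelled on the deleted addBatch body: process rows only if open()
// returns true, close with the error (then rethrow) on failure, otherwise close with null.
def runPartition[T](writer: SketchForeachWriter[T], partitionId: Long, batchId: Long,
                    rows: Iterator[T]): Unit = {
  if (writer.open(partitionId, batchId)) {
    try {
      while (rows.hasNext) writer.process(rows.next())
    } catch {
      case e: Throwable =>
        writer.close(e)
        throw e
    }
    writer.close(null)
  } else {
    writer.close(null)
  }
}
```

In this model, `close` is called exactly once per partition whether or not `open` accepted it. A failure reaches the writer through `close(e)` before it propagates to the caller.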
[GitHub] spark issue #20951: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20951 LGTM.
[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20951#discussion_r178648761 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -141,7 +141,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf query.processAllAvailable() } assert(e.getCause.isInstanceOf[SparkException]) - assert(e.getCause.getCause.getMessage === "error") + assert(e.getCause.getCause.getCause.getMessage === "ForeachSinkSuite error") --- End diff -- Why 3 levels? Can you paste the levels?
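A small helper can answer "paste the levels" by printing every link in an exception's cause chain. This is a minimal sketch using only `Throwable.getCause`. `causeChain` and `describeChain` are hypothetical names. It does not claim which wrappers the real query produces. The exceptions built in the test are made up for illustration.

```scala
import scala.annotation.tailrec

// Collects an exception and all of its nested causes, outermost first.
// The `eq` check stops on a self-referencing or cyclic cause chain.
def causeChain(t: Throwable): List[Throwable] = {
  @tailrec
  def loop(cur: Throwable, acc: List[Throwable]): List[Throwable] =
    if (cur == null || acc.exists(_ eq cur)) acc.reverse
    else loop(cur.getCause, cur :: acc)
  loop(t, Nil)
}

// One line per level: index, exception class and message.
def describeChain(t: Throwable): String =
  causeChain(t).zipWithIndex
    .map { case (e, i) => s"level $i: ${e.getClass.getName}: ${e.getMessage}" }
    .mkString("\n")
```

Calling `println(describeChain(e))` in the test would show exactly which wrapper sits at each `getCause` hop.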
[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20951#discussion_r178648616 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -131,7 +131,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf .foreach(new TestForeachWriter() { --- End diff -- And move this to the streaming.sources package, similar to ConsoleWriterSuite.
[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20951#discussion_r178648434 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala --- @@ -131,7 +131,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf .foreach(new TestForeachWriter() { --- End diff -- Maybe rename this to `ForeachWriterSuite`?
[GitHub] spark issue #20958: [SPARK-23844][SS] Fix socket source honors recovered off...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20958 We have made it clear that the socket source is ONLY for testing and will not recover data from checkpoints. So I see no problem with it throwing errors when attempting to recover. Maybe we can improve the error message by making it clear that recovery is not supported. If you indeed want to forget lost data and proceed, then that should be an opt-in. We could do this by explicitly setting a source option (like failOnDataLoss = false in the Kafka source).
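The opt-in suggested here can be sketched as a small policy object. By default it fails with an explicit "recovery is not supported" message. It proceeds only when the user sets a flag. This is a hypothetical sketch. The option name `failOnDataLoss` is borrowed from the Kafka source as the comment suggests, but the socket source does not define it. `SocketRecoveryPolicy` and `onRecover` are invented names.

```scala
// Hypothetical option handling; the socket source does not actually define
// "failOnDataLoss" (the name is borrowed from the Kafka source).
final case class SocketRecoveryPolicy(failOnDataLoss: Boolean)

object SocketRecoveryPolicy {
  // Defaults to failing, so discarding data must be an explicit opt-in.
  def fromOptions(options: Map[String, String]): SocketRecoveryPolicy =
    SocketRecoveryPolicy(options.get("failOnDataLoss").map(_.trim.toBoolean).getOrElse(true))
}

// Called when restarting from a checkpoint that recorded an offset for this source.
def onRecover(policy: SocketRecoveryPolicy, recoveredOffset: Long): Long = {
  if (policy.failOnDataLoss) {
    throw new UnsupportedOperationException(
      s"The socket source does not support recovery from checkpoints (recovered offset " +
        s"$recoveredOffset). Set failOnDataLoss=false to discard lost data and continue.")
  }
  0L // start fresh, accepting that data received before the restart is gone
}
```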
[GitHub] spark pull request #20958: [SPARK-23844][SS] Fix socket source honors recove...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20958#discussion_r178646725 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -238,6 +238,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { "write files of Hive data source directly.") } +val isSocketExists = df.queryExecution.analyzed.collect { --- End diff -- I see what you are trying to do. But, honestly, we should NOT add any more special cases for specific sources. We already have special cases for memory and foreach because it is hard to get rid of those. We should not add more.
[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20951#discussion_r178645775 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,81 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} +import org.apache.spark.sql.{Encoder, ForeachWriter, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { --- End diff -- Actually, why not make it extend DataSourceV2 for consistency's sake? Then it is easier to find all data sources in the code by looking at what extends DataSourceV2.
[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20951#discussion_r178645279 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala --- @@ -17,52 +17,81 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter} +import org.apache.spark.sql.{Encoder, ForeachWriter, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType -/** - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by - * [[ForeachWriter]]. - * - * @param writer The [[ForeachWriter]] to process all data. - * @tparam T The expected type of the sink. - */ -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable { +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport { --- End diff -- Rename the file accordingly, and add docs.
spark git commit: [SPARK-23827][SS] StreamingJoinExec should ensure that input data is partitioned into specific number of partitions
Repository: spark Updated Branches: refs/heads/branch-2.3 3f5955aa0 -> 507cff246 [SPARK-23827][SS] StreamingJoinExec should ensure that input data is partitioned into specific number of partitions ## What changes were proposed in this pull request? Currently, the requiredChildDistribution does not specify the partitions. This can cause the weird corner cases where the child's distribution is `SinglePartition` which satisfies the required distribution of `ClusterDistribution(no-num-partition-requirement)`, thus eliminating the shuffle needed to repartition input data into the required number of partitions (i.e. same as state stores). That can lead to "file not found" errors on the state store delta files as the micro-batch-with-no-shuffle will not run certain tasks and therefore not generate the expected state store delta files. This PR adds the required constraint on the number of partitions. ## How was this patch tested? Modified test harness to always check that ANY stateful operator should have a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #20941 from tdas/SPARK-23827. 
(cherry picked from commit 15298b99ac8944e781328423289586176cf824d7) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/507cff24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/507cff24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/507cff24 Branch: refs/heads/branch-2.3 Commit: 507cff246cd9e15a418d67b66bf762be4ae71c67 Parents: 3f5955a Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Fri Mar 30 16:48:26 2018 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Fri Mar 30 16:48:55 2018 -0700 -- .../streaming/IncrementalExecution.scala| 2 +- .../StreamingSymmetricHashJoinExec.scala| 3 +- .../spark/sql/streaming/DeduplicateSuite.scala | 8 +--- .../streaming/FlatMapGroupsWithStateSuite.scala | 5 +- .../sql/streaming/StatefulOperatorTest.scala| 49 .../apache/spark/sql/streaming/StreamTest.scala | 19 .../streaming/StreamingAggregationSuite.scala | 4 +- 7 files changed, 25 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/507cff24/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index a10ed5f..1a83c88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -62,7 +62,7 @@ class IncrementalExecution( StreamingDeduplicationStrategy :: Nil } - private val numStateStores = offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key) + private[sql] val numStateStores = offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key) .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter) 
.getOrElse(sparkSession.sessionState.conf.numShufflePartitions) http://git-wip-us.apache.org/repos/asf/spark/blob/507cff24/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala index c351f65..fa7c8ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala @@ -167,7 +167,8 @@ case class StreamingSymmetricHashJoinExec( val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) override def requiredChildDistribution: Seq[Distribution] = -ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil +ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) :: + ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil override def output: Seq[Attribute] = joinType match { case _: InnerLike => left.output ++ right.output http://git-wip-us.apache.org/repos/asf/spark/blob/507
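The core of this fix is that a clustered distribution with no partition-count requirement is trivially satisfied by a single-partition child, so no shuffle is planned. The following is a minimal, self-contained Scala sketch of that check. `Partitioning`, `SinglePartition`, `HashPartitioning`, and `ClusteredDistribution` here are hypothetical, simplified stand-ins for Spark's physical-plan classes, not the real API.

```scala
// Hypothetical, simplified model of Spark's Partitioning/Distribution contract;
// the real classes live in org.apache.spark.sql.catalyst.plans.physical.
sealed trait Partitioning { def numPartitions: Int }
case object SinglePartition extends Partitioning { val numPartitions: Int = 1 }
final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

// `requiredNumPartitions = None` means "any number of partitions is acceptable".
final case class ClusteredDistribution(keys: Seq[String], requiredNumPartitions: Option[Int] = None)

object DistributionSketch {
  // True when the child's existing partitioning already satisfies the
  // requirement, i.e. the planner would NOT insert a shuffle.
  def satisfies(p: Partitioning, d: ClusteredDistribution): Boolean = {
    val countOk = d.requiredNumPartitions.forall(_ == p.numPartitions)
    val clusteringOk = p match {
      case SinglePartition           => true // every row is already co-located
      case HashPartitioning(keys, _) => keys.forall(d.keys.contains)
    }
    countOk && clusteringOk
  }

  def main(args: Array[String]): Unit = {
    val numStateStores = 5
    // Before the fix: no count requirement, so the shuffle is skipped.
    println(satisfies(SinglePartition, ClusteredDistribution(Seq("key"))))
    // After the fix: the count must match the number of state store partitions.
    println(satisfies(SinglePartition, ClusteredDistribution(Seq("key"), Some(numStateStores))))
  }
}
```

Running `main` prints `true` and then `false`: once the partition count is pinned to the number of state stores, the single-partition child no longer satisfies the requirement, so the planner has to insert the shuffle.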
[GitHub] spark issue #20941: [SPARK-23827] [SS] StreamingJoinExec should ensure that ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20941 Started more tests to test for flakiness.
[GitHub] spark issue #20941: [SPARK-23827] [SS] StreamingJoinExec should ensure that ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20941 @brkyvz @zsxwing can one of you take a look?
[GitHub] spark pull request #20941: Spark 23827
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20941 Spark 23827 ## What changes were proposed in this pull request? Currently, the requiredChildDistribution does not specify the number of partitions. This can cause weird corner cases where the child's distribution is `SinglePartition`, which satisfies the required distribution of `ClusterDistribution(no-num-partition-requirement)`, thus eliminating the shuffle needed to repartition the input data into the required number of partitions (i.e., the same number as the state stores). That can lead to "file not found" errors on the state store delta files, because the micro-batch without the shuffle will not run certain tasks and therefore will not generate the expected state store delta files. This PR adds the required constraint on the number of partitions. ## How was this patch tested? Modified the test harness to always check that ANY stateful operator has a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23827 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20941.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20941 commit 02cc5509455d3f9d6d683a46fe4a50fcde8da348 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-29T02:38:59Z Fixed join issue commit 7046fbd5244e5d3adb75b7d090d57f1adc8b9859 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-29T22:32:55Z Fix compilation commit c162f8def7f7f57b9e8b954a5fe2f96368b5ed2f Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-29T23:22:53Z Removed unnecessary tests
spark git commit: [SPARK-23096][SS] Migrate rate source to V2
Repository: spark Updated Branches: refs/heads/master 35997b59f -> c68ec4e6a [SPARK-23096][SS] Migrate rate source to V2 ## What changes were proposed in this pull request? This PR migrates the micro-batch rate source to the V2 API and rewrites the UTs to suit the V2 tests. ## How was this patch tested? UTs. Author: jerryshao Closes #20688 from jerryshao/SPARK-23096. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c68ec4e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c68ec4e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c68ec4e6 Branch: refs/heads/master Commit: c68ec4e6a1ed9ea13345c7705ea60ff4df7aec7b Parents: 35997b5 Author: jerryshao Authored: Tue Mar 27 14:39:05 2018 -0700 Committer: Tathagata Das Committed: Tue Mar 27 14:39:05 2018 -0700 -- apache.spark.sql.sources.DataSourceRegister | 3 +- .../sql/execution/datasources/DataSource.scala | 6 +- .../streaming/RateSourceProvider.scala | 262 -- .../continuous/ContinuousRateStreamSource.scala | 25 +- .../sources/RateStreamMicroBatchReader.scala| 222 .../streaming/sources/RateStreamProvider.scala | 125 +++ .../streaming/sources/RateStreamSourceV2.scala | 187 -- .../execution/streaming/RateSourceSuite.scala | 194 --- .../execution/streaming/RateSourceV2Suite.scala | 191 -- .../sources/RateStreamProviderSuite.scala | 344 +++ 10 files changed, 715 insertions(+), 844 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c68ec4e6/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister -- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 1fe9c09..1b37905 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider -org.apache.spark.sql.execution.streaming.RateSourceProvider +org.apache.spark.sql.execution.streaming.sources.RateStreamProvider org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider -org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2 http://git-wip-us.apache.org/repos/asf/spark/blob/c68ec4e6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 31fa89b..b84ea76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider +import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode @@ -566,6 +566,7 @@ object DataSource extends Logging { val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" val nativeOrc = classOf[OrcFileFormat].getCanonicalName val socket = 
classOf[TextSocketSourceProvider].getCanonicalName +val rate = classOf[RateStreamProvider].getCanonicalName Map( "org.apache.spark.sql.jdbc" -> jdbc, @@ -587,7 +588,8 @@ object DataSource extends Logging { "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, "org.apache.spark.ml.source.libsvm" -> libsvm, "com.databricks.spark.csv" -> csv, - "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket + "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket, +
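The `DataSource` change keeps legacy provider class names working by redirecting them to the current implementation, while short names such as `rate` are matched against registered `DataSourceRegister` services. A minimal sketch of that two-step lookup, assuming the map is consulted with a `getOrElse` fallback before the short-name match. Only the socket alias and the two current class names come from the diff; `ProviderLookupSketch`, `resolve`, and the in-memory short-name registry are hypothetical.

```scala
object ProviderLookupSketch {
  // Current provider class names (taken from the diff above).
  val socket = "org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider"
  val rate   = "org.apache.spark.sql.execution.streaming.sources.RateStreamProvider"

  // Hypothetical excerpt of a backward-compatibility map: a legacy class name a
  // user may still pass to .format(...) is redirected to the current provider.
  private val backwardCompatibilityMap: Map[String, String] = Map(
    "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
  )

  // Hypothetical stand-in for DataSourceRegister services discovered through
  // META-INF/services: short name -> provider class name.
  private val registeredShortNames: Map[String, String] =
    Map("rate" -> rate, "socket" -> socket)

  // Step 1: rewrite legacy names. Step 2: match short names case-insensitively.
  // Anything else is treated as a fully qualified class name and passed through.
  def resolve(provider: String): String = {
    val name = backwardCompatibilityMap.getOrElse(provider, provider)
    registeredShortNames.getOrElse(name.toLowerCase, name)
  }
}
```

Because `RateStreamProvider` is now the registered service, `resolve("rate")` maps to the V2 provider without any alias entry.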
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177244722 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) +extends SparkPlan with Logging { + override def children: Seq[SparkPlan] = Seq(query) + override def output: Seq[Attribute] = Nil + + override protected def doExecute(): RDD[InternalRow] = { +val writerFactory = writer match { + case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() + case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) +} + +val rdd = query.execute() +val messages = new Array[WriterCommitMessage](rdd.partitions.length) + +logInfo(s"Start processing data source writer: $writer. " + + s"The input RDD has ${messages.length} partitions.") +EpochCoordinatorRef.get( --- End diff -- nit: Add comment on what this does.
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177245022 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) --- End diff -- Isn't it better to keep the name consistent with WriterToDataSource? Say, WriteToContinuousDataSourceExec?
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177275489 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) +extends SparkPlan with Logging { + override def children: Seq[SparkPlan] = Seq(query) + override def output: Seq[Attribute] = Nil + + override protected def doExecute(): RDD[InternalRow] = { +val writerFactory = writer match { + case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() + case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) +} + +val rdd = query.execute() +val messages = new Array[WriterCommitMessage](rdd.partitions.length) + +logInfo(s"Start processing data source writer: $writer. " + + s"The input RDD has ${messages.length} partitions.") +EpochCoordinatorRef.get( + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) + .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) --- End diff -- nit: this indentation looks weird.
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177243246 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) --- End diff -- add docs
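`doExecute` in the `ContinuousWriteExec` diff picks a writer factory by checking whether the writer can consume internal rows natively, and otherwise wraps the row-based factory in an adapter. A minimal sketch of that capability-check-with-fallback pattern follows. All traits and classes below are hypothetical stand-ins, not Spark's `StreamWriter`, `SupportsWriteInternalRow`, or `InternalRowDataWriterFactory`.

```scala
// Hypothetical stand-ins for the V2 writer interfaces referenced in the diff.
trait WriterFactory { def describe: String }

trait StreamWriterLike {
  // Every writer can produce a factory that consumes external (public) rows.
  def createWriterFactory(): WriterFactory
}

// Optional capability: the writer can also consume internal rows directly.
trait SupportsInternalRows extends StreamWriterLike {
  def createInternalRowWriterFactory(): WriterFactory
}

// Adapter that would convert internal rows to external rows before delegating.
final class ConvertingWriterFactory(inner: WriterFactory) extends WriterFactory {
  def describe: String = s"converting(${inner.describe})"
}

object WriterFactorySketch {
  // Prefer the native internal-row path; otherwise wrap the row-based factory.
  def chooseFactory(writer: StreamWriterLike): WriterFactory = writer match {
    case w: SupportsInternalRows => w.createInternalRowWriterFactory()
    case _                       => new ConvertingWriterFactory(writer.createWriterFactory())
  }
}
```

Keeping the capability as a separate mix-in means plain writers need no extra code, while writers that opt in skip the per-row conversion.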
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20906 jenkins, retest this please
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20859#discussion_r175580404 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -160,6 +160,19 @@ object UnsupportedOperationChecker { case _: InsertIntoDir => throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") +case e: EventTimeWatermark => + val statefulChildren = e.collect { +case a: Aggregate if a.isStreaming => a +case d: Deduplicate if d.isStreaming => d +case f: FlatMapGroupsWithState if f.isStreaming => f + } + statefulChildren.foreach { statefulNode => +if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) { + throwError("Watermarks both before and after a stateful operator in a streaming " + --- End diff -- This gives the impression that it makes sense but we don't support it. In fact, it's just ill-defined. Maybe change this to something like: "Multiple watermarks before and after stateful operators are not well-defined in a streaming query."
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20859#discussion_r175580451 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala --- @@ -140,6 +140,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Complete, expectedMsgs = Seq("distinct aggregation")) + assertNotSupportedInStreamingPlan( +"aggregate on both sides of stateful op", +EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 1 second"), + Aggregate( +attributeWithWatermark :: Nil, +aggExprs("a"), +EventTimeWatermark( + attribute, + CalendarInterval.fromString("interval 2 seconds"), + streamRelation))), +outputMode = Append, +expectedMsgs = Seq("both before and after")) --- End diff -- Add tests for the other stateful operators as well.
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20859#discussion_r175579879 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -160,6 +160,19 @@ object UnsupportedOperationChecker { case _: InsertIntoDir => throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") +case e: EventTimeWatermark => + val statefulChildren = e.collect { +case a: Aggregate if a.isStreaming => a +case d: Deduplicate if d.isStreaming => d +case f: FlatMapGroupsWithState if f.isStreaming => f --- End diff -- This should cover joins as well.
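The proposed check walks the plan under each `EventTimeWatermark`, collects the stateful operators there, and fails if any of them has another watermark beneath it. The sketch below models that rule on a tiny hypothetical plan tree (`Plan`, `Relation`, `Watermark`, `Aggregate`, `Deduplicate`, and `Join` are not Catalyst's classes). Following the review comment, it treats joins as stateful too.

```scala
object WatermarkCheckSketch {
  // Hypothetical, minimal logical-plan model (not Catalyst's classes).
  sealed trait Plan {
    def children: Seq[Plan]
    // Pre-order traversal that includes this node, similar in spirit to TreeNode.collect.
    def collect[T](pf: PartialFunction[Plan, T]): Seq[T] =
      pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
  }
  final case class Relation(name: String) extends Plan { val children: Seq[Plan] = Nil }
  final case class Watermark(column: String, delay: String, child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Deduplicate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  private def isStateful(p: Plan): Boolean = p match {
    case _: Aggregate | _: Deduplicate | _: Join => true
    case _                                       => false
  }

  // Stateful operators that have a watermark both above them and below them.
  def violations(plan: Plan): Seq[Plan] =
    plan
      .collect { case w: Watermark => w.collect { case s if isStateful(s) => s } }
      .flatten
      .filter(s => s.collect { case inner: Watermark => inner }.nonEmpty)
}
```

A real checker would raise an analysis error when `violations` is non-empty. Returning the offending nodes just keeps the sketch easy to test.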
[GitHub] spark pull request #20848: [SPARK-23623][SS] Avoid concurrent use of cached ...
Github user tdas closed the pull request at: https://github.com/apache/spark/pull/20848
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 Just to be clear, I am not saying that we *have to* move to this pool stuff. I am just saying that if we want to make this more robust, then we should try to use existing tools (after careful evaluation), rather than trying to reinvent the wheel.
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 @tedyu It was indeed hard to find :) But Apache Commons Pool does expose metrics on idle/active counts. See https://commons.apache.org/proper/commons-pool/apidocs/org/apache/commons/pool2/impl/BaseGenericObjectPool.html
[GitHub] spark pull request #20848: [SPARK-23623][SS] Avoid concurrent use of cached ...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20848 [SPARK-23623][SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer (branch-2.3) This is a backport of #20767 to branch-2.3. ## What changes were proposed in this pull request? CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption that there will be only one task trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. For any cases where this assumption may not be true, we have a SparkPlan flag to disable the use of the cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly. Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, i.e., whether there will be multiple tasks in the same query trying to read the same partition. Case in point: 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. This can inadvertently lead to a ConcurrentModificationException or, worse, silent reading of incorrect data. Here is a better way to design this. The planner shouldn't have to understand these low-level optimizations. Rather, the consumer pool should be smart enough to avoid concurrent use of a cached consumer. 
Currently, it tries to do so, but incorrectly (the `inuse` flag is not checked when returning a cached consumer; see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that is closed when the task is done. Then the planner does not need a flag to avoid reuse. This PR is a step towards that goal. It does the following:
- There are effectively two kinds of consumers that may be generated:
  - Cached consumer: this should be returned to the pool at task end.
  - Non-cached consumer: this should be closed at task end.
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer, so that the client code does not have to reason about whether to stop or release. They simply call `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is a request for a consumer that is in use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer, given that what it returns may or may not be cached.

To keep this patch safe enough for merging into branch-2.3, this PR does not remove the planner flag that disables reuse. That can be done later, in master only.
## How was this patch tested? A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23623-2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20848.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20848 commit 9e440a4e788980e0dc475aa7966c3e56010e7cf7 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-16T18:11:07Z [SPARK-23623][SS] Avoid concurrent use of cached consumers in CachedKafkaConsumer CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly. Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be mu
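The acquire/release rules in the PR description can be sketched as a small keyed pool. This is a simplified, single-JVM illustration under stated assumptions, not the PR's code: `InternalConsumer` stands in for a real `KafkaConsumer`, the string key stands in for the topic-partition plus Kafka params, `isRetry` stands in for checking the task attempt number, and evicting an in-use consumer from the cache plays the role of "marking it for close". All names here are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a KafkaConsumer bound to one topic-partition.
final class InternalConsumer(val key: String) {
  var inUse: Boolean = false
  var closed: Boolean = false
  def close(): Unit = { closed = true }
}

// What callers see: they call release() without knowing whether it was cached.
sealed trait DataConsumer {
  def internal: InternalConsumer
  def release(): Unit
}

object ConsumerPoolSketch {
  private val cache = mutable.HashMap.empty[String, InternalConsumer]

  private final case class Cached(internal: InternalConsumer) extends DataConsumer {
    def release(): Unit = releaseCached(internal)
  }
  private final case class NonCached(internal: InternalConsumer) extends DataConsumer {
    def release(): Unit = internal.close() // never shared, so always closed at task end
  }

  def acquire(key: String, isRetry: Boolean = false): DataConsumer = synchronized {
    cache.get(key) match {
      case existing if isRetry =>
        // A retried task may race with the earlier attempt: evict any cached consumer,
        // closing it now if idle; an in-use one is closed by its holder on release.
        existing.foreach(c => if (!c.inUse) c.close())
        cache.remove(key)
        NonCached(new InternalConsumer(key))
      case None =>
        val fresh = new InternalConsumer(key)
        fresh.inUse = true
        cache.put(key, fresh)
        Cached(fresh)
      case Some(c) if c.inUse =>
        // Never hand out a consumer another task is using; give out a throwaway one.
        NonCached(new InternalConsumer(key))
      case Some(c) =>
        c.inUse = true
        Cached(c)
    }
  }

  private def releaseCached(c: InternalConsumer): Unit = synchronized {
    if (cache.get(c.key).exists(_ eq c)) c.inUse = false // still cached: make it reusable
    else c.close() // evicted while in use: nobody can reuse it, so close it
  }
}
```

Because `release()` is the only cleanup call, callers never branch on cached versus non-cached, which is the point of the trait described in the PR. The reference-equality check (`eq`) ensures that a consumer evicted while in use is closed rather than silently returned to the pool.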
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 @tedyu @zsxwing My thought on this is that we should consider migrating to something like Apache Commons Pool (assuming it does not require additional Maven libraries), which might be less of a maintenance load. It may already have metrics, etc., that we can leverage.
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 The idea is good. But how do you propose exposing that information? Periodic printouts in the log? From a different angle, I would rather avoid feature creep in this PR, which is intended to be backported to 2.3. On Mar 15, 2018 7:31 PM, "tedyu" <notificati...@github.com> wrote: > *@tedyu* commented on this pull request. > In external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala <https://github.com/apache/spark/pull/20767#discussion_r174984237>: > >CachedKafkaDataConsumer(newInternalConsumer) > -} else if (existingInternalConsumer.inuse) { > +} else if (existingInternalConsumer.inUse) { >// If consumer is already cached but is currently in use, then return a new consumer >NonCachedKafkaDataConsumer(newInternalConsumer) > Maybe keep an internal counter for how many times the non-cached consumer is created. This would give us information on how effective the cache is.
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 @tedyu @zsxwing thank you very much for catching the bugs. I have simplified the logic quite a bit. Note that I removed the invariant that I had introduced earlier. Additionally, I locally ran the stress test with 100 threads and 1 read attempts, which ran for 2 mins. It passed.
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r174973494 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - + private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = { synchronized { - val consumer = cache.get(key) - if (consumer != null) { -consumer.inuse = false - } else { -logWarning(s"Attempting to release consumer that does not exist") - } -} - } - /** - * Removes (and closes) the Kafka Consumer for the given topic, partition and group id. - */ - def removeKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -synchronized { - val removedConsumer = cache.remove(key) - if (removedConsumer != null) { -removedConsumer.close() + // If it has been marked for close, then do it any way + if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close() --- End diff -- I rewrote the logic. Hopefully, it's simpler to reason about now.
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r174968594 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - + private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = { synchronized { - val consumer = cache.get(key) - if (consumer != null) { -consumer.inuse = false - } else { -logWarning(s"Attempting to release consumer that does not exist") - } -} - } - /** - * Removes (and closes) the Kafka Consumer for the given topic, partition and group id. - */ - def removeKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -synchronized { - val removedConsumer = cache.remove(key) - if (removedConsumer != null) { -removedConsumer.close() + // If it has been marked for close, then do it any way + if (intConsumer.inuse && intConsumer.markedForClose) intConsumer.close() + intConsumer.inuse = false + + // Clear the consumer from the cache if this is indeed the consumer present in the cache + val key = new CacheKey(intConsumer.topicPartition, intConsumer.kafkaParams) + val cachedIntConsumer = cache.get(key) + if (cachedIntConsumer != null) { +if (cachedIntConsumer.eq(intConsumer)) { + // The released consumer is indeed the cached one. + cache.remove(key) +} else { + // The released consumer is not the cached one. 
Don't do anything. + // This should not happen as long as we maintain the invariant mentioned above. + logWarning( +s"Cached consumer not the same one as the one being release" + + s"\ncached = $cachedIntConsumer [${System.identityHashCode(cachedIntConsumer)}]" + + s"\nreleased = $intConsumer [${System.identityHashCode(intConsumer)}]") +} + } else { +// The released consumer is not in the cache. Don't do anything. +// This should not happen as long as we maintain the invariant mentioned above. +logWarning(s"Attempting to release consumer that is not in the cache") } } } /** * Get a cached consumer for groupId, assigned to topic and partition. * If matching consumer doesn't already exist, will be created using kafkaParams. + * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]]. + * + * Note: This method guarantees that the consumer returned is not currently in use by any one + * else. Within this guarantee, this will make a best effort attempt to re-use consumers by + * caching them and tracking when they are in use. 
*/ - def getOrCreate( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -// If this is reattempt at running the task, then invalidate cache and start with -// a new consumer + def acquire( + topicPartition: TopicPartition, + kafkaParams: ju.Map[String, Object], + useCache: Boolean): KafkaDataConsumer = synchronized { +val key = new CacheKey(topicPartition, kafkaParams) +val existingInternalConsumer = cache.get(key) + +lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams) + if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) { - removeKafkaConsumer(topic, partition, kafkaParams) - val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams) - consumer.inuse = true
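The doc comment for `acquire` promises that the returned consumer is never in use by anyone else, while cached consumers are reused on a best-effort basis. The hunk is cut off right after the reattempt check. The sketch below shows one way the remaining branches could honor that guarantee. It assumes hypothetical stand-ins: `AcquirableConsumer`, `Acquired`, `AcquireSketch`, and a `String` key. It also passes the attempt number explicitly instead of reading `TaskContext.get.attemptNumber`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for InternalKafkaConsumer.
final class AcquirableConsumer(val key: String) {
  var inuse = false
  var markedForClose = false
  var closed = false
  def close(): Unit = { closed = true }
}

// Hypothetical result type: tells the caller whether the consumer is cached.
final case class Acquired(consumer: AcquirableConsumer, cached: Boolean)

object AcquireSketch {
  val cache = mutable.HashMap.empty[String, AcquirableConsumer]

  // `attemptNumber` stands in for TaskContext.get.attemptNumber (assumption).
  def acquire(key: String, useCache: Boolean, attemptNumber: Int): Acquired = synchronized {
    val existing = cache.get(key)
    lazy val fresh = new AcquirableConsumer(key)
    if (attemptNumber >= 1) {
      // Task reattempt: the cached consumer may be in a bad state, so invalidate it.
      // If someone is still using it, close it on release; otherwise close it now.
      existing.foreach { c => if (c.inuse) c.markedForClose = true else c.close() }
      cache.remove(key)
      Acquired(fresh, cached = false)
    } else if (!useCache) {
      Acquired(fresh, cached = false)
    } else existing match {
      case None =>
        fresh.inuse = true
        cache.put(key, fresh)
        Acquired(fresh, cached = true)
      case Some(c) if c.inuse =>
        // Never hand out a consumer that another task holds; use a private one.
        Acquired(fresh, cached = false)
      case Some(c) =>
        c.inuse = true
        Acquired(c, cached = true)
    }
  }
}
```

In this design the caller releases what it acquired: a cached consumer goes back to the cache, and a non-cached one is closed.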
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r174968294 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 @koeninger Good question, Cody! I think we should fix this limitation eventually. The only reason I am not doing that in this PR is to keep the changes minimal for backporting to 2.3.x. Eventually, we should not do such cache management ourselves, and should instead use something like [Apache Commons Pool](https://commons.apache.org/proper/commons-pool/index.html).
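For comparison with the hand-managed cache, Apache Commons Pool (commons-pool2) offers keyed pools such as `GenericKeyedObjectPool`, where `borrowObject(key)` and `returnObject(key, obj)` handle the bookkeeping. The sketch below stays dependency-free, so it is a tiny hand-rolled keyed pool that only illustrates that borrow/return model. It is not the Commons Pool API, and `KeyedPool`, `borrow`, `giveBack` and `maxIdlePerKey` are hypothetical. Unlike a cache that holds one consumer per key, each key keeps a list of idle objects, so concurrent tasks reading the same partition can each reuse one.

```scala
import scala.collection.mutable

// Hypothetical keyed pool: per key, a stack of idle objects ready for reuse.
final class KeyedPool[K, V](create: K => V, destroy: V => Unit, maxIdlePerKey: Int) {
  private val idle = mutable.HashMap.empty[K, List[V]]

  // Reuse an idle object for this key if there is one, otherwise create a new one.
  def borrow(key: K): V = synchronized {
    idle.getOrElse(key, Nil) match {
      case head :: rest => idle(key) = rest; head
      case Nil          => create(key)
    }
  }

  // Return an object; if the key already has enough idle objects, destroy it.
  def giveBack(key: K, value: V): Unit = {
    val kept = synchronized {
      val current = idle.getOrElse(key, Nil)
      if (current.size < maxIdlePerKey) { idle(key) = value :: current; true } else false
    }
    if (!kept) destroy(value)   // surplus objects are destroyed outside the lock
  }

  // Drop an object that must not be reused (e.g. after a task failure).
  def invalidate(value: V): Unit = destroy(value)

  def idleCount(key: K): Int = synchronized { idle.getOrElse(key, Nil).size }
}
```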
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173602790 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173602735 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173602480 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173602442 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -27,30 +27,73 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging +import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread +private[kafka010] sealed trait KafkaDataConsumer { + /** + * Get the record for the given offset if available. Otherwise it will either throw error + * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), + * or null. + * + * @param offset the offset to fetch. + * @param untilOffsetthe max offset to fetch. Exclusive. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at + * offset if available, or throw exception.when `failOnDataLoss` is `false`, + * this method will either return record at offset if available, or return + * the next earliest available record less than untilOffset, or null. It + * will not throw any exception. + */ + def get( + offset: Long, + untilOffset: Long, + pollTimeoutMs: Long, + failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { +internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss) + } + + /** + * Return the available offset range of the current partition. It's a pair of the earliest offset + * and the latest offset. + */ + def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange() + + /** + * Release this consumer from being further used. 
Depending on its implementation, + * this consumer will be either finalized, or reset for reuse later. + */ + def release(): Unit + + /** Reference to the internal implementation that this wrapper delegates to */ + protected def internalConsumer: InternalKafkaConsumer +} + /** - * Consumer of single topicpartition, intended for cached reuse. - * Underlying consumer is not threadsafe, so neither is this, - * but processing the same topicpartition and group id in multiple threads is usually bad anyway. + * A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected. + * This is not for direct use outside this file. */ -private[kafka010] case class CachedKafkaConsumer private( +private[kafka010] case class InternalKafkaConsumer( topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) extends Logging { - import CachedKafkaConsumer._ + import InternalKafkaConsumer._ private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - private var consumer = createConsumer + @volatile private var consumer = createConsumer --- End diff -- Yeah, I just added them to be safer. One less thing to worry about.
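The `get` doc comment in this hunk describes two behaviors, depending on `failOnDataLoss`. The sketch below covers only that contract. It assumes a hypothetical in-memory partition (`FakePartition`) in place of a real `KafkaConsumer`, and it does no polling and applies no timeouts. Where the documented method returns `null`, the sketch returns `Option`. For the available range, "latest" is assumed to follow Kafka's end-offset convention, meaning one past the last stored offset.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical in-memory partition: offset -> value. Missing offsets model
// records that are no longer available (e.g. removed by retention).
final case class FakePartition(records: TreeMap[Long, String]) {

  /** Sketch of the documented get() contract; None where the doc says null. */
  def get(offset: Long, untilOffset: Long, failOnDataLoss: Boolean): Option[(Long, String)] =
    records.get(offset) match {
      case Some(value) => Some(offset -> value)
      case None if failOnDataLoss =>
        throw new IllegalStateException(s"Offset $offset is not available")
      case None =>
        // Earliest available record after `offset`, only if it is below untilOffset.
        records.find { case (o, _) => o >= offset }.filter { case (o, _) => o < untilOffset }
    }

  /** (earliest, latest), with latest taken as one past the last offset (assumption). */
  def availableOffsetRange: (Long, Long) =
    if (records.isEmpty) (0L, 0L) else (records.firstKey, records.lastKey + 1)
}
```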
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173351789 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) + private def releaseKafkaConsumer( +topicPartition: TopicPartition, +kafkaParams: ju.Map[String, Object]): Unit = { +val key = new CacheKey(topicPartition, kafkaParams) synchronized { val consumer = cache.get(key) if (consumer != null) { -consumer.inuse = false +if (consumer.markedForClose) { + consumer.close() + cache.remove(key) +} else { + consumer.inuse = false +} } else { logWarning(s"Attempting to release consumer that does not exist") --- End diff -- This should not be the case. We do not remove any consumer from the cache while it is being used, so the scenario you mentioned should not happen.
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173341089 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) + private def releaseKafkaConsumer( +topicPartition: TopicPartition, +kafkaParams: ju.Map[String, Object]): Unit = { +val key = new CacheKey(topicPartition, kafkaParams) synchronized { val consumer = cache.get(key) if (consumer != null) { -consumer.inuse = false +if (consumer.markedForClose) { + consumer.close() + cache.remove(key) +} else { + consumer.inuse = false +} } else { logWarning(s"Attempting to release consumer that does not exist") --- End diff -- Aah. The warning was misleading. Will add comments to clarify that.
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173340037 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) + private def releaseKafkaConsumer( +topicPartition: TopicPartition, +kafkaParams: ju.Map[String, Object]): Unit = { +val key = new CacheKey(topicPartition, kafkaParams) synchronized { val consumer = cache.get(key) if (consumer != null) { -consumer.inuse = false +if (consumer.markedForClose) { + consumer.close() + cache.remove(key) +} else { + consumer.inuse = false +} } else { logWarning(s"Attempting to release consumer that does not exist") } } } - /** - * Removes (and closes) the Kafka Consumer for the given topic, partition and group id. - */ - def removeKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -synchronized { - val removedConsumer = cache.remove(key) - if (removedConsumer != null) { -removedConsumer.close() - } -} - } /** * Get a cached consumer for groupId, assigned to topic and partition. * If matching consumer doesn't already exist, will be created using kafkaParams. 
*/ - def getOrCreate( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -// If this is reattempt at running the task, then invalidate cache and start with -// a new consumer + def acquire( + topicPartition: TopicPartition, + kafkaParams: ju.Map[String, Object], + useCache: Boolean): KafkaDataConsumer = synchronized { +val key = new CacheKey(topicPartition, kafkaParams) +val existingInternalConsumer = cache.get(key) + +lazy val newNonCachedConsumer = + new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams)) + if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) { - removeKafkaConsumer(topic, partition, kafkaParams) - val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams) - consumer.inuse = true - cache.put(key, consumer) - consumer -} else { - if (!cache.containsKey(key)) { -cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams)) + // If this is reattempt at running the task, then invalidate cache and start with + // a new consumer + if (existingInternalConsumer != null) { --- End diff -- This is indeed better. What I was doing was always deferring the close to a later point, but that would let the consumer be used one more time before being closed.
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173338056 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) + private def releaseKafkaConsumer( +topicPartition: TopicPartition, +kafkaParams: ju.Map[String, Object]): Unit = { +val key = new CacheKey(topicPartition, kafkaParams) synchronized { val consumer = cache.get(key) if (consumer != null) { -consumer.inuse = false +if (consumer.markedForClose) { + consumer.close() + cache.remove(key) +} else { + consumer.inuse = false +} } else { logWarning(s"Attempting to release consumer that does not exist") } } } - /** - * Removes (and closes) the Kafka Consumer for the given topic, partition and group id. - */ - def removeKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -synchronized { - val removedConsumer = cache.remove(key) - if (removedConsumer != null) { -removedConsumer.close() - } -} - } /** * Get a cached consumer for groupId, assigned to topic and partition. * If matching consumer doesn't already exist, will be created using kafkaParams. 
*/ - def getOrCreate( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) - -// If this is reattempt at running the task, then invalidate cache and start with -// a new consumer + def acquire( + topicPartition: TopicPartition, + kafkaParams: ju.Map[String, Object], + useCache: Boolean): KafkaDataConsumer = synchronized { +val key = new CacheKey(topicPartition, kafkaParams) +val existingInternalConsumer = cache.get(key) + +lazy val newNonCachedConsumer = + new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams)) + if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) { - removeKafkaConsumer(topic, partition, kafkaParams) - val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams) - consumer.inuse = true - cache.put(key, consumer) - consumer -} else { - if (!cache.containsKey(key)) { -cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams)) + // If this is reattempt at running the task, then invalidate cache and start with + // a new consumer + if (existingInternalConsumer != null) { +existingInternalConsumer.markedForClose = true } - val consumer = cache.get(key) - consumer.inuse = true - consumer + newNonCachedConsumer +} else if (!useCache) { + newNonCachedConsumer +} else if (existingInternalConsumer == null) { + newNonCachedConsumer.internalConsumer.inuse = true + cache.put(key, newNonCachedConsumer.internalConsumer) + newNonCachedConsumer --- End diff -- oh yes. damn it. my bad.
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 jenkins retest this
[GitHub] spark pull request #20765: [SPARK-23406][SS] Enable stream-stream self-joins...
Github user tdas closed the pull request at: https://github.com/apache/spark/pull/20765
spark git commit: [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3
Repository: spark Updated Branches: refs/heads/branch-2.3 1dd37ff3b -> 404f7e201

[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

This is a backport of #20598.

## What changes were proposed in this pull request?
Solved two bugs to enable stream-stream self-joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the Catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionRelation), we were rewriting the attribute references in the streaming plan with the new attribute references from the batch plan. This incorrectly handled the scenario where multiple StreamingExecutionRelations point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query and its corresponding plan transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
    df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan:
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting:
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]  // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]  // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for the spliced plans, the new output attributes (value#66) replace the earlier output attributes (value#12 and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]  // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]  // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```
**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new references generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]  // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]  // solution: project with aliases
         +- LocalRelation [value#66]
```
## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #20765 from tdas/SPARK-23406-2.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/404f7e20 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/404f7e20 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/404f7e20 Branch: refs/heads/branch-2.3 Commit: 404f7e2013ecfdf993a17fd942d8890d9a8100e7 Parents: 1dd37ff Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Wed Mar 7 21:58:57 2018 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Mar 7 21:58:57 2018 -0800 -- .../streaming/MicroBatchExecution.scala | 16 ++--- .../execution/streaming/StreamingRelation.scala | 20 +++- .../sql/streaming/StreamingJoinSuite.scala | 25 +++- 3 files changed, 45 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/404f7e20/sql/core/src/main/scala/org/apache/spar
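The second bug can be reduced to a toy model. The Scala sketch below is not Catalyst code: `Attr`, `SpliceSketch`, `rewrite` and `aliasFor` are hypothetical names, and an attribute is modeled only as a name plus an expression id. It shows why substituting both leaf outputs (value#1 and value#12) with the single batch output (value#66) makes the two sides of the self-join indistinguishable, whereas the aliasing approach keeps the original ids.

```scala
// Toy model only (hypothetical, not Catalyst): an attribute is a name plus a unique expression id,
// printed in the same name#id style as the plans in this commit message.
final case class Attr(name: String, id: Int) {
  override def toString: String = s"$name#$id"
}

object SpliceSketch {
  val leftLeafOut: Attr = Attr("value", 1)   // output of the first StreamingExecutionRelation
  val rightLeafOut: Attr = Attr("value", 12) // output of the second one (same source)
  val batchOut: Attr = Attr("value", 66)     // output of the single LocalRelation from the source

  // Old behavior: substitute every leaf output with the batch output.
  // Two distinct attributes collapse into one, so later rules cannot tell the join sides apart.
  def rewrite(refs: Seq[Attr]): Seq[Attr] = {
    val substitution = Map(leftLeafOut -> batchOut, rightLeafOut -> batchOut)
    refs.map(a => substitution.getOrElse(a, a))
  }

  // New behavior: leave references untouched and render the aliasing Project that sits
  // above each spliced LocalRelation, e.g. "value#66 AS value#1".
  def aliasFor(leafOut: Attr): String = s"$batchOut AS $leafOut"
}
```

For the top-level Project in the example, `rewrite(Seq(leftLeafOut, rightLeafOut))` yields `value#66, value#66`, matching the broken plan, while the aliased plan keeps value#1 and value#12 distinct above the shared LocalRelation.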
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20767 @zsxwing @brkyvz PTAL.
[GitHub] spark pull request #20767: Fixed
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20767 Fixed

## What changes were proposed in this pull request?
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption that there will be only one task trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. For any cases where this assumption may not hold, we have a SparkPlan flag to disable the use of the cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, that is, whether there will be multiple tasks in the same query trying to read the same partition. Case in point: Spark 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It is non-trivial to figure out how this leads to two tasks reading the same partition, possibly concurrently. Because of that, it is hard for the planner to detect this case and set the flag to avoid the cache / consumer pool. This can inadvertently lead to ConcurrentModificationException or, worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldn't have to understand these low-level optimizations. Rather, the consumer pool should be smart enough to avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag `inuse` is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that is closed when the task is done. Then the planner does not need a flag to avoid reuse.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumers that may be generated:
  - Cached consumer: this should be returned to the pool at task end.
  - Non-cached consumer: this should be closed at task end.
- A trait called KafkaDataConsumer is introduced to hide this difference from the users of the consumer, so that the client code does not have to reason about whether to close or release. They simply call `val consumer = KafkaDataConsumer.acquire` and then `consumer.release()`.
- If there is a request for a consumer that is in use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.

To keep this patch safe enough for merging into branch-2.3, this PR does not remove the planner flag that avoids reuse. That can be done later, in master only.

## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23623 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20767.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20767 commit 97510c6952a865caf41e6b6f19c3af7e714c3ad6 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-08T02:23:45Z Fixed
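The acquire/release policy described in this PR can be sketched as a small single-JVM model. This is an illustration under stated assumptions, not the code in the PR: `InternalConsumerSketch`, `DataConsumerSketch`, `ConsumerPoolSketch`, the `String` cache key and the `isReattempt` flag (standing in for `TaskContext.get.attemptNumber >= 1`) are all hypothetical, and no real KafkaConsumer is created. The release path mirrors the `markedForClose` handling shown in the `releaseKafkaConsumer` diff; on a reattempt, the sketch closes an idle cached consumer immediately instead of always deferring the close, in the spirit of the review discussion on `acquire`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the internal consumer; a real one would wrap a KafkaConsumer.
final class InternalConsumerSketch(val key: String) {
  var inUse: Boolean = false
  var markedForClose: Boolean = false
  var closed: Boolean = false
  def close(): Unit = { closed = true }
}

// What task code sees: it always calls release() and never decides between close and reuse.
sealed trait DataConsumerSketch {
  def internal: InternalConsumerSketch
  def release(): Unit
}

final class ConsumerPoolSketch {
  private val cache = mutable.HashMap.empty[String, InternalConsumerSketch]

  // Cached consumers go back to the pool on release; non-cached ones are simply closed.
  private final class Cached(val internal: InternalConsumerSketch) extends DataConsumerSketch {
    def release(): Unit = releaseCached(internal)
  }
  private final class NonCached(val internal: InternalConsumerSketch) extends DataConsumerSketch {
    def release(): Unit = internal.close()
  }

  def acquire(key: String, isReattempt: Boolean): DataConsumerSketch = synchronized {
    val existing = cache.get(key)
    if (isReattempt) {
      // A retried task may overlap an earlier attempt, so never reuse the cached consumer:
      // close it now if idle, or mark it so that its current user closes it on release.
      existing.foreach { c =>
        if (c.inUse) { c.markedForClose = true } else { c.close(); cache.remove(key) }
      }
      new NonCached(new InternalConsumerSketch(key))
    } else {
      existing match {
        case Some(c) if c.inUse =>
          // Already in use (e.g. both sides of a self-join read this partition): use a fresh one.
          new NonCached(new InternalConsumerSketch(key))
        case Some(c) =>
          c.inUse = true
          new Cached(c)
        case None =>
          val fresh = new InternalConsumerSketch(key)
          fresh.inUse = true
          cache.put(key, fresh)
          new Cached(fresh)
      }
    }
  }

  private def releaseCached(c: InternalConsumerSketch): Unit = synchronized {
    if (c.markedForClose) {
      c.close()
      cache.remove(c.key)
    } else {
      c.inUse = false // back to the pool for the next task
    }
  }

  def cachedKeys: Set[String] = synchronized { cache.keySet.toSet }
}
```

Client code then follows a single pattern regardless of which kind of consumer it received: acquire, read, and call `release()` (ideally in a `finally` block).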
[GitHub] spark pull request #20765: [SPARK-23406][SS] Enable stream-stream self-joins...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20765 [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

This is a backport of #20598.

## What changes were proposed in this pull request?
Solved two bugs to enable stream-stream self-joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the Catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionRelation), we were rewriting the attribute references in the streaming plan with the new attribute references from the batch plan. This incorrectly handled the scenario where multiple StreamingExecutionRelations point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query and its corresponding plan transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
    df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan:
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting:
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]  // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]  // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for the spliced plans, the new output attributes (value#66) replace the earlier output attributes (value#12 and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]  // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]  // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```
**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new references generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]  // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]  // solution: project with aliases
         +- LocalRelation [value#66]
```
## How was this patch tested?
New unit test

You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23406-2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20765.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20765 commit c3ec9ef9355a3290d764dda0191165eaa4e49062 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-02-14T22:27:02Z [SPARK-23406][SS] Enable stream-stream self-joins ## What changes were proposed in this pull request? Solved two bugs to enable stream-stream self joins. ### Incorrect analysis due to missing MultiInstanceRelation trait Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the cataly
[GitHub] spark pull request #20688: [SPARK-23096][SS] Migrate rate source to V2
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20688#discussion_r172730994 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair} -import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2 +import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair} +import org.apache.spark.sql.execution.streaming.sources.RateSourceProvider import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} --- End diff -- Could you make the names of the different readers consistent with each other, similar to Kafka? For example:
- RateStreamProvider
- RateStreamMicroBatchReader, RateStreamMicroBatchDataReaderFactory
- RateStreamContinuousReader
[GitHub] spark pull request #20688: [SPARK-23096][SS] Migrate rate source to V2
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20688#discussion_r172730333 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateSourceSuite.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.nio.file.Files +import java.util.Optional +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.Offset +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.util.ManualClock + +class RateSourceSuite extends StreamTest { --- End diff -- Why didn't you move this file using `git mv` and then change it? Then we would have been able to diff it properly. This was a pain in the text socket v2 PR as well :(
[GitHub] spark pull request #20688: [SPARK-23096][SS] Migrate rate source to V2
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20688#discussion_r172729894 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateSourceProvider.scala --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.Optional +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.IOUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, DataSourceV2, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset} +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} +import org.apache.spark.util.{ManualClock, SystemClock} + +object RateSourceProvider { + val SCHEMA = +StructType(StructField("timestamp", TimestampType) :: StructField("value", LongType) :: Nil) + + val VERSION = 1 + + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" + val RAMP_UP_TIME = "rampUpTime" + + /** Calculate the end value we will emit at the time `seconds`. 
*/ + def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = { +// E.g., rampUpTimeSeconds = 4, rowsPerSecond = 10 +// Then speedDeltaPerSecond = 2 +// +// seconds = 0 1 2 3 4 5 6 +// speed = 0 2 4 6 8 10 10 (speedDeltaPerSecond * seconds) +// end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2 +val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1) +if (seconds <= rampUpTimeSeconds) { + // Calculate "(0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 2" in a special way to + // avoid overflow + if (seconds % 2 == 1) { +(seconds + 1) / 2 * speedDeltaPerSecond * seconds + } else { +seconds / 2 * speedDeltaPerSecond * (seconds + 1) + } +} else { + // rampUpPart is just a special case of the above formula: rampUpTimeSeconds == seconds + val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds) + rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond +} + } +} + +class RateSourceProvider extends DataSourceV2 + with MicroBatchReadSupport with ContinuousReadSupport with DataSourceRegister { + import RateSourceProvider._ + + private def checkParameters(options: DataSourceOptions): Unit = { +if (options.get(ROWS_PER_SECOND).isPresent) { + val rowsPerSecond = options.get(ROWS_PER_SECOND).get().toLong + if (rowsPerSecond <= 0) { +throw new IllegalArgumentException( + s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' must be positive") + } +} + +if (options.get(RAMP_UP_TIME).isPresent) { + val rampUpTimeSeconds = +JavaUtils.timeStringAsSec(options.get(RAMP_UP_TIME).get()) + if (rampUpTimeSeconds < 0) { +throw new IllegalArgumentException( + s"Invalid value '$rampUpTimeSeconds'. The option 'rampUpTime' must not be negative")
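The ramp-up arithmetic in `valueAtSecond` can be checked against the table in its comment. The sketch below copies the function from the diff into a standalone object (`RampUpSketch` is a hypothetical name) so that it runs without Spark. As we read it, the result is the end value (cumulative row count) emitted by time `seconds`: a linear ramp whose speed grows by `speedDeltaPerSecond` each second, followed by a constant `rowsPerSecond`.

```scala
object RampUpSketch {
  // Same arithmetic as valueAtSecond in the diff.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
    // Integer division: the ramp may end below rowsPerSecond if it does not divide evenly.
    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
    if (seconds <= rampUpTimeSeconds) {
      // (speedDeltaPerSecond * seconds) * (seconds + 1) / 2, halving whichever of
      // `seconds` and `seconds + 1` is even first so the division is exact and the
      // intermediate product stays smaller
      if (seconds % 2 == 1) (seconds + 1) / 2 * speedDeltaPerSecond * seconds
      else seconds / 2 * speedDeltaPerSecond * (seconds + 1)
    } else {
      // Everything emitted during the ramp, then a constant rate afterwards.
      val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds)
      rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
    }
  }
}
```

With `rowsPerSecond = 10` and `rampUpTimeSeconds = 4`, seconds 0 through 6 give 0, 2, 6, 12, 20, 30, 40, matching the comment. One consequence of the integer division: with `rowsPerSecond = 10` and `rampUpTimeSeconds = 5`, the per-second delta is 1, so the ramp tops out at 5 rows/s (15 rows after second 5) and then jumps to 10 rows/s (25 rows after second 6).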
[GitHub] spark pull request #20755: [SPARK-23406][SS] Enable stream-stream self-joins...
Github user tdas closed the pull request at: https://github.com/apache/spark/pull/20755
[GitHub] spark pull request #20755: [SPARK-23406][SS] Enable stream-stream self-joins...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20755 [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

## What changes were proposed in this pull request?
This is a limited but safe-to-backport version of the self-join fix made in #20598. That PR solved two bugs:
1. Add the MultiInstanceRelation trait to leaf logical nodes to allow resolution. This is the major fix required to allow streaming self-joins, and it is safe to backport.
2. Fix attribute rewriting in MicroBatchExecution when micro-batch plans are spliced into the streaming logical plan. This is a minor fix that is not safe to backport. Without this fix, only a very small fraction of self-join cases will have issues, but those issues may lead to incorrect results.

## How was this patch tested?
New unit test

You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23406-2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20755.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20755 commit 484babb58d9cf61d5dcc6521865cd2a5db64dd82 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-07T00:53:34Z Fixed
[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20710 @rdblue @jose-torres arrgh... I didn't notice that you guys were still commenting before I merged it. Feel free to continue the discussion, and if any change is needed we will deal with it accordingly. Sorry about that!
spark git commit: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Repository: spark Updated Branches: refs/heads/master ba622f45c -> b0f422c38

[SPARK-23559][SS] Add epoch ID to DataWriterFactory.

## What changes were proposed in this pull request?
Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.

I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it requires a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.

## How was this patch tested?
existing unit tests

Author: Jose Torres

Closes #20710 from jose-torres/api2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0f422c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0f422c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0f422c3 Branch: refs/heads/master Commit: b0f422c3861a5a3831e481b8ffac08f6fa085d00 Parents: ba622f4 Author: Jose Torres Authored: Mon Mar 5 13:23:01 2018 -0800 Committer: Tathagata Das Committed: Mon Mar 5 13:23:01 2018 -0800 -- .../spark/sql/kafka010/KafkaStreamWriter.scala | 5 +++- .../spark/sql/sources/v2/writer/DataWriter.java | 12 ++ .../sources/v2/writer/DataWriterFactory.java| 5 +++- .../v2/writer/streaming/StreamWriter.java | 19 +++ .../datasources/v2/WriteToDataSourceV2.scala| 25 +--- .../streaming/MicroBatchExecution.scala | 7 ++ .../sources/PackedRowWriterFactory.scala| 5 +++- .../execution/streaming/sources/memoryV2.scala | 5 +++- .../sources/v2/SimpleWritableDataSource.scala | 10 ++-- 9 files changed, 65 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala -- diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala index 9307bfc..ae5b5c5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala @@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory( topic: Option[String], producerParams: Map[String, String], schema: StructType) extends DataWriterFactory[InternalRow] { - override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { + override def createDataWriter( + partitionId: Int, + attemptNumber: Int, + epochId: Long): DataWriter[InternalRow] = { new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index 53941a8..39bf458 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.spark.annotation.InterfaceStability; /** - * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is + * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is * responsible for writing data for an input RDD partition. * * One Spark task has one exclusive data writer, so there is no thread-safe concern. 
@@ -31,13 +31,17 @@ import org.apache.spark.annotation.InterfaceStability; * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will * not be processed. If all records are successfully written, {@link #commit()} is called. * + * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle + * is over and Spark will not use it again. + * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit
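The new `epochId` parameter and the lifecycle rule added to the DataWriter docs (once `commit()` or `abort()` returns, the writer is finished) can be illustrated with simplified stand-ins. The traits below are hypothetical Scala mirrors that keep the parameter list of the new `createDataWriter(int, int, long)`; they are not Spark's interfaces, so the block runs without Spark. Rejecting calls after the writer finishes is a choice made here to make the rule visible, not something the commit requires. Following the commit message, epoch 0 stands for the single epoch of a non-streaming query.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical Scala mirrors of the Java interfaces touched by this commit (not Spark's API).
trait WriterCommitMessageSketch

trait DataWriterSketch[T] {
  def write(record: T): Unit
  def commit(): WriterCommitMessageSketch
  def abort(): Unit
}

trait DataWriterFactorySketch[T] {
  // Same parameter list as the new createDataWriter(int, int, long).
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriterSketch[T]
}

final case class BufferedCommit(partitionId: Int, attemptNumber: Int, epochId: Long, rows: List[String])
  extends WriterCommitMessageSketch

// Buffers rows in memory; commit() or abort() ends its lifecycle, after which every call fails.
final class BufferingWriter(partitionId: Int, attemptNumber: Int, epochId: Long)
    extends DataWriterSketch[String] {
  private val buffer = ArrayBuffer.empty[String]
  private var finished = false

  private def ensureOpen(): Unit =
    require(!finished, "writer already committed or aborted")

  def write(record: String): Unit = { ensureOpen(); buffer += record }

  def commit(): WriterCommitMessageSketch = {
    ensureOpen()
    finished = true
    // The commit message carries the epoch so the driver side can tell epochs apart.
    BufferedCommit(partitionId, attemptNumber, epochId, buffer.toList)
  }

  def abort(): Unit = {
    ensureOpen()
    finished = true
    buffer.clear()
  }
}

object BufferingWriterFactory extends DataWriterFactorySketch[String] {
  // epochId is 0 for a non-streaming query; in streaming it identifies the epoch being written.
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): DataWriterSketch[String] =
    new BufferingWriter(partitionId, attemptNumber, epochId)
}
```

A retried task would call `createDataWriter` again with a different `attemptNumber` and get a brand-new writer, rather than reusing one that already committed or aborted.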
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20710#discussion_r172081660 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java --- @@ -31,13 +31,17 @@ * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will * not be processed. If all records are successfully written, {@link #commit()} is called. * + * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle + * is over and Spark will not use it again. + * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an - * exception will be sent to the driver side, and Spark will retry this writing task for some times, - * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`, - * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail. + * exception will be sent to the driver side, and Spark may retry this writing task a few times. + * In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a + * different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])} --- End diff -- This is not clear to me. Isn't it the case that abort will be called every time a task attempt ends in an error? This seems to give the impression that abort is called only after N failed attempts have been made.
[GitHub] spark issue #20698: [SPARK-23541][SS] Allow Kafka source to read data with g...
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20698

    Thank you. Merging to master only as this is a new feature touching production code paths.
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993983

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java ---
    @@ -39,21 +36,21 @@
        * If this method fails (by throwing an exception), this writing job is considered to have been
        * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
        *
    -   * To support exactly-once processing, writer implementations should ensure that this method is
    -   * idempotent. The execution engine may call commit() multiple times for the same epoch
    -   * in some circumstances.
    +   * The execution engine may call commit() multiple times for the same epoch in some circumstances.
    --- End diff --

    Somewhere in this file, add docs about what epochId means for micro-batch (MB) and continuous (C) execution.
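Because the engine may call `commit()` more than once for the same epoch, a sink has to tolerate replays. One common approach, sketched here under stated assumptions, is to key the commit on `epochId` and turn a repeated commit into a no-op. `IdempotentEpochSink` is a hypothetical class; it keeps committed epochs in memory only, whereas a real sink would have to record the committed epoch durably and atomically with the data so that the check survives restarts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sink: makes an epoch-level commit idempotent by remembering which
// epochs were already published. In-memory only, so state is lost on restart.
final class IdempotentEpochSink {
  private final Set<Long> committedEpochs = new HashSet<>();
  private final List<String> output = new ArrayList<>();

  // Returns true if this call published the epoch, false if it was a replay.
  synchronized boolean commit(long epochId, List<String> rows) {
    if (!committedEpochs.add(epochId)) {
      return false;  // already committed: ignore the duplicate call
    }
    output.addAll(rows);
    return true;
  }

  synchronized List<String> output() {
    return new ArrayList<>(output);
  }
}
```

A second `commit(0, ...)` returns `false` and leaves the output unchanged, so the rows of epoch 0 are published once.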
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993716

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -48,6 +48,9 @@
        * same task id but different attempt number, which means there are multiple
        * tasks with the same task id running at the same time. Implementations can
        * use this attempt number to distinguish writers of different task attempts.
    +   * @param epochId A monotonically increasing id for streaming queries that are split in to
    +   *                discrete periods of execution. For queries that execute as a single batch, this
    --- End diff --

    Also, make it clear that this is batchId for MicroBatch processing and epochId for Continuous processing.
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993622

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -48,6 +48,9 @@
        * same task id but different attempt number, which means there are multiple
        * tasks with the same task id running at the same time. Implementations can
        * use this attempt number to distinguish writers of different task attempts.
    +   * @param epochId A monotonically increasing id for streaming queries that are split in to
    +   *                discrete periods of execution. For queries that execute as a single batch, this
    --- End diff --

    For non-streaming queries, this...
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993559

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java ---
    @@ -36,8 +36,9 @@
      * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
      * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
      * exception will be sent to the driver side, and Spark will retry this writing task for some times,
    --- End diff --

    Break this sentence. Very long.
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993519

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java ---
    @@ -36,8 +36,9 @@
      * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
      * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
      * exception will be sent to the driver side, and Spark will retry this writing task for some times,
    --- End diff --

    for some times --> for a few times
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993467

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java ---
    @@ -36,8 +36,9 @@
      * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
      * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
      * exception will be sent to the driver side, and Spark will retry this writing task for some times,
    --- End diff --

    Spark may retry... (in continuous we don't retry the task)
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993276

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -172,17 +173,19 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
         val epochCoordinator = EpochCoordinatorRef.get(
           context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           SparkEnv.get)
         val currentMsg: WriterCommitMessage = null
         var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
         do {
    +      var dataWriter: DataWriter[InternalRow] = null
           // write the data and commit this writer.
           Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
             try {
    +          dataWriter = writeTask.createDataWriter(
    +            context.partitionId(), context.attemptNumber(), currentEpoch)
               iter.foreach(dataWriter.write)
    --- End diff --

    Fix this! Don't use foreach.
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993101

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -198,7 +201,7 @@ object DataWritingSparkTask extends Logging {
         })(catchBlock = {
           // If there is an error, abort this writer
           logError(s"Writer for partition ${context.partitionId()} is aborting.")
    -      dataWriter.abort()
    +      if (dataWriter != null) dataWriter.abort()
           logError(s"Writer for partition ${context.partitionId()} aborted.")
    --- End diff --

    nit: add comment that the exception will be rethrown.
[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20710#discussion_r171993066

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -48,6 +48,9 @@
        * same task id but different attempt number, which means there are multiple
        * tasks with the same task id running at the same time. Implementations can
        * use this attempt number to distinguish writers of different task attempts.
    +   * @param epochId A monotonically increasing id for streaming queries that are split in to
    +   *                discrete periods of execution. For queries that execute as a single batch, this
    +   *                id will always be zero.
        */
    -  DataWriter createDataWriter(int partitionId, int attemptNumber);
    +  DataWriter createDataWriter(int partitionId, int attemptNumber, long epochId);
    --- End diff --

    Add clear lifecycle semantics.
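Per the review thread, `epochId` is the batch id under micro-batch execution and the epoch id under continuous execution, and the quoted doc says it is always zero for queries that execute as a single batch. The sketch below shows one hypothetical way a file-based sink could use all three arguments of `createDataWriter(partitionId, attemptNumber, epochId)`: the attempt number keeps concurrent attempts of the same task apart while staging, and the published name depends only on epoch and partition. The directory layout and `WriterPaths` are assumptions for illustration, not Spark's behavior.

```java
// Hypothetical naming scheme for a file sink (not Spark's).
final class WriterPaths {
  // Each attempt stages to its own file, so two attempts of the same task
  // (same partitionId, different attemptNumber) never write the same path.
  static String stagingPath(int partitionId, int attemptNumber, long epochId) {
    return String.format("_staging/epoch=%d/part-%05d-attempt-%d", epochId, partitionId, attemptNumber);
  }

  // The published name omits the attempt number: at most one attempt's output
  // can be published per (epoch, partition), which keeps replayed epochs idempotent.
  static String finalPath(int partitionId, long epochId) {
    return String.format("epoch=%d/part-%05d", epochId, partitionId);
  }
}
```

For a non-streaming query the epoch component is simply `epoch=0`.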
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171989658 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +}.filter(_.size > 0) + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { + // Assign preferred executor locations to each range such that the same topic-partition is + // preferentially read from the same executor and the KafkaConsumer can be reused. + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. 
+ val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + val idealRangeSize = totalSize.toDouble / minPartitions.get + + offsetRanges.flatMap { range => +// Split the current range into subranges as close to the ideal range size +val rangeSize = range.untilOffset - range.fromOffset +val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt + +(0 until numSplitsInRange).map { i => + val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange) + val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange) + KafkaOffsetRange( +range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) +} + } +} + } + + private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = { +def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b + +val numExecutors = executorLocations.length +if (numExecutors > 0) { + // This allows cached KafkaConsumers in the executors to be re-used to read the same + // partition in every batch. + Some(executorLocations(floorMod(tp.hashCode, numExecutors))) +} else None + } +} + +private[kafka010] object KafkaOffsetR
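The `getLocation` helper quoted above hashes a topic-partition onto the executor list with a non-negative modulo, so the same partition keeps landing on the same executor and its cached `KafkaConsumer` can be reused. A minimal Java sketch of that idea follows. `ExecutorAssignment` is hypothetical, `Objects.hash(topic, partition)` stands in for Kafka's `TopicPartition.hashCode()`, and the sketch assumes the executor list is passed in the same order every batch.

```java
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical helper mirroring the quoted getLocation logic (not Spark's code).
final class ExecutorAssignment {
  // Non-negative modulo for b > 0, as in the quoted floorMod: maps any hash,
  // including negative ones, into [0, b). Long arithmetic keeps (a % b) + b from overflowing.
  static int floorMod(long a, int b) {
    return (int) (((a % b) + b) % b);
  }

  // Same topic-partition -> same executor, provided the executor list order is stable.
  static Optional<String> preferredExecutor(String topic, int partition, List<String> executors) {
    if (executors.isEmpty()) {
      return Optional.empty();  // no location preference, like the quoted `else None`
    }
    int hash = Objects.hash(topic, partition);  // stand-in for TopicPartition.hashCode()
    return Optional.of(executors.get(floorMod(hash, executors.size())));
  }
}
```

A plain `%` would return a negative index for negative hash codes; for example `floorMod(-7, 3)` is `2`, whereas `-7 % 3` is `-1`.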
spark git commit: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Repository: spark Updated Branches: refs/heads/master 3a4d15e5d -> 707e6506d [SPARK-23097][SQL][SS] Migrate text socket source to V2 ## What changes were proposed in this pull request? This PR moves structured streaming text socket source to V2. Questions: do we need to remove old "socket" source? ## How was this patch tested? Unit test and manual verification. Author: jerryshaoCloses #20382 from jerryshao/SPARK-23097. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/707e6506 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/707e6506 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/707e6506 Branch: refs/heads/master Commit: 707e6506d0dbdb598a6c99d666f3c66746113b67 Parents: 3a4d15e Author: jerryshao Authored: Fri Mar 2 12:27:42 2018 -0800 Committer: Tathagata Das Committed: Fri Mar 2 12:27:42 2018 -0800 -- apache.spark.sql.sources.DataSourceRegister | 2 +- .../sql/execution/datasources/DataSource.scala | 5 +- .../spark/sql/execution/streaming/socket.scala | 219 - .../execution/streaming/sources/socket.scala| 255 .../spark/sql/streaming/DataStreamReader.scala | 21 +- .../streaming/TextSocketStreamSuite.scala | 231 -- .../sources/TextSocketStreamSuite.scala | 306 +++ 7 files changed, 582 insertions(+), 457 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/707e6506/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister -- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0259c77..1fe9c09 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -5,6 +5,6 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider -org.apache.spark.sql.execution.streaming.TextSocketSourceProvider org.apache.spark.sql.execution.streaming.RateSourceProvider +org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2 http://git-wip-us.apache.org/repos/asf/spark/blob/707e6506/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6e1b572..35fcff6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode @@ -563,6 +564,7 @@ object DataSource extends Logging { val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" val nativeOrc = classOf[OrcFileFormat].getCanonicalName +val socket = classOf[TextSocketSourceProvider].getCanonicalName Map( "org.apache.spark.sql.jdbc" -> jdbc, @@ -583,7 +585,8 @@ object DataSource extends Logging { "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc, 
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, "org.apache.spark.ml.source.libsvm" -> libsvm, - "com.databricks.spark.csv" -> csv + "com.databricks.spark.csv" -> csv, + "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket ) } http://git-wip-us.apache.org/repos/asf/spark/blob/707e6506/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala -- diff
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20382

    LGTM. Merging to master.
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171954250 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.ServerSocketChannel +import java.sql.Timestamp +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach() { +sqlContext.streams.active.foreach(_.stop()) +if (serverThread != null) { + serverThread.interrupt() + serverThread.join() + serverThread = null +} +if (batchReader != null) { + batchReader.stop() + batchReader = null +} + } + + private var serverThread: ServerThread = null + private var batchReader: MicroBatchReader = null + + case class AddSocketData(data: String*) extends AddData { +override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + require( +query.nonEmpty, +"Cannot add data when there is no query for finding the active socket source") + + val sources = query.get.logicalPlan.collect { +case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source + } + if (sources.isEmpty) { +throw new Exception( + "Could not find socket source in the StreamExecution logical plan to add data to") + } else if (sources.size > 1) { +throw new Exception( + "Could 
not select the socket source in the StreamExecution logical plan as there" + +"are multiple socket sources:\n\t" + sources.mkString("\n\t")) + } + val socketSource = sources.head + + assert(serverThread != null && serverThread.port != 0) + val currOffset = socketSource.currentOffset + data.foreach(serverThread.enqueue) + + val newOffset = LongOffset(currOffset.offset + data.size) + (socketSource, newOffset) +} + +override def toString: String = s"AddSocketData(data = $data)" + } + + test("backward compatibility with old path") { + DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider", + spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => +assert(ds.isInstanceOf[TextSocketSourceProvider]) + case _ => +throw new IllegalStateException("Could not find socket source") +} + } + + test("basic usage") { +serverThread = new ServerThread() +serverThread.start() + +withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val ref = spark + import
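The "backward compatibility with old path" test above relies on the commit's new entry in `DataSource`'s compatibility map, which points the old `org.apache.spark.sql.execution.streaming.TextSocketSourceProvider` name at the V2 provider in the `sources` package. The sketch below shows that alias-table pattern in isolation. `ProviderAliases` is hypothetical, not Spark's `DataSource`, and it assumes that names missing from the table are used unchanged.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical alias table: legacy provider class names resolve to their replacements
// before class loading; any other name passes through unchanged (an assumption here).
final class ProviderAliases {
  private static final Map<String, String> BACKWARD_COMPATIBILITY = new HashMap<>();

  static {
    BACKWARD_COMPATIBILITY.put(
        "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
        "org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider");
  }

  static String resolve(String provider) {
    return BACKWARD_COMPATIBILITY.getOrDefault(provider, provider);
  }
}
```

Keeping the alias in one table lets the old class be deleted while existing queries that name it keep working.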
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171750758 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
    +   * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    +   * depending on rounding errors or Kafka partitions that didn't receive any new data.
    +   */
    +  def getRanges(
    +      fromOffsets: PartitionOffsetMap,
    +      untilOffsets: PartitionOffsetMap,
    +      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
    +    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
    +
    +    val offsetRanges = partitionsToRead.toSeq.map { tp =>
    +      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
    +    }
    +
    +    // If minPartitions not set or there are enough partitions to satisfy minPartitions
    +    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
    +      // Assign preferred executor locations to each range such that the same topic-partition is
    +      // always read from the same executor and the KafkaConsumer can be reused
    +      offsetRanges.map { range =>
    +        range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
    +      }
    +    } else {
    +
    +      // Splits offset ranges with relatively large amount of data to smaller ones.
    +      val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
    +      offsetRanges.flatMap { offsetRange =>
    +        val tp = offsetRange.topicPartition
    +        val size = offsetRange.untilOffset - offsetRange.fromOffset
    +        // number of partitions to divvy up this topic partition to
    +        val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt
    --- End diff --

    I rewrote this completely using the code used by `sparkContext.parallelize` to make splits.
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L123
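The split logic can be sketched as follows, using integer boundaries in the spirit of the linked `ParallelCollectionRDD` slicing code. Each range gets a number of pieces proportional to its share of the data. The earlier version quoted here clamps that count to at least one with `math.max(..., 1)`. The later revision quoted further up in this archive rounds `rangeSize / idealRangeSize` without a clamp, so a range smaller than half the ideal size would get zero splits and its offsets would not be read. This sketch keeps the clamp. `OffsetRange` and `RangeSplitter` are hypothetical stand-ins for `KafkaOffsetRange` and the calculator, with string keys in place of `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaOffsetRange: offsets [from, until) of one topic-partition.
final class OffsetRange {
  final String topicPartition;
  final long from;
  final long until;

  OffsetRange(String topicPartition, long from, long until) {
    this.topicPartition = topicPartition;
    this.from = from;
    this.until = until;
  }

  long size() {
    return until - from;
  }
}

final class RangeSplitter {
  static List<OffsetRange> split(List<OffsetRange> ranges, int minPartitions) {
    if (minPartitions <= 0) {
      throw new IllegalArgumentException("minPartitions must be positive");
    }
    // Empty ranges carry no data, so they are dropped up front.
    List<OffsetRange> nonEmpty = new ArrayList<>();
    long total = 0;
    for (OffsetRange r : ranges) {
      if (r.size() > 0) {
        nonEmpty.add(r);
        total += r.size();
      }
    }
    // Already more ranges than requested: keep the 1-1 mapping to Kafka partitions.
    if (nonEmpty.size() > minPartitions) {
      return nonEmpty;
    }
    double idealSize = (double) total / minPartitions;
    List<OffsetRange> out = new ArrayList<>();
    for (OffsetRange r : nonEmpty) {
      // Pieces proportional to this range's share, clamped to at least one.
      int pieces = (int) Math.max(1L, Math.round(r.size() / idealSize));
      for (int i = 0; i < pieces; i++) {
        // Consecutive pieces share endpoints, so no offset is skipped or read twice.
        // Assumes r.size() * pieces fits in a long.
        long start = r.from + (r.size() * i) / pieces;
        long end = r.from + (r.size() * (i + 1)) / pieces;
        if (end > start) {
          out.add(new OffsetRange(r.topicPartition, start, end));
        }
      }
    }
    return out;
  }
}
```

With ranges of 100 and 10 offsets and `minPartitions = 4`, the ideal size is 27.5: the large range becomes four pieces of 25, and the small one stays a single piece instead of being rounded away.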
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171750580 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
    +   * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    +   * depending on rounding errors or Kafka partitions that didn't receive any new data.
    +   */
    +  def getRanges(
    +      fromOffsets: PartitionOffsetMap,
    +      untilOffsets: PartitionOffsetMap,
    +      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
    +    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
    --- End diff --

    `fromOffsets` here will contain the initial offsets of new partitions. See how `fromOffsets` is set with `startOffsets + newPartitionInitialOffsets`.
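Because `partitionsToRead` is the intersection of the two key sets, a newly discovered partition is read only if it already has an entry in `fromOffsets`. That is why the initial offsets of new partitions are merged in first. A small sketch of that merge-then-intersect step follows. `PartitionsToRead` and its methods are hypothetical, and string keys such as `t-0` stand in for Kafka's `TopicPartition`. In this sketch, a partition that appears only in the until-offsets is not read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helpers illustrating the merge-then-intersect step (not Spark's code).
final class PartitionsToRead {
  // Offsets where the previous batch ended, plus initial offsets for newly
  // discovered partitions; like Scala's `++`, the right-hand map wins on conflicts.
  static Map<String, Long> fromOffsets(Map<String, Long> start, Map<String, Long> newPartitionInitial) {
    Map<String, Long> merged = new HashMap<>(start);
    merged.putAll(newPartitionInitial);
    return merged;
  }

  // Only partitions with both a start and an end offset are read in this batch.
  static Set<String> partitionsToRead(Map<String, Long> from, Map<String, Long> until) {
    Set<String> keys = new TreeSet<>(until.keySet());
    keys.retainAll(from.keySet());
    return keys;
  }
}
```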
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171750437 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
    +   * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    +   * depending on rounding errors or Kafka partitions that didn't receive any new data.
    +   */
    +  def getRanges(
    +      fromOffsets: PartitionOffsetMap,
    +      untilOffsets: PartitionOffsetMap,
    +      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
    +    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
    +
    +    val offsetRanges = partitionsToRead.toSeq.map { tp =>
    +      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
    +    }
    +
    +    // If minPartitions not set or there are enough partitions to satisfy minPartitions
    +    if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) {
    --- End diff --

    Rewritten. I don't want to rely on this default value of 0, as @jose-torres expressed concern earlier. So I rewrote this to explicitly check whether minPartitions has been set or not.
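The rewrite replaces the sentinel value 0 with an explicit "set or not set" check. The later revision quoted further up in this archive takes `minPartitions: Option[Int]` and requires any set value to be positive. A Java analogue using `OptionalInt` is sketched below. The option key `minPartitions` is the one named in the code, but `MinPartitionsOption`, its methods, and the plain `Map` standing in for the real options object are assumptions for illustration.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical parsing helper: an absent option stays absent, so no magic
// default value has to stand in for "not set".
final class MinPartitionsOption {
  static OptionalInt parse(Map<String, String> options) {
    String raw = options.get("minPartitions");
    if (raw == null) {
      return OptionalInt.empty();
    }
    int value = Integer.parseInt(raw.trim());  // NumberFormatException is an IllegalArgumentException
    if (value <= 0) {
      throw new IllegalArgumentException("minPartitions must be positive, got " + value);
    }
    return OptionalInt.of(value);
  }

  // Mirrors the quoted condition: split only when the option is set and there are
  // not already more offset ranges than requested.
  static boolean shouldSplit(OptionalInt minPartitions, int numRanges) {
    return minPartitions.isPresent() && numRanges <= minPartitions.getAsInt();
  }
}
```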
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171741765 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. 
+ * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. + */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { +val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + +val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) +} + +// If minPartitions not set or there are enough partitions to satisfy minPartitions +if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) { + // Assign preferred executor locations to each range such that the same topic-partition is + // always read from the same executor and the KafkaConsumer can be reused + offsetRanges.map { range => +range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } +} else { + + // Splits offset ranges with relatively large amount of data to smaller ones. + val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + offsetRanges.flatMap { offsetRange => +val tp = offsetRange.topicPartition +val size = offsetRange.untilOffset - offsetRange.fromOffset +// number of partitions to divvy up this topic partition to +val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt +var remaining = size +var startOffset = offsetRange.fromOffset +(0 until parts).map { part => + // Fine to do integer division. 
Last partition will consume all the round off errors + val thisPartition = remaining / (parts - part) + remaining -= thisPartition + val endOffset = startOffset + thisPartition + val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None) + startOffset = endOffset + offsetRange +} + } +} + } + + private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = { +def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b + +val numExecutors = executorLocations.length +if (numExecutors > 0) { + // This allows cached KafkaConsumers in the executors to be re-used to read the same + // partition in every batch. + Some(executorLocations(floorMod(tp.ha
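The `getLocation` helper in the diff appears to pick an executor with `floorMod` over the number of executors so that the same topic-partition keeps landing on the same executor (the exact expression is cut off above). `floorMod` matters because a hash code can be negative and Scala's `%` keeps the sign of the dividend. A minimal sketch of that idea, assuming a `(topic, partition)` tuple stands in for Kafka's `TopicPartition`; `ExecutorAssignment` and `preferredLocation` are hypothetical names:

```scala
object ExecutorAssignment {
  // Non-negative modulo: hash codes may be negative and `%` keeps the dividend's sign,
  // so shift the remainder into the range [0, b).
  def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b

  // Hypothetical helper: picks a stable executor for a (topic, partition) pair.
  // Returns None when no executor locations are known.
  def preferredLocation(topic: String, partition: Int, executors: Seq[String]): Option[String] =
    if (executors.isEmpty) None
    else Some(executors(floorMod((topic, partition).hashCode.toLong, executors.length)))
}
```

Because the choice depends only on the partition and the executor list, every batch maps a given partition to the same executor as long as the executor list is unchanged, which is what allows a cached consumer to be reused.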
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171732015 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader( // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread. // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever // (KAFKA-1894). -assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) +require(Thread.currentThread().isInstanceOf[UninterruptibleThread]) --- End diff -- Not much, really. `assert` throws an `AssertionError` and `require` throws an `IllegalArgumentException`. It's just a matter of preference. I can revert this change.
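A small sketch of the difference, assuming Scala 2 semantics: `assert` throws `java.lang.AssertionError` and is marked elidable, so it can be compiled away (for example with scalac's `-Xdisable-assertions`), whereas `require` always runs and throws `IllegalArgumentException`. `PreconditionDemo` and its methods are hypothetical helpers for illustration:

```scala
object PreconditionDemo {
  // Runs `body` and reports the simple class name of what it threw, or "none".
  def thrownBy(body: => Unit): String =
    try { body; "none" }
    catch { case t: Throwable => t.getClass.getSimpleName }

  // Precondition on a caller-supplied argument: `require` is the conventional choice.
  def parsePartitions(raw: String): Int = {
    val n = raw.trim.toInt
    require(n > 0, s"minPartitions must be positive, got $n")
    n
  }

  // Internal invariant that should never fail if the code is correct: `assert`.
  def splitEvenly(total: Long, parts: Int): Seq[Long] = {
    require(parts > 0, "parts must be positive")
    val base = total / parts
    val sizes = Seq.fill(parts - 1)(base) :+ (total - base * (parts - 1))
    assert(sizes.sum == total, "pieces must cover the whole range")
    sizes
  }
}
```

The convention this illustrates is `require` for checks on inputs and `assert` for internal invariants; either one fails fast on a wrong thread.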
[GitHub] spark issue #20703: [SPARK-19185][SS] Make Kafka consumer cache configurable
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20703 I completely agree with @zsxwing. Let's understand what the issue is rather than covering it up with a workaround. We should not run into such an issue at all.
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20382 A relevant test failed. Please make sure that there is no flakiness in the tests.
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171510614 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.ServerSocketChannel +import java.sql.Timestamp +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach() { +sqlContext.streams.active.foreach(_.stop()) +if (serverThread != null) { + serverThread.interrupt() + serverThread.join() + serverThread = null +} +if (batchReader != null) { + batchReader.stop() + batchReader = null +} + } + + private var serverThread: ServerThread = null + private var batchReader: MicroBatchReader = null + + case class AddSocketData(data: String*) extends AddData { +override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + require( +query.nonEmpty, +"Cannot add data when there is no query for finding the active socket source") + + val sources = query.get.logicalPlan.collect { +case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source + } + if (sources.isEmpty) { +throw new Exception( + "Could not find socket source in the StreamExecution logical plan to add data to") + } else if (sources.size > 1) { +throw new Exception( + "Could 
not select the socket source in the StreamExecution logical plan as there" + +"are multiple socket sources:\n\t" + sources.mkString("\n\t")) + } + val socketSource = sources.head + + assert(serverThread != null && serverThread.port != 0) + val currOffset = socketSource.currentOffset + data.foreach(serverThread.enqueue) + + val newOffset = LongOffset(currOffset.offset + data.size) + (socketSource, newOffset) +} + +override def toString: String = s"AddSocketData(data = $data)" + } + + test("backward compatibility with old path") { + DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider", + spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => +assert(ds.isInstanceOf[TextSocketSourceProvider]) + case _ => +throw new IllegalStateException("Could not find socket source") +} + } + + test("basic usage") { +serverThread = new ServerThread() +serverThread.start() + +withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val ref = spark + import
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20382 @jerryshao, please address the above comment; then we are good to merge!
[GitHub] spark issue #20698: [SPARK-23541][SS] Allow Kafka source to read data with g...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20698 @zsxwing @jose-torres
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20698#discussion_r171441133 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { --- End diff -- Add docs.
[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/20698 [SPARK-23541][SS] Allow Kafka source to read data with greater parallelism than the number of topic-partitions ## What changes were proposed in this pull request? Currently, when the Kafka source reads from Kafka, it generates as many tasks as the number of partitions in the topic(s) to be read. In some cases, it may be beneficial to read the data with greater parallelism, that is, with more partitions/tasks. That means offset ranges must be divided into smaller ranges such that the number of records in each partition ~= total records in batch / desired partitions. This would also balance out any data skew between topic-partitions. In this patch, I have added a new option called `minPartitions`, which allows the user to specify the desired level of parallelism. ## How was this patch tested? New tests in KafkaMicroBatchV2SourceSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-23541 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20698.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20698 commit ebb9b51c51a4411811a7e0e09fff8f8608faa017 Author: Tathagata Das <tathagata.das1565@...> Date: 2018-03-01T01:28:32Z Implemented
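The splitting described above can be sketched as follows, assuming a simplified `Span` type in place of `KafkaOffsetRange`. Each span gets a number of pieces proportional to its share of the batch (at least one), and integer division spreads the records so that later pieces absorb the rounding remainder. `Span` and `RangeSplitter` are hypothetical names; the sketch follows the approach visible in the `KafkaOffsetRangeCalculator` diff but is not that code:

```scala
// Hypothetical simplified offset range for one topic-partition: [from, until).
final case class Span(partition: Int, from: Long, until: Long) {
  def size: Long = until - from
}

object RangeSplitter {
  /** Splits spans so that roughly `minPartitions` pieces are produced in total,
    * giving spans with more records proportionally more pieces (at least one each). */
  def split(spans: Seq[Span], minPartitions: Int): Seq[Span] = {
    val totalSize = spans.map(_.size).sum
    // Nothing to do if there are already enough spans or no data at all.
    if (spans.size >= minPartitions || totalSize == 0) spans
    else spans.flatMap { s =>
      // Pieces for this span, proportional to its share of the batch.
      val parts = math.max(math.round(s.size.toDouble / totalSize * minPartitions), 1L).toInt
      var remaining = s.size
      var start = s.from
      (0 until parts).map { i =>
        // Integer division spreads records evenly; the last piece takes what is left.
        val len = remaining / (parts - i)
        remaining -= len
        val piece = Span(s.partition, start, start + len)
        start += len
        piece
      }
    }
  }
}
```

For example, with 900 records in one partition, 100 in another, and `minPartitions = 10`, the first span is cut into 9 pieces of 100 records and the second stays whole, giving 10 tasks.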
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171226477 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -61,13 +68,13 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo * Stored in a ListBuffer to facilitate removing committed batches. */ @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] + private val batches = new ListBuffer[(String, Timestamp)] @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) + private[sources] var currentOffset: LongOffset = LongOffset(-1L) --- End diff -- This does not make sense. You are directly accessing something that should only be accessed while synchronized on `this`.
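One way to address this, sketched under the assumption that callers (such as the test's `AddSocketData`) only need to read the offset: keep the guarded field private and expose a getter that takes the same lock. `BufferedSource`, `addLine`, and `getCurrentOffset` are hypothetical names for illustration, not the PR's API:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified source: only the locking pattern matters here.
class BufferedSource {
  // Both fields are guarded by `this` and are never exposed directly.
  private val batches = new ListBuffer[String]
  private var currentOffset: Long = -1L

  // Writers update the buffer and the offset together under the lock.
  def addLine(line: String): Unit = synchronized {
    batches += line
    currentOffset += 1
  }

  // Readers take the same lock instead of touching the field directly.
  def getCurrentOffset: Long = synchronized { currentOffset }
}
```

Holding the same lock for reads and writes means a reader never observes the offset out of step with the buffered batches.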
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171225853 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.ServerSocketChannel +import java.sql.Timestamp +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach() { +sqlContext.streams.active.foreach(_.stop()) +if (serverThread != null) { + serverThread.interrupt() + serverThread.join() + serverThread = null +} +if (batchReader != null) { + batchReader.stop() + batchReader = null +} + } + + private var serverThread: ServerThread = null + private var batchReader: MicroBatchReader = null + + case class AddSocketData(data: String*) extends AddData { +override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + require( +query.nonEmpty, +"Cannot add data when there is no query for finding the active socket source") + + val sources = query.get.logicalPlan.collect { +case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source + } + if (sources.isEmpty) { +throw new Exception( + "Could not find socket source in the StreamExecution logical plan to add data to") + } else if (sources.size > 1) { +throw new Exception( + "Could 
not select the socket source in the StreamExecution logical plan as there" + +"are multiple socket sources:\n\t" + sources.mkString("\n\t")) + } + val socketSource = sources.head + + assert(serverThread != null && serverThread.port != 0) + val currOffset = socketSource.currentOffset + data.foreach(serverThread.enqueue) + + val newOffset = LongOffset(currOffset.offset + data.size) + (socketSource, newOffset) +} + +override def toString: String = s"AddSocketData(data = $data)" + } + + test("backward compatibility with old path") { + DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider", + spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => +assert(ds.isInstanceOf[TextSocketSourceProvider]) + case _ => +throw new IllegalStateException("Could not find socket source") +} + } + + test("basic usage") { +serverThread = new ServerThread() +serverThread.start() + +withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val ref = spark + import