[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r180933380
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -53,32 +53,24 @@ class ContinuousSuiteBase extends StreamTest {
   // A continuous trigger that will only fire the initial time for the 
duration of a test.
   // This allows clean testing with manual epoch advancement.
   protected val longContinuousTrigger = Trigger.Continuous("1 hour")
+
+  override protected implicit val defaultTrigger = Trigger.Continuous(100)
+  override protected val defaultUseV2Sink = true
 }
 
 class ContinuousSuite extends ContinuousSuiteBase {
   import testImplicits._
 
-  test("basic rate source") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
+  test("basic") {
+val input = MemoryStream[Int]
--- End diff --

I think that's too much to hope for right now. We can do that later. For 
now, let's make everything explicitly use ContinuousMemoryStream.


---




[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...

2018-04-11 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/21048

[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common 
CheckpointFileManager interface

## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured 
Streaming must be written atomically so that no partial files are generated 
(partial files would break fault-tolerance guarantees). Currently, there are 3 
places that each try to do this individually, and in some cases incorrectly.

1. HDFSOffsetMetadataLog - This uses a FileManager interface so that it can 
work with either the `FileSystem` or `FileContext` APIs. It prefers the 
`FileContext` implementation because FileContext on HDFS supports atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
  - Writing a version.delta file - This uses only the FileSystem APIs to 
perform a rename. This is incorrect, as rename is not atomic in the HDFS 
FileSystem implementation.
  - Writing a snapshot file - Same as above.

 Current problems:
1. State Store behavior is incorrect - the rename-based writes are not atomic 
in the HDFS FileSystem implementation, so a failure can leave partial files 
visible.
2. Inflexible - Some file systems provide mechanisms other than 
write-to-temp-file-and-rename for writing atomically and more efficiently. For 
example, with S3 you can write directly to the final file, and it becomes 
visible only when the entire file has been written and closed correctly. Any 
failure can be made to terminate the write without making any partial files 
visible in S3. The current code does not abstract this mechanism well enough 
for it to be customized.

 Solution:

1. Introduce a common interface that all 3 cases above can use to write 
checkpoint files atomically.
2. This interface must expose the hooks needed to customize the 
write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and 
modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. 
Similar to the earlier `FileManager`, there are implementations based on the 
`FileSystem` and `FileContext` APIs, and the latter implementation is preferred 
so that it works correctly with HDFS.

The key method in this interface is `createAtomic(path, overwrite)`, which 
returns a `CancellableFSDataOutputStream` that has a `cancel()` method. Users 
of this stream must either call `close()` to successfully write the file, or 
`cancel()` in case of an error.
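
To make the contract concrete, a caller would follow a close-or-cancel pattern 
roughly like the sketch below. This is illustrative only; `CheckpointFileManager` 
and `CancellableFSDataOutputStream` are the new interfaces described above, and 
the exact signatures in the final PR may differ slightly.

    import java.io.OutputStream
    import org.apache.hadoop.fs.Path

    // Sketch of the close-or-cancel contract described above.
    def writeCheckpointFile(fm: CheckpointFileManager, path: Path)
                           (writeBody: OutputStream => Unit): Unit = {
      // createAtomic(path, overwrite) returns a CancellableFSDataOutputStream
      val out = fm.createAtomic(path, false)
      try {
        writeBody(out)
        out.close()     // success: the file becomes visible atomically
      } catch {
        case t: Throwable =>
          out.cancel()  // failure: abort the write, leaving no partial file visible
          throw t
      }
    }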


## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing 
tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23966

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21048.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21048


commit df7b339d73097b8501fe0937f770b8b2ded1b63e
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-04-11T04:21:14Z

CheckpointFileManager




---




[GitHub] spark pull request #20983: [SPARK-23747][Structured Streaming] Add EpochCoor...

2018-04-10 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20983#discussion_r180553748
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import org.mockito.InOrder
+import org.mockito.Matchers.{any, eq => eqTo}
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.test.SharedSparkSession
+
+class EpochCoordinatorSuite
+  extends SparkFunSuite
+with SharedSparkSession
+with MockitoSugar
+with BeforeAndAfterEach {
+
+  private var epochCoordinator: RpcEndpointRef = _
+
+  private var writer: StreamWriter = _
+  private var query: ContinuousExecution = _
+  private var orderVerifier: InOrder = _
+
+  override def beforeEach(): Unit = {
+val reader = mock[ContinuousReader]
+writer = mock[StreamWriter]
+query = mock[ContinuousExecution]
+orderVerifier = inOrder(writer, query)
+
+epochCoordinator
+  = EpochCoordinatorRef.create(writer, reader, query, "test", 1, 
spark, SparkEnv.get)
+  }
+
+  override def afterEach(): Unit = {
+SparkEnv.get.rpcEnv.stop(epochCoordinator)
+  }
+
+  test("single epoch") {
+setWriterPartitions(3)
+setReaderPartitions(2)
+
+commitPartitionEpoch(0, 1)
+commitPartitionEpoch(1, 1)
+commitPartitionEpoch(2, 1)
+reportPartitionOffset(0, 1)
+reportPartitionOffset(1, 1)
+
+// Here and in subsequent tests this is called to make a synchronous 
call to EpochCoordinator
+// so that mocks would have been acted upon by the time verification 
happens
+makeSynchronousCall()
+
+verifyCommit(1)
+  }
+
+  test("single epoch, all but one writer partition has committed") {
+setWriterPartitions(3)
+setReaderPartitions(2)
+
+commitPartitionEpoch(0, 1)
+commitPartitionEpoch(1, 1)
+reportPartitionOffset(0, 1)
+reportPartitionOffset(1, 1)
+
+makeSynchronousCall()
+
+verifyCommitHasntHappened(1)
+  }
+
+  test("single epoch, all but one reader partition has reported an 
offset") {
+setWriterPartitions(3)
+setReaderPartitions(2)
+
+commitPartitionEpoch(0, 1)
+commitPartitionEpoch(1, 1)
+commitPartitionEpoch(2, 1)
+reportPartitionOffset(0, 1)
+
+makeSynchronousCall()
+
+verifyCommitHasntHappened(1)
--- End diff --

+1


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179596647
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
--- End diff --

Let's make all the memory streams consistently named (for example, Kafka has 
KafkaMicroBatchReader and KafkaContinuousReader).


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179603916
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
--- End diff --

nit: I generally put such small, generically-named classes inside a 
relevant object to avoid cluttering the general class namespace. For example, 
this one can live inside an object ContinuousMemoryStreamRecordBuffer, since 
that is the only class using it. Then, when someone searches for a different 
GetRecord-ish named class, it is less confusing.
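
A minimal sketch of what that nesting could look like (illustrative only, 
using the names from the diff above):

    // Nest the small message type inside the only endpoint that uses it,
    // so "GetRecord" does not pollute the package namespace.
    private object ContinuousMemoryStreamRecordBuffer {
      case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
    }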


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179594450
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -43,8 +45,39 @@ object MemoryStream {
   protected val currentBlockId = new AtomicInteger(0)
   protected val memoryStreamId = new AtomicInteger(0)
 
-  def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] 
=
-new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+  def apply[A : Encoder](
+  implicit sqlContext: SQLContext,
+  trigger: Trigger = Trigger.ProcessingTime(0)): MemoryStreamBase[A] = 
trigger match {
+case _: ContinuousTrigger =>
+  new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext)
+case _ =>
+  new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+  }
+}
+
+abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) 
extends BaseStreamingSource {
--- End diff --

Add docs.


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179603105
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
--- End diff --

nit: to poll from what? 
(and also clarify what runs on the executor and what runs on the driver)


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179604298
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
+stream: ContinuousMemoryStream[A],
+partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint {
+  override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => 
stream.synchronized {
+  val buf = partitionBuffers(part)
+
+  val record =
+if (buf.size <= index) {
+  None
+} else {
+  Some(buf(index))
+}
+  context.reply(record.map(Row(_)))
+}
+  }
+}
+
+class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+extends MemoryStreamBase[A](sqlContext) with ContinuousReader with 
ContinuousReadSupport {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+  val NUM_PARTITIONS = 2
+
+  // ContinuousReader implementation
+
+  @GuardedBy("this")
+  private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])
+
+  private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, 
records)
+
+  def addData(data: TraversableOnce[A]): Offset = synchronized {
+// Distribute data evenly among partition lists.
+data.toSeq.zipWithIndex.map {
+  case (item, index) => records(index % NUM_PARTITIONS) += item
+}
+
+// The new target offset is the offset where all records in all 
partitions have been processed.
+ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, 
records(i).size)).toMap)
+  }
+
+  private var startOffset: ContinuousMemoryStreamOffset = _
+
+  override def setStartOffset(start: Optional[Offset]): Unit = 
synch

[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179602477
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
--- End diff --

I would keep the main class (the MemoryStreamContinuousReader) at the top 
and add this comment as part of the Scala docs of that class. Also add docs on 
each of the classes to explain their purpose. It is taking me a whole lot of 
scrolling back and forth to understand this, even though I roughly know what 
the organization should be.
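
For example (purely illustrative; class names and signatures copied from the 
diff above), the file could open with the main class carrying this comment as 
its Scaladoc:

    /**
     * A memory-backed continuous streaming source for tests.
     *
     * Overall strategy (from the comment above):
     *  - ContinuousMemoryStream, on the driver, maintains a list of records for
     *    each partition; addData() distributes records evenly-ish across partitions.
     *  - An RPC endpoint serves partition-level ContinuousMemoryStreamDataReader
     *    instances, which poll it for the record at a given offset (or nothing if
     *    that offset doesn't yet have a record).
     */
    class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
        extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
      // ...
    }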


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179601495
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
--- End diff --

Also, all the sources are in streaming/sources/ (or should be), not in 
streaming/continuous.


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179617487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
+stream: ContinuousMemoryStream[A],
+partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint {
+  override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => 
stream.synchronized {
+  val buf = partitionBuffers(part)
+
+  val record =
+if (buf.size <= index) {
+  None
+} else {
+  Some(buf(index))
+}
+  context.reply(record.map(Row(_)))
+}
+  }
+}
+
+class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+extends MemoryStreamBase[A](sqlContext) with ContinuousReader with 
ContinuousReadSupport {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+  val NUM_PARTITIONS = 2
+
+  // ContinuousReader implementation
+
+  @GuardedBy("this")
+  private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])
+
+  private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, 
records)
+
+  def addData(data: TraversableOnce[A]): Offset = synchronized {
+// Distribute data evenly among partition lists.
+data.toSeq.zipWithIndex.map {
+  case (item, index) => records(index % NUM_PARTITIONS) += item
+}
+
+// The new target offset is the offset where all records in all 
partitions have been processed.
+ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, 
records(i).size)).toMap)
+  }
+
+  private var startOffset: ContinuousMemoryStreamOffset = _
+
+  override def setStartOffset(start: Optional[Offset]): Unit = 
synch

[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179617590
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
+stream: ContinuousMemoryStream[A],
+partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint {
+  override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => 
stream.synchronized {
+  val buf = partitionBuffers(part)
+
+  val record =
+if (buf.size <= index) {
+  None
+} else {
+  Some(buf(index))
+}
+  context.reply(record.map(Row(_)))
+}
+  }
+}
+
+class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+extends MemoryStreamBase[A](sqlContext) with ContinuousReader with 
ContinuousReadSupport {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+  val NUM_PARTITIONS = 2
+
+  // ContinuousReader implementation
+
+  @GuardedBy("this")
+  private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])
+
+  private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, 
records)
+
+  def addData(data: TraversableOnce[A]): Offset = synchronized {
+// Distribute data evenly among partition lists.
+data.toSeq.zipWithIndex.map {
+  case (item, index) => records(index % NUM_PARTITIONS) += item
+}
+
+// The new target offset is the offset where all records in all 
partitions have been processed.
+ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, 
records(i).size)).toMap)
+  }
+
+  private var startOffset: ContinuousMemoryStreamOffset = _
+
+  override def setStartOffset(start: Optional[Offset]): Unit = 
synch

[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179599245
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -53,32 +53,24 @@ class ContinuousSuiteBase extends StreamTest {
   // A continuous trigger that will only fire the initial time for the 
duration of a test.
   // This allows clean testing with manual epoch advancement.
   protected val longContinuousTrigger = Trigger.Continuous("1 hour")
+
+  override protected implicit val defaultTrigger = Trigger.Continuous(100)
+  override protected val defaultUseV2Sink = true
 }
 
 class ContinuousSuite extends ContinuousSuiteBase {
   import testImplicits._
 
-  test("basic rate source") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
+  test("basic") {
+val input = MemoryStream[Int]
--- End diff --

Looking at this test code, it's very confusing that it is using the continuous 
memory stream and not the ordinary memory stream. The implicit ContinuousTrigger 
magic is not intuitive. I would rather have an explicit 
`ContinuousMemoryStream` than have `MemoryStream` magically generate 
two different kinds of stream based on different implicit values. And because of this 
polymorphism, multiple test suites that do not have continuous stream tests had 
to be changed (functions returning MemoryStream had to return MemoryStreamBase).
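
For illustration, a sketch of the preference being expressed (this assumes a 
ContinuousMemoryStream companion constructor analogous to MemoryStream's; not 
necessarily the final API):

    // Explicit style: the continuous implementation is named directly in the test.
    val input = ContinuousMemoryStream[Int]

    // Implicit-driven style being objected to: which implementation you get
    // depends on an implicit Trigger in scope, which is easy to miss in a test.
    //   implicit val trigger: Trigger = Trigger.Continuous(100)
    //   val input = MemoryStream[Int]   // silently resolves to a continuous stream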







---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179617149
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
--- End diff --

This is not really a buffer, as it only refers to buffers that are 
externally managed. It only serves as an endpoint for fetching items from the 
buffers. I would prefer one or the other: either all of the buffer management 
and endpoint management lives inside this class (adding and fetching, 
synchronized by this class), OR this is an inner class of ContinuousMemoryStream 
whose only purpose is to be an endpoint relaying fetch requests.
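
A rough sketch of the second option (illustrative only; names and fields reuse 
those from the diff above): ContinuousMemoryStream owns and synchronizes the 
buffers, and a small inner endpoint only relays fetch requests.

    class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) /* ... */ {
      @GuardedBy("this")
      private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])

      // All buffer access is synchronized by the stream itself.
      private def getRecord(part: Int, index: Int): Option[A] = synchronized {
        records(part).lift(index)
      }

      // Inner class whose only purpose is to relay GetRecord requests.
      private class RecordEndpoint extends ThreadSafeRpcEndpoint {
        override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
        override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
          case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) =>
            context.reply(getRecord(part, index).map(Row(_)))
        }
      }
    }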


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179605302
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
+stream: ContinuousMemoryStream[A],
+partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint {
+  override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => 
stream.synchronized {
+  val buf = partitionBuffers(part)
+
+  val record =
+if (buf.size <= index) {
--- End diff --

super nit: this can be in a single line.
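
i.e., the same logic collapsed to one line:

    val record = if (buf.size <= index) None else Some(buf(index))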


---




[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179618466
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
+stream: ContinuousMemoryStream[A],
+partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint {
+  override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => 
stream.synchronized {
+  val buf = partitionBuffers(part)
+
+  val record =
+if (buf.size <= index) {
+  None
+} else {
+  Some(buf(index))
+}
+  context.reply(record.map(Row(_)))
+}
+  }
+}
+
+class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+extends MemoryStreamBase[A](sqlContext) with ContinuousReader with 
ContinuousReadSupport {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+  val NUM_PARTITIONS = 2
+
+  // ContinuousReader implementation
+
+  @GuardedBy("this")
+  private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])
+
+  private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, 
records)
+
+  def addData(data: TraversableOnce[A]): Offset = synchronized {
+// Distribute data evenly among partition lists.
+data.toSeq.zipWithIndex.map {
+  case (item, index) => records(index % NUM_PARTITIONS) += item
+}
+
+// The new target offset is the offset where all records in all 
partitions have been processed.
+ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, 
records(i).size)).toMap)
+  }
+
+  private var startOffset: ContinuousMemoryStreamOffset = _
+
+  override def setStartOffset(start: Optional[Offset]): Unit = 
synch

[GitHub] spark pull request #20828: [SPARK-23687][SS] Add a memory source for continu...

2018-04-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20828#discussion_r179606525
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousMemoryStream.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.{util => ju}
+import java.util.Optional
+import java.util.concurrent.ArrayBlockingQueue
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.{Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, SupportsScanUnsafeRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.RpcUtils
+
+/**
+ * The overall strategy here is:
+ *  * ContinuousMemoryStream maintains a list of records for each 
partition. addData() will
+ *distribute records evenly-ish across partitions.
+ *  * ContinuousMemoryStreamRecordBuffer is set up as an endpoint for 
partition-level
+ *ContinuousMemoryStreamDataReader instances to poll. It returns the 
record at the specified
+ *offset within the list, or null if that offset doesn't yet have a 
record.
+ */
+
+private case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
+
+private class ContinuousMemoryStreamRecordBuffer[A](
+stream: ContinuousMemoryStream[A],
+partitionBuffers: Seq[ListBuffer[A]]) extends ThreadSafeRpcEndpoint {
+  override val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case GetRecord(ContinuousMemoryStreamPartitionOffset(part, index)) => 
stream.synchronized {
+  val buf = partitionBuffers(part)
+
+  val record =
+if (buf.size <= index) {
+  None
+} else {
+  Some(buf(index))
+}
+  context.reply(record.map(Row(_)))
+}
+  }
+}
+
+class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+extends MemoryStreamBase[A](sqlContext) with ContinuousReader with 
ContinuousReadSupport {
+  private implicit val formats = Serialization.formats(NoTypeHints)
+  val NUM_PARTITIONS = 2
+
+  // ContinuousReader implementation
+
+  @GuardedBy("this")
+  private val records = Seq.fill(NUM_PARTITIONS)(new ListBuffer[A])
+
+  private val recordBuffer = new ContinuousMemoryStreamRecordBuffer(this, 
records)
+
+  def addData(data: TraversableOnce[A]): Offset = synchronized {
+// Distribute data evenly among partition lists.
+data.toSeq.zipWithIndex.map {
+  case (item, index) => records(index % NUM_PARTITIONS) += item
+}
+
+// The new target offset is the offset where all records in all 
partitions have been processed.
+ContinuousMemoryStreamOffset((0 until NUM_PARTITIONS).map(i => (i, 
records(i).size)).toMap)
+  }
+
+  private var startOffset: ContinuousMemoryStreamOffset = _
+
+  override def setStartOffset(start: Optional[Offset]): Unit = 
synch

spark git commit: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-04-03 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 7cf9fab33 -> 66a3a5a2d


[SPARK-23099][SS] Migrate foreach sink to DataSourceV2

## What changes were proposed in this pull request?

Migrate foreach sink to DataSourceV2.

Since the previous attempt at this PR #20552, we've changed and strictly 
defined the lifecycle of writer components. This means we no longer need the 
complicated lifecycle shim from that PR; it just naturally works.

## How was this patch tested?

existing tests

Author: Jose Torres 

Closes #20951 from jose-torres/foreach.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66a3a5a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66a3a5a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66a3a5a2

Branch: refs/heads/master
Commit: 66a3a5a2dc83e03dedcee9839415c1ddc1fb8125
Parents: 7cf9fab
Author: Jose Torres 
Authored: Tue Apr 3 11:05:29 2018 -0700
Committer: Tathagata Das 
Committed: Tue Apr 3 11:05:29 2018 -0700

--
 .../sql/execution/streaming/ForeachSink.scala   |  68 -
 .../sources/ForeachWriterProvider.scala | 111 +++
 .../spark/sql/streaming/DataStreamWriter.scala  |   4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 305 --
 .../streaming/sources/ForeachWriterSuite.scala  | 306 +++
 .../sql/streaming/StreamingQuerySuite.scala |   1 +
 6 files changed, 420 insertions(+), 375 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/66a3a5a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
deleted file mode 100644
index 2cc5410..000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
-
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the 
contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-// This logic should've been as simple as:
-// ```
-//   data.as[T].foreachPartition { iter => ... }
-// ```
-//
-// Unfortunately, doing that would just break the incremental planing. The 
reason is,
-// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but 
`Dataset.rdd()` will
-// create a new plan. Because StreamExecution uses the existing plan to 
collect metrics and
-// update watermark, we should never create a new plan. Otherwise, metrics 
and watermark are
-// updated in the new plan, and StreamExecution cannot retrieval them.
-//
-// Hence, we need to manually convert internal rows to objects using 
encoder.
-val encoder = encoderFor[T].resolveAndBind(
-  data.logicalPlan.output,
-  data.sparkSession.sessionState.analyzer)
-data.queryExecution.toRdd.foreachPartition { iter =>
-  if (writer.open(TaskContext.getPartitionId(), batchId)) {
-try {
-  while (iter.hasNext) {
-writer.process(encoder.fromRow(iter.next()))
-  }
-} catch {
-  case e: Throwable =>
-writer.close(e)
-throw e
-}
-writer.close(null)
-  } else {
-writer.close(null)
-  }
-}
-  }
-
-  

[GitHub] spark issue #20951: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

2018-04-02 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20951
  
LGTM.


---




[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-04-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20951#discussion_r178648761
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -141,7 +141,7 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
 query.processAllAvailable()
   }
   assert(e.getCause.isInstanceOf[SparkException])
-  assert(e.getCause.getCause.getMessage === "error")
+  assert(e.getCause.getCause.getCause.getMessage === "ForeachSinkSuite 
error")
--- End diff --

why 3 levels? Can you paste the levels?
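For reference, one quick way to paste the levels is to walk the cause chain of the captured exception (a sketch; `e` is the exception asserted on in the test):

```scala
// Print every level of the cause chain, e.g. StreamingQueryException -> SparkException -> ...
var cause: Throwable = e
while (cause != null) {
  println(s"${cause.getClass.getName}: ${cause.getMessage}")
  cause = cause.getCause
}
```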


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-04-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20951#discussion_r178648616
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -131,7 +131,7 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
 .foreach(new TestForeachWriter() {
--- End diff --

And move this to the streaming.sources package, similar to ConsoleWriterSuite.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-04-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20951#discussion_r178648434
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -131,7 +131,7 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
 .foreach(new TestForeachWriter() {
--- End diff --

Maybe rename this to `ForeachWriterSuite`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20958: [SPARK-23844][SS] Fix socket source honors recovered off...

2018-04-02 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20958
  
We have made it clear that the socket source is ONLY for testing and will not recover 
data from checkpoints. So I see no problem with it throwing errors when 
attempting to recover. Maybe we can improve the error message by making it clear 
that recovery is not supported. 

If you indeed want to forget lost data and proceed, then that should be an 
opt-in. We could do this by explicitly setting a source option (like 
`failOnDataLoss = false` in the Kafka source). 
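A hypothetical sketch of what that opt-in could look like on the socket source (the option name is borrowed from the Kafka source; the socket source does not support it today):

```scala
// Hypothetical opt-in, mirroring Kafka's failOnDataLoss flag; not an existing socket option.
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("failOnDataLoss", "false")  // proposed: proceed past unrecoverable offsets on restart
  .load()
```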


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20958: [SPARK-23844][SS] Fix socket source honors recove...

2018-04-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20958#discussion_r178646725
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -238,6 +238,10 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
 "write files of Hive data source directly.")
 }
 
+val isSocketExists = df.queryExecution.analyzed.collect {
--- End diff --

I see what you are trying to do. But, honestly, we should NOT add any more 
special cases for specific sources. We already have them for memory and foreach, because 
it is hard to get rid of those. We should not add more.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-04-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20951#discussion_r178645775
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,81 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.sql.{Encoder, ForeachWriter, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, 
DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
--- End diff --

Actually, why not make it extend DataSourceV2 for consistency's sake? Then it 
is easier to find all data sources in the code by looking at what extends 
DataSourceV2.
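A minimal sketch of the suggested class header (interface names as in the sources.v2 package; the body is unchanged from this PR):

```scala
// Also extend the DataSourceV2 marker interface so the provider is discoverable
// by searching for DataSourceV2 implementations.
case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T])
  extends DataSourceV2 with StreamWriteSupport {
  // createStreamWriter(...) and the writer factory stay as in this PR
}
```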


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20951: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-04-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20951#discussion_r178645279
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -17,52 +17,81 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.sql.{Encoder, ForeachWriter, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, 
DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to 
the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) 
extends StreamWriteSupport {
--- End diff --

Rename the file accordingly, and add docs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-23827][SS] StreamingJoinExec should ensure that input data is partitioned into specific number of partitions

2018-03-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 3f5955aa0 -> 507cff246


[SPARK-23827][SS] StreamingJoinExec should ensure that input data is 
partitioned into specific number of partitions

## What changes were proposed in this pull request?

Currently, the requiredChildDistribution does not specify the number of partitions. This 
can cause weird corner cases where the child's distribution is 
`SinglePartition`, which satisfies the required distribution of 
`ClusteredDistribution(no-num-partition-requirement)`, thus eliminating the 
shuffle needed to repartition input data into the required number of partitions 
(i.e. the same as the state stores). That can lead to "file not found" errors on the 
state store delta files, as the micro-batch-with-no-shuffle will not run certain 
tasks and therefore not generate the expected state store delta files.

This PR adds the required constraint on the number of partitions.

## How was this patch tested?
Modified test harness to always check that ANY stateful operator should have a 
constraint on the number of partitions. As part of that, the existing opt-in 
checks on child output partitioning were removed, as they are redundant.
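A rough sketch of the kind of check the harness now makes (names approximate; the real check lives in StreamTest, and `lastExecution` is assumed to be the IncrementalExecution of the last completed batch):

```scala
// After a micro-batch runs, every stateful operator in the executed plan should
// constrain the number of partitions it requires from its children.
val statefulOps = lastExecution.executedPlan.collect { case s: StatefulOperator => s }
statefulOps.foreach { op =>
  assert(
    op.requiredChildDistribution.forall(_.requiredNumPartitions.isDefined),
    s"$op does not require a specific number of partitions")
}
```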

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #20941 from tdas/SPARK-23827.

(cherry picked from commit 15298b99ac8944e781328423289586176cf824d7)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/507cff24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/507cff24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/507cff24

Branch: refs/heads/branch-2.3
Commit: 507cff246cd9e15a418d67b66bf762be4ae71c67
Parents: 3f5955a
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Mar 30 16:48:26 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Mar 30 16:48:55 2018 -0700

--
 .../streaming/IncrementalExecution.scala|  2 +-
 .../StreamingSymmetricHashJoinExec.scala|  3 +-
 .../spark/sql/streaming/DeduplicateSuite.scala  |  8 +---
 .../streaming/FlatMapGroupsWithStateSuite.scala |  5 +-
 .../sql/streaming/StatefulOperatorTest.scala| 49 
 .../apache/spark/sql/streaming/StreamTest.scala | 19 
 .../streaming/StreamingAggregationSuite.scala   |  4 +-
 7 files changed, 25 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/507cff24/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index a10ed5f..1a83c88 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -62,7 +62,7 @@ class IncrementalExecution(
   StreamingDeduplicationStrategy :: Nil
   }
 
-  private val numStateStores = 
offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
+  private[sql] val numStateStores = 
offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
 .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
 .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/507cff24/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index c351f65..fa7c8ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -167,7 +167,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+  ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
 
   override def output: Seq[Attribute] = joinType match {
 case _: InnerLike => left.output ++ right.output

http://git-wip-us.apache.org/repos/asf/spark/blob/507

[GitHub] spark issue #20941: [SPARK-23827] [SS] StreamingJoinExec should ensure that ...

2018-03-30 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20941
  
Started more tests to test for flakiness.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20941: [SPARK-23827] [SS] StreamingJoinExec should ensure that ...

2018-03-29 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20941
  
@brkyvz @zsxwing can one of you take a look?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20941: Spark 23827

2018-03-29 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20941

Spark 23827

## What changes were proposed in this pull request?

Currently, the requiredChildDistribution does not specify the number of partitions. 
This can cause weird corner cases where the child's distribution is 
`SinglePartition`, which satisfies the required distribution of 
`ClusteredDistribution(no-num-partition-requirement)`, thus eliminating the 
shuffle needed to repartition input data into the required number of partitions 
(i.e. the same as the state stores). That can lead to "file not found" errors on the 
state store delta files, as the micro-batch-with-no-shuffle will not run certain 
tasks and therefore not generate the expected state store delta files.

This PR adds the required constraint on the number of partitions.

## How was this patch tested?
Modified test harness to always check that ANY stateful operator should 
have a constraint on the number of partitions. As part of that, the existing 
opt-in checks on child output partitioning were removed, as they are redundant.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23827

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20941.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20941


commit 02cc5509455d3f9d6d683a46fe4a50fcde8da348
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-29T02:38:59Z

Fixed join issue

commit 7046fbd5244e5d3adb75b7d090d57f1adc8b9859
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-29T22:32:55Z

Fix compilation

commit c162f8def7f7f57b9e8b954a5fe2f96368b5ed2f
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-29T23:22:53Z

Removed unnecessary tests




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-23096][SS] Migrate rate source to V2

2018-03-27 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 35997b59f -> c68ec4e6a


[SPARK-23096][SS] Migrate rate source to V2

## What changes were proposed in this pull request?

This PR migrates the micro-batch rate source to the V2 API and rewrites the UTs to 
suit the V2 test framework.
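
User-facing usage of the rate source is unchanged by the migration; only the provider class moved. A minimal sketch:

```scala
// The rate source generates (timestamp: Timestamp, value: Long) rows at a fixed rate.
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("numPartitions", "2")
  .load()
```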

## How was this patch tested?

UTs.

Author: jerryshao 

Closes #20688 from jerryshao/SPARK-23096.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c68ec4e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c68ec4e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c68ec4e6

Branch: refs/heads/master
Commit: c68ec4e6a1ed9ea13345c7705ea60ff4df7aec7b
Parents: 35997b5
Author: jerryshao 
Authored: Tue Mar 27 14:39:05 2018 -0700
Committer: Tathagata Das 
Committed: Tue Mar 27 14:39:05 2018 -0700

--
 apache.spark.sql.sources.DataSourceRegister |   3 +-
 .../sql/execution/datasources/DataSource.scala  |   6 +-
 .../streaming/RateSourceProvider.scala  | 262 --
 .../continuous/ContinuousRateStreamSource.scala |  25 +-
 .../sources/RateStreamMicroBatchReader.scala| 222 
 .../streaming/sources/RateStreamProvider.scala  | 125 +++
 .../streaming/sources/RateStreamSourceV2.scala  | 187 --
 .../execution/streaming/RateSourceSuite.scala   | 194 ---
 .../execution/streaming/RateSourceV2Suite.scala | 191 --
 .../sources/RateStreamProviderSuite.scala   | 344 +++
 10 files changed, 715 insertions(+), 844 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c68ec4e6/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 1fe9c09..1b37905 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
-org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2

http://git-wip-us.apache.org/repos/asf/spark/blob/c68ec4e6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 31fa89b..b84ea76 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -41,7 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import 
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, 
TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -566,6 +566,7 @@ object DataSource extends Logging {
 val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
 val nativeOrc = classOf[OrcFileFormat].getCanonicalName
 val socket = classOf[TextSocketSourceProvider].getCanonicalName
+val rate = classOf[RateStreamProvider].getCanonicalName
 
 Map(
   "org.apache.spark.sql.jdbc" -> jdbc,
@@ -587,7 +588,8 @@ object DataSource extends Logging {
   "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
   "org.apache.spark.ml.source.libsvm" -> libsvm,
   "com.databricks.spark.csv" -> csv,
-  "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> 
socket
+  "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> 
socket,
+  

[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...

2018-03-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20906#discussion_r177244722
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import 
org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, 
InternalRowDataWriterFactory}
+import 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, 
logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan)
+extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val writerFactory = writer match {
+  case w: SupportsWriteInternalRow => 
w.createInternalRowWriterFactory()
+  case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+}
+
+val rdd = query.execute()
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+logInfo(s"Start processing data source writer: $writer. " +
+  s"The input RDD has ${messages.length} partitions.")
+EpochCoordinatorRef.get(
--- End diff --

nit: Add comment on what this does.
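For example, something along these lines (wording is only a suggestion):

```scala
// Look up the epoch coordinator for this continuous query (its ID is stored in a
// task-local property) and tell it how many writer partitions to expect, so it
// knows when all partitions of an epoch have committed.
EpochCoordinatorRef.get(
  sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
  sparkContext.env)
  .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
```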


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...

2018-03-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20906#discussion_r177245022
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import 
org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, 
InternalRowDataWriterFactory}
+import 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, 
logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan)
--- End diff --

Isn't it better to keep the name consistent with WriterToDataSource? Say, 
WriteToContinuousDataSourceExec?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...

2018-03-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20906#discussion_r177275489
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import 
org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, 
InternalRowDataWriterFactory}
+import 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, 
logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan)
+extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val writerFactory = writer match {
+  case w: SupportsWriteInternalRow => 
w.createInternalRowWriterFactory()
+  case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+}
+
+val rdd = query.execute()
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+logInfo(s"Start processing data source writer: $writer. " +
+  s"The input RDD has ${messages.length} partitions.")
+EpochCoordinatorRef.get(
+  
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+  sparkContext.env)
+  .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
--- End diff --

nit: this indentation looks weird.
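One possible reformatting (just a suggestion) that avoids the awkward wrap by naming the looked-up endpoint:

```scala
val epochCoordinator = EpochCoordinatorRef.get(
  sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
  sparkContext.env)
epochCoordinator.askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
```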


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...

2018-03-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20906#discussion_r177243246
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import 
org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, 
InternalRowDataWriterFactory}
+import 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, 
logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan)
--- End diff --

add docs


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...

2018-03-26 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20906
  
jenkins, retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175580404
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
 case _: InsertIntoDir =>
   throwError("InsertIntoDir is not supported with streaming 
DataFrames/Datasets")
 
+case e: EventTimeWatermark =>
+  val statefulChildren = e.collect {
+case a: Aggregate if a.isStreaming => a
+case d: Deduplicate if d.isStreaming => d
+case f: FlatMapGroupsWithState if f.isStreaming => f
+  }
+  statefulChildren.foreach { statefulNode =>
+if (statefulNode.collectFirst{ case e: EventTimeWatermark => e 
}.isDefined) {
+  throwError("Watermarks both before and after a stateful 
operator in a streaming " +
--- End diff --

This gives the impression that it makes sense but we don't support it. In 
fact, it's just ill-defined. Maybe change this to something like: "Multiple 
watermarks before and after stateful operators are not well-defined in a 
streaming query."
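For reference, the shape of a query this check is meant to reject (a sketch; `events` is assumed to be a streaming DataFrame with an `eventTime` timestamp column, with `spark.implicits._` and `org.apache.spark.sql.functions.window` in scope):

```scala
// Watermark below the aggregation and another watermark above it: the semantics of
// the second watermark relative to the aggregation's state are not well-defined.
val counts = events
  .withWatermark("eventTime", "2 seconds")
  .groupBy(window($"eventTime", "10 seconds"))
  .count()
  .withColumn("windowEnd", $"window.end")
  .withWatermark("windowEnd", "1 second")  // second watermark, after the stateful operator
```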


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175580451
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 ---
@@ -140,6 +140,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
{
 outputMode = Complete,
 expectedMsgs = Seq("distinct aggregation"))
 
+  assertNotSupportedInStreamingPlan(
+"aggregate on both sides of stateful op",
+EventTimeWatermark(
+  attribute,
+  CalendarInterval.fromString("interval 1 second"),
+  Aggregate(
+attributeWithWatermark :: Nil,
+aggExprs("a"),
+EventTimeWatermark(
+  attribute,
+  CalendarInterval.fromString("interval 2 seconds"),
+  streamRelation))),
+outputMode = Append,
+expectedMsgs = Seq("both before and after"))
--- End diff --

Add for other stateful operators as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175579879
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
 case _: InsertIntoDir =>
   throwError("InsertIntoDir is not supported with streaming 
DataFrames/Datasets")
 
+case e: EventTimeWatermark =>
+  val statefulChildren = e.collect {
+case a: Aggregate if a.isStreaming => a
+case d: Deduplicate if d.isStreaming => d
+case f: FlatMapGroupsWithState if f.isStreaming => f
--- End diff --

Should be for joins as well.
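i.e., roughly extending the collect above with a case for stream-stream joins (a sketch, not the exact patch):

```scala
val statefulChildren = e.collect {
  case a: Aggregate if a.isStreaming => a
  case d: Deduplicate if d.isStreaming => d
  case f: FlatMapGroupsWithState if f.isStreaming => f
  case j: Join if j.left.isStreaming && j.right.isStreaming => j  // stream-stream joins
}
```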


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20848: [SPARK-23623][SS] Avoid concurrent use of cached ...

2018-03-19 Thread tdas
Github user tdas closed the pull request at:

https://github.com/apache/spark/pull/20848


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-16 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
Just to be clear, I am not saying that we *have to* move to this pool 
stuff. I am just saying that if we want to make this more robust, then we 
should try to use existing tools (after careful evaluation), rather than trying 
to reinvent the wheel.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-16 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
@tedyu It was indeed hard to find :) But Apache Commons Pool does expose 
metrics on idle/active counts. See 
https://commons.apache.org/proper/commons-pool/apidocs/org/apache/commons/pool2/impl/BaseGenericObjectPool.html
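
For example, with commons-pool2 the pool itself tracks these counts (a fragment sketch; `consumerFactory` is a hypothetical PooledObjectFactory for Kafka consumers):

```scala
import org.apache.commons.pool2.impl.GenericObjectPool

// commons-pool2 exposes active/idle and lifetime counters out of the box.
val pool = new GenericObjectPool(consumerFactory)
val consumer = pool.borrowObject()
try {
  // ... use the consumer ...
} finally {
  pool.returnObject(consumer)
}
println(s"active=${pool.getNumActive}, idle=${pool.getNumIdle}, " +
  s"created=${pool.getCreatedCount}, borrowed=${pool.getBorrowedCount}")
```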



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20848: [SPARK-23623][SS] Avoid concurrent use of cached ...

2018-03-16 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20848

[SPARK-23623][SS] Avoid concurrent use of cached consumers in 
CachedKafkaConsumer (branch-2.3)

This is a backport of #20767 to branch 2.3

## What changes were proposed in this pull request?
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain 
a pool of KafkaConsumers that can be reused. However, it was built with the 
assumption that there will be only one task trying to read the same Kafka 
TopicPartition at the same time. Hence, the cache was keyed by the 
TopicPartition a consumer is supposed to read. And for any cases where this 
assumption may not be true, we have a SparkPlan flag to disable the use of the 
cache. So it was up to the planner to correctly identify when it was not safe 
to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD 
for a high-level planner to reason about the low-level execution model, i.e. whether 
there will be multiple tasks in the same query trying to read the same 
partition. Case in point: 2.3.0 introduced stream-stream joins, and you can 
build a streaming self-join query on Kafka. It's pretty non-trivial to figure 
out how this leads to two tasks reading the same partition, possibly 
concurrently. And due to that non-triviality, it is hard to figure this out in 
the planner and set the flag to avoid the cache / consumer pool. This can 
inadvertently lead to ConcurrentModificationException, or worse, silent reading 
of incorrect data.

Here is a better way to design this. The planner shouldn't have to 
understand these low-level optimizations. Rather, the consumer pool should be 
smart enough to avoid concurrent use of a cached consumer. Currently, it tries to 
do so, but incorrectly (the flag `inuse` is not checked when returning a cached 
consumer; see 
[this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)).
 If there is another request for the same partition as a currently in-use 
consumer, the pool should automatically return a fresh consumer that is 
closed when the task is done. Then the planner does not need a flag to 
avoid reuse.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaDataConsumer is introduced to hide this difference from 
the users of the consumer, so that the client code does not have to reason about 
whether to stop and release. It simply calls `val consumer = 
KafkaDataConsumer.acquire(...)` and then `consumer.release()` (see the sketch below).
- If there is a request for a consumer that is in use, then a new consumer is 
generated.
- If there is a concurrent attempt of the same task, then a new consumer is 
generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a 
misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse, in order to keep this patch 
safe enough for merging into branch-2.3. That can be done later in master only.
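
A rough sketch of the resulting client-side pattern (the surrounding values are assumed in scope, and the `get` signature follows the existing consumer API, so it may differ slightly):

```scala
// Acquire a consumer (cached if possible), read through it, and always release it.
val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = true)
try {
  val record = consumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss = false)
  // ... process the record ...
} finally {
  consumer.release()  // returns a cached consumer to the pool, closes a non-cached one
}
```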

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers 
for the same partition from the consumer pool.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23623-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20848.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20848


commit 9e440a4e788980e0dc475aa7966c3e56010e7cf7
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-16T18:11:07Z

[SPARK-23623][SS] Avoid concurrent use of cached consumers in 
CachedKafkaConsumer

CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain 
a pool of KafkaConsumers that can be reused. However, it was built with the 
assumption that there will be only one task trying to read the same Kafka 
TopicPartition at the same time. Hence, the cache was keyed by the 
TopicPartition a consumer is supposed to read. And for any cases where this 
assumption may not be true, we have a SparkPlan flag to disable the use of the 
cache. So it was up to the planner to correctly identify when it was not safe 
to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD 
for a high-level planner to reason about the low-level execution model, whether 
there will be mu

[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-16 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
@tedyu @zsxwing My thoughts on this are that we should consider migrating to 
something like Apache Commons Pool (assuming it does not require additional 
Maven dependencies), which might mean less maintenance load. It may 
already have metrics, etc., that we can leverage.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-15 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
The idea is good. But how do you propose exposing that information?
A periodic printout in the log?

From a different angle, I would rather not do feature creep in this PR that
is intended to be backported to 2.3.

On Mar 15, 2018 7:31 PM, "tedyu" <notificati...@github.com> wrote:

> *@tedyu* commented on this pull request.
> --
>
> In external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/
> KafkaDataConsumer.scala
> <https://github.com/apache/spark/pull/20767#discussion_r174984237>:
>
> >CachedKafkaDataConsumer(newInternalConsumer)
>
> -} else if (existingInternalConsumer.inuse) {
> +} else if (existingInternalConsumer.inUse) {
>// If consumer is already cached but is currently in use, then 
return a new consumer
>NonCachedKafkaDataConsumer(newInternalConsumer)
>
> Maybe keep an internal counter for how many times the non cached consumer
> is created.
> This would give us information on how effective the cache is
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-15 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
@tedyu @zsxwing thank you very much for catching the bugs. I have 
simplified the logic quite a bit. Note that I removed the invariant that I had 
introduced earlier. Additionally, I locally ran the stress test with 100 
threads and 1 read attempts, which ran for 2 mins. It passed. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r174973494
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
--- End diff --

I rewrote the logic. Hopefully, it's simpler to reason about it now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r174968594
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
+   * caching them and tracking when they are in use.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
  

[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r174968294
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
+   * caching them and tracking when they are in use.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
  

[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-15 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
@koeninger good question, Cody! I think we should fix this limitation 
eventually. The only reason I am not doing that in this PR is to keep the 
changes minimal for backporting to 2.3.x. Eventually, we should not do such 
cache management ourselves, and rather use something like [Apache Commons 
Pool](https://commons.apache.org/proper/commons-pool/index.html).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173602790
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
+   * caching them and tracking when they are in use.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
  

[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173602735
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
+   * caching them and tracking when they are in use.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
  

[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173602480
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
+   * caching them and tracking when they are in use.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
  

[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173602442
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -27,30 +27,73 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
+private[kafka010] sealed trait KafkaDataConsumer {
+  /**
+   * Get the record for the given offset if available. Otherwise it will 
either throw error
+   * (if failOnDataLoss = true), or return the next available offset 
within [offset, untilOffset),
+   * or null.
+   *
+   * @param offset the offset to fetch.
+   * @param untilOffsetthe max offset to fetch. Exclusive.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
+   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
+   *   this method will either return record at offset 
if available, or return
+   *   the next earliest available record less than 
untilOffset, or null. It
+   *   will not throw any exception.
+   */
+  def get(
+  offset: Long,
+  untilOffset: Long,
+  pollTimeoutMs: Long,
+  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
+internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  }
+
+  /**
+   * Return the available offset range of the current partition. It's a 
pair of the earliest offset
+   * and the latest offset.
+   */
+  def getAvailableOffsetRange(): AvailableOffsetRange = 
internalConsumer.getAvailableOffsetRange()
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer
+}
+
 
 /**
- * Consumer of single topicpartition, intended for cached reuse.
- * Underlying consumer is not threadsafe, so neither is this,
- * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ * A wrapper around Kafka's KafkaConsumer that throws error when data loss 
is detected.
+ * This is not for direct use outside this file.
  */
-private[kafka010] case class CachedKafkaConsumer private(
+private[kafka010] case class InternalKafkaConsumer(
 topicPartition: TopicPartition,
 kafkaParams: ju.Map[String, Object]) extends Logging {
-  import CachedKafkaConsumer._
+  import InternalKafkaConsumer._
 
   private val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private var consumer = createConsumer
+  @volatile private var consumer = createConsumer
--- End diff --

Yeah, I just added them to be safer. One less thing to worry about.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173351789
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+topicPartition: TopicPartition,
+kafkaParams: ju.Map[String, Object]): Unit = {
+val key = new CacheKey(topicPartition, kafkaParams)
 
 synchronized {
   val consumer = cache.get(key)
   if (consumer != null) {
-consumer.inuse = false
+if (consumer.markedForClose) {
+  consumer.close()
+  cache.remove(key)
+} else {
+  consumer.inuse = false
+}
   } else {
 logWarning(s"Attempting to release consumer that does not exist")
--- End diff --

This should not be the case. We do not remove any consumer from the cache 
while it is being used. So the scenario that you mentioned should not happen.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173341089
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+topicPartition: TopicPartition,
+kafkaParams: ju.Map[String, Object]): Unit = {
+val key = new CacheKey(topicPartition, kafkaParams)
 
 synchronized {
   val consumer = cache.get(key)
   if (consumer != null) {
-consumer.inuse = false
+if (consumer.markedForClose) {
+  consumer.close()
+  cache.remove(key)
+} else {
+  consumer.inuse = false
+}
   } else {
 logWarning(s"Attempting to release consumer that does not exist")
--- End diff --

Aah. The warning was misleading. Will add comments to clarify that. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173340037
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+topicPartition: TopicPartition,
+kafkaParams: ju.Map[String, Object]): Unit = {
+val key = new CacheKey(topicPartition, kafkaParams)
 
 synchronized {
   val consumer = cache.get(key)
   if (consumer != null) {
-consumer.inuse = false
+if (consumer.markedForClose) {
+  consumer.close()
+  cache.remove(key)
+} else {
+  consumer.inuse = false
+}
   } else {
 logWarning(s"Attempting to release consumer that does not exist")
   }
 }
   }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
-  }
-}
-  }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newNonCachedConsumer =
+  new NonCachedKafkaDataConsumer(new 
InternalKafkaConsumer(topicPartition, kafkaParams))
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
-  cache.put(key, consumer)
-  consumer
-} else {
-  if (!cache.containsKey(key)) {
-cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
+  // If this is reattempt at running the task, then invalidate cache 
and start with
+  // a new consumer
+  if (existingInternalConsumer != null) {
--- End diff --

This is indeed better. What I was doing was always deferring the close to a later 
point, but that would lead to the consumer being used one more time before being closed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173338056
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+topicPartition: TopicPartition,
+kafkaParams: ju.Map[String, Object]): Unit = {
+val key = new CacheKey(topicPartition, kafkaParams)
 
 synchronized {
   val consumer = cache.get(key)
   if (consumer != null) {
-consumer.inuse = false
+if (consumer.markedForClose) {
+  consumer.close()
+  cache.remove(key)
+} else {
+  consumer.inuse = false
+}
   } else {
 logWarning(s"Attempting to release consumer that does not exist")
   }
 }
   }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
-  }
-}
-  }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newNonCachedConsumer =
+  new NonCachedKafkaDataConsumer(new 
InternalKafkaConsumer(topicPartition, kafkaParams))
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
-  cache.put(key, consumer)
-  consumer
-} else {
-  if (!cache.containsKey(key)) {
-cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
+  // If this is reattempt at running the task, then invalidate cache 
and start with
+  // a new consumer
+  if (existingInternalConsumer != null) {
+existingInternalConsumer.markedForClose = true
   }
-  val consumer = cache.get(key)
-  consumer.inuse = true
-  consumer
+  newNonCachedConsumer
+} else if (!useCache) {
+  newNonCachedConsumer
+} else if (existingInternalConsumer == null) {
+  newNonCachedConsumer.internalConsumer.inuse = true
+  cache.put(key, newNonCachedConsumer.internalConsumer)
+  newNonCachedConsumer
--- End diff --

Oh yes, damn it. My bad.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-08 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
jenkins retest this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20765: [SPARK-23406][SS] Enable stream-stream self-joins...

2018-03-08 Thread tdas
Github user tdas closed the pull request at:

https://github.com/apache/spark/pull/20765


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

2018-03-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1dd37ff3b -> 404f7e201


[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

This is a backport of #20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary 
for the catalyst analyzer to convert the self-join logical plan DAG into a tree 
(by creating new instances of the leaf relations). This was causing the error 
`Failure when resolving conflicting references in Join:` (see JIRA for details).
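
For reference, the shape of this fix (illustrative only, not the exact Spark source): 
a streaming leaf node mixes in MultiInstanceRelation so the analyzer can create a 
fresh copy of the relation, with new expression IDs, for each occurrence in a self-join.
```
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

// Illustrative leaf node; the real classes are StreamingExecutionRelation and friends.
case class ExampleStreamingRelation(output: Seq[Attribute]) extends LeafNode
  with MultiInstanceRelation {

  override def isStreaming: Boolean = true

  // Re-instantiating the output attributes gives each self-join branch its own
  // expression IDs, so the analyzer no longer sees conflicting references.
  override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
}
```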

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the 
StreamingExecutionRelation), we were rewriting the attribute references in the 
streaming plan with the new attribute references from the batch plan. This 
incorrectly handled the scenario where multiple StreamingExecutionRelations 
point to the same source, and therefore eventually point to the same batch plan 
returned by the source. Here is an example query and its corresponding plan 
transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
  +- StreamingExecutionRelation Memory[#1], value#12  // two different 
leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]   // replaces 
StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
  +- LocalRelation [value#66]   // replaces 
StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for the spliced 
plans, the new output attributes (value#66) replace the earlier output attributes 
(value#1 and value#12, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]   // both value#1 and value#12 replaces 
by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
  +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, 
incorrect join results
  +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new reference generated 
by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   : +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
  +- Project [value#66 AS value#12]// solution: project with aliases
 +- LocalRelation [value#66]
```
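
A hedged sketch of that splice (simplified; `spliceWithAliases` is an illustrative 
helper, not the actual MicroBatchExecution code): rather than rewriting references 
throughout the plan, wrap the spliced batch plan in a Project that aliases its fresh 
output back to the original streaming output.
```
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

object SpliceSketch {
  // Produces e.g. Project [value#66 AS value#1] on top of the batch plan, keeping the
  // original expression IDs visible to the rest of the streaming plan.
  def spliceWithAliases(streamingOutput: Seq[Attribute], batchPlan: LogicalPlan): LogicalPlan = {
    val aliases: Seq[NamedExpression] = streamingOutput.zip(batchPlan.output).map {
      case (original, fresh) => Alias(fresh, original.name)(exprId = original.exprId)
    }
    Project(aliases, batchPlan)
  }
}
```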

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #20765 from tdas/SPARK-23406-2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/404f7e20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/404f7e20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/404f7e20

Branch: refs/heads/branch-2.3
Commit: 404f7e2013ecfdf993a17fd942d8890d9a8100e7
Parents: 1dd37ff
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Mar 7 21:58:57 2018 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Mar 7 21:58:57 2018 -0800

--
 .../streaming/MicroBatchExecution.scala | 16 ++---
 .../execution/streaming/StreamingRelation.scala | 20 +++-
 .../sql/streaming/StreamingJoinSuite.scala  | 25 +++-
 3 files changed, 45 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/404f7e20/sql/core/src/main/scala/org/apache/spar

[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-07 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20767
  
@zsxwing @brkyvz PTAL.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20767: Fixed

2018-03-07 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20767

Fixed

## What changes were proposed in this pull request?

CachedKafkaConsumer in the `kafka-0-10-sql` project is designed to maintain 
a pool of KafkaConsumers that can be reused. However, it was built with the 
assumption that there will be only one task trying to read the same Kafka 
TopicPartition at the same time. Hence, the cache was keyed by the 
TopicPartition a consumer is supposed to read. For any case where this 
assumption may not hold, there is a SparkPlan flag to disable the use of the 
cache, so it was up to the planner to correctly identify when it was not safe 
to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD 
for a high-level planner to reason about the low-level execution model, i.e. whether 
there will be multiple tasks in the same query trying to read the same 
partition. Case in point: 2.3.0 introduced stream-stream joins, and you can 
build a streaming self-join query on Kafka. It is pretty non-trivial to figure 
out how this leads to two tasks reading the same partition, possibly 
concurrently. Due to that non-triviality, it is hard to detect this in 
the planner and set the flag to avoid the cache / consumer pool, and this can 
inadvertently lead to ConcurrentModificationException or, worse, silent reading 
of incorrect data.

Here is a better way to design this. The planner shouldn't have to 
understand these low-level optimizations. Rather, the consumer pool should be 
smart enough to avoid concurrent use of a cached consumer. Currently it tries to 
do so, but incorrectly (the `inuse` flag is not checked when returning a cached 
consumer, see 
[this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)).
 If there is another request for the same partition as a currently in-use 
consumer, the pool should automatically return a fresh consumer that is 
closed when the task is done. Then the planner does not need a flag to 
avoid reuse.

This PR is a step towards that goal. It does the following. 
- There are effectively two kinds of consumer that may be handed out:
  - Cached consumer - this should be returned to the pool at task end.
  - Non-cached consumer - this should be closed at task end.
- A trait called KafkaDataConsumer is introduced to hide this difference from 
the users of the consumer, so that the client code does not have to reason about 
whether to stop and release. It simply calls `val consumer = 
KafkaDataConsumer.acquire(...)` and later `consumer.release()`.
- If there is a request for a consumer that is in use, then a new consumer is 
generated.
- If there is a concurrent attempt of the same task, then a new consumer is 
generated, and the existing cached consumer is marked for close upon release. 
(A condensed sketch of this acquire/release policy follows below.)

To keep this patch safe enough for merging into branch-2.3, this PR does not 
remove the planner flag that avoids reuse; that can be done later in master only.
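
A condensed sketch of the acquire/release policy described above (single JVM, 
simplified hypothetical types; not the actual KafkaDataConsumer code):

```
import scala.collection.mutable

// Hypothetical stand-in for InternalKafkaConsumer.
class SketchConsumer(val key: String) {
  var inuse = false
  var markedForClose = false
  def close(): Unit = ()   // would close the underlying Kafka consumer
}

object SketchPool {
  private val cache = mutable.Map[String, SketchConsumer]()

  /** Returns (consumer, cached?); the caller must call release() when done. */
  def acquire(key: String, isReattempt: Boolean): (SketchConsumer, Boolean) = synchronized {
    val existing = cache.get(key).orNull
    if (isReattempt) {
      // Invalidate lazily: the cached consumer is closed when its current user releases it.
      if (existing != null) existing.markedForClose = true
      (new SketchConsumer(key), false)            // fresh, non-cached consumer
    } else if (existing == null) {
      val c = new SketchConsumer(key)
      c.inuse = true
      cache(key) = c
      (c, true)                                   // newly cached consumer
    } else if (existing.inuse) {
      (new SketchConsumer(key), false)            // cached one is busy: never share it
    } else {
      existing.inuse = true
      (existing, true)                            // reuse the idle cached consumer
    }
  }

  def release(consumer: SketchConsumer, cached: Boolean): Unit = synchronized {
    if (!cached) {
      consumer.close()                            // non-cached consumers are simply closed
    } else {
      consumer.inuse = false
      if (consumer.markedForClose) {
        consumer.close()
        cache.remove(consumer.key)
      }
    }
  }
}
```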



## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23623

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20767.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20767


commit 97510c6952a865caf41e6b6f19c3af7e714c3ad6
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-08T02:23:45Z

Fixed




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20765: [SPARK-23406][SS] Enable stream-stream self-joins...

2018-03-07 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20765

[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

This is a backport of #20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is 
necessary for the catalyst analyzer to convert the self-join logical plan DAG 
into a tree (by creating new instances of the leaf relations). This was causing 
the error `Failure when resolving conflicting references in Join:` (see JIRA 
for details).

### Incorrect attribute rewrite when splicing batch plans in 
MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing 
the StreamingExecutionRelation), we were rewriting the attribute references in the 
streaming plan with the new attribute references from the batch plan. This 
incorrectly handled the scenario where multiple StreamingExecutionRelations 
point to the same source, and therefore eventually point to the same batch plan 
returned by the source. Here is an example query and its corresponding plan 
transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
  +- StreamingExecutionRelation Memory[#1], value#12  // two different 
leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]   // replaces 
StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
  +- LocalRelation [value#66]   // replaces 
StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for the 
spliced plans, the new output attributes (value#66) replace the earlier output 
attributes (value#1 and value#12, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]   // both value#1 and value#12 
replaces by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
  +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, 
incorrect join results
  +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new reference generated 
by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   : +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
  +- Project [value#66 AS value#12]// solution: project with aliases
 +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23406-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20765


commit c3ec9ef9355a3290d764dda0191165eaa4e49062
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-02-14T22:27:02Z

[SPARK-23406][SS] Enable stream-stream self-joins

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is 
necessary for the cataly

[GitHub] spark pull request #20688: [SPARK-23096][SS] Migrate rate source to V2

2018-03-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20688#discussion_r172730994
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 ---
@@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateSourceProvider, 
RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, 
ValueRunTimeMsPair}
+import org.apache.spark.sql.execution.streaming.sources.RateSourceProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
--- End diff --

Could you make the names of the different readers consistent with each 
other? Similar to Kafka:

RateStreamProvider
RateStreamMicroBatchReader, RateStreamMicroBatchDataReaderFactory
RateStreamContinuousReader





---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20688: [SPARK-23096][SS] Migrate rate source to V2

2018-03-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20688#discussion_r172730333
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateSourceSuite.scala
 ---
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.nio.file.Files
+import java.util.Optional
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.util.ManualClock
+
+class RateSourceSuite extends StreamTest {
--- End diff --

Why did you not move this file using "git mv" and then change it? That way we 
would have been able to diff it properly. 
This was a pain in the text socket v2 PR as well :(


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20688: [SPARK-23096][SS] Migrate rate source to V2

2018-03-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20688#discussion_r172729894
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateSourceProvider.scala
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.Optional
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, 
MicroBatchReader, Offset}
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.{ManualClock, SystemClock}
+
+object RateSourceProvider {
+  val SCHEMA =
+StructType(StructField("timestamp", TimestampType) :: 
StructField("value", LongType) :: Nil)
+
+  val VERSION = 1
+
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+  val RAMP_UP_TIME = "rampUpTime"
+
+  /** Calculate the end value we will emit at the time `seconds`. */
+  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: 
Long): Long = {
+// E.g., rampUpTimeSeconds = 4, rowsPerSecond = 10
+// Then speedDeltaPerSecond = 2
+//
+// seconds   = 0 1 2  3  4  5  6
+// speed = 0 2 4  6  8 10 10 (speedDeltaPerSecond * seconds)
+// end value = 0 2 6 12 20 30 40 (0 + speedDeltaPerSecond * seconds) * 
(seconds + 1) / 2
+val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
+if (seconds <= rampUpTimeSeconds) {
+  // Calculate "(0 + speedDeltaPerSecond * seconds) * (seconds + 1) / 
2" in a special way to
+  // avoid overflow
+  if (seconds % 2 == 1) {
+(seconds + 1) / 2 * speedDeltaPerSecond * seconds
+  } else {
+seconds / 2 * speedDeltaPerSecond * (seconds + 1)
+  }
+} else {
+  // rampUpPart is just a special case of the above formula: 
rampUpTimeSeconds == seconds
+  val rampUpPart = valueAtSecond(rampUpTimeSeconds, rowsPerSecond, 
rampUpTimeSeconds)
+  rampUpPart + (seconds - rampUpTimeSeconds) * rowsPerSecond
+}
+  }
+}
+
+class RateSourceProvider extends DataSourceV2
+  with MicroBatchReadSupport with ContinuousReadSupport with 
DataSourceRegister {
+  import RateSourceProvider._
+
+  private def checkParameters(options: DataSourceOptions): Unit = {
+if (options.get(ROWS_PER_SECOND).isPresent) {
+  val rowsPerSecond = options.get(ROWS_PER_SECOND).get().toLong
+  if (rowsPerSecond <= 0) {
+throw new IllegalArgumentException(
+  s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' 
must be positive")
+  }
+}
+
+if (options.get(RAMP_UP_TIME).isPresent) {
+  val rampUpTimeSeconds =
+JavaUtils.timeStringAsSec(options.get(RAMP_UP_TIME).get())
+  if (rampUpTimeSeconds < 0) {
+throw new IllegalArgumentException(
+  s"Invalid value '$rampUpTimeSeconds'. The option 'rampUpTime' 
must not be negative")
   

[GitHub] spark pull request #20755: [SPARK-23406][SS] Enable stream-stream self-joins...

2018-03-06 Thread tdas
Github user tdas closed the pull request at:

https://github.com/apache/spark/pull/20755


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20755: [SPARK-23406][SS] Enable stream-stream self-joins...

2018-03-06 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20755

[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

## What changes were proposed in this pull request?
This is a limited but safe-to-backport version of the self-join fix made in 
#20598. That PR solved two bugs:
1. Add the MultiInstanceRelation trait to leaf logical nodes to allow 
resolution - This is the major fix required to allow streaming self-joins, and 
is safe to backport.
2. Fix attribute rewriting in MicroBatchExecution when micro-batch plans 
are spliced into the streaming logical plan - This is a minor fix that is not 
safe to backport. Without this fix only a very small fraction of self-join cases 
will have issues, but those issues may lead to incorrect results.

## How was this patch tested?
New unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23406-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20755


commit 484babb58d9cf61d5dcc6521865cd2a5db64dd82
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-07T00:53:34Z

Fixed




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20710
  
@rdblue @jose-torres Argh... I didn't notice that you were still 
commenting before I merged it. 
Feel free to continue the discussion, and if any change is needed we will deal 
with it accordingly. Sorry about that!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

2018-03-05 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master ba622f45c -> b0f422c38


[SPARK-23559][SS] Add epoch ID to DataWriterFactory.

## What changes were proposed in this pull request?

Add an epoch ID argument to DataWriterFactory for use in streaming. As a side 
effect of passing in this value, DataWriter will now have a consistent 
lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in 
any execution mode.

I considered making a separate streaming interface and adding the epoch ID only 
to that one, but I think it requires a lot of extra work for no real gain. I 
think it makes sense to define epoch 0 as the one and only epoch of a 
non-streaming query.
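
For example, a sink-side factory could fold the new argument into a deterministic 
identity per (partition, epoch), so a retried epoch overwrites rather than duplicates 
its previous attempt. A hedged sketch (SketchWriter/SketchWriterFactory are 
illustrative, not part of Spark; only the createDataWriter signature comes from this 
change):

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

case class SketchCommit(path: String) extends WriterCommitMessage

class SketchWriter(path: String) extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = { /* buffer or append the row */ }
  override def commit(): WriterCommitMessage = SketchCommit(path)   // make output visible
  override def abort(): Unit = { /* discard any partial output */ }
}

class SketchWriterFactory(dir: String) extends DataWriterFactory[InternalRow] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[InternalRow] = {
    // Epoch 0 is the one and only epoch of a non-streaming query; streaming epochs increase.
    new SketchWriter(s"$dir/epoch=$epochId/part=$partitionId")
  }
}
```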

## How was this patch tested?

existing unit tests

Author: Jose Torres 

Closes #20710 from jose-torres/api2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0f422c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0f422c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0f422c3

Branch: refs/heads/master
Commit: b0f422c3861a5a3831e481b8ffac08f6fa085d00
Parents: ba622f4
Author: Jose Torres 
Authored: Mon Mar 5 13:23:01 2018 -0800
Committer: Tathagata Das 
Committed: Mon Mar 5 13:23:01 2018 -0800

--
 .../spark/sql/kafka010/KafkaStreamWriter.scala  |  5 +++-
 .../spark/sql/sources/v2/writer/DataWriter.java | 12 ++
 .../sources/v2/writer/DataWriterFactory.java|  5 +++-
 .../v2/writer/streaming/StreamWriter.java   | 19 +++
 .../datasources/v2/WriteToDataSourceV2.scala| 25 +---
 .../streaming/MicroBatchExecution.scala |  7 ++
 .../sources/PackedRowWriterFactory.scala|  5 +++-
 .../execution/streaming/sources/memoryV2.scala  |  5 +++-
 .../sources/v2/SimpleWritableDataSource.scala   | 10 ++--
 9 files changed, 65 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index 9307bfc..ae5b5c5 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory(
 topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
   extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+  override def createDataWriter(
+  partitionId: Int,
+  attemptNumber: Int,
+  epochId: Long): DataWriter[InternalRow] = {
 new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 53941a8..39bf458 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data writer returned by {@link DataWriterFactory#createDataWriter(int, 
int)} and is
+ * A data writer returned by {@link DataWriterFactory#createDataWriter(int, 
int, long)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe 
concern.
@@ -31,13 +31,17 @@ import org.apache.spark.annotation.InterfaceStability;
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and the 
remaining records will
  * not be processed. If all records are successfully written, {@link 
#commit()} is called.
  *
+ * Once a data writer returns successfully from {@link #commit()} or {@link 
#abort()}, its lifecycle
+ * is over and Spark will not use it again.
+ *
  * If this data writer succeeds(all records are successfully written and 
{@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side 
and pass to
  * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit 

[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r172081660
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -31,13 +31,17 @@
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and 
the remaining records will
  * not be processed. If all records are successfully written, {@link 
#commit()} is called.
  *
+ * Once a data writer returns successfully from {@link #commit()} or 
{@link #abort()}, its lifecycle
+ * is over and Spark will not use it again.
+ *
  * If this data writer succeeds(all records are successfully written and 
{@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver 
side and pass to
  * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit 
messages from other data
  * writers. If this data writer fails(one record fails to write or {@link 
#commit()} fails), an
- * exception will be sent to the driver side, and Spark will retry this 
writing task for some times,
- * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a 
different `attemptNumber`,
- * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} 
if all retry fail.
+ * exception will be sent to the driver side, and Spark may retry this 
writing task a few times.
+ * In each retry, {@link DataWriterFactory#createDataWriter(int, int, 
long)} will receive a
+ * different `attemptNumber`. Spark will call {@link 
DataSourceWriter#abort(WriterCommitMessage[])}
--- End diff --

This is not clear to me. Isn't it the case that abort will be called every 
time a task attempt ends in an error?
This wording gives the impression that abort is called only after N failed 
attempts have been made. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20698: [SPARK-23541][SS] Allow Kafka source to read data with g...

2018-03-02 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20698
  
Thank you. Merging to master only as this is a new feature touching 
production code paths.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993983
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
 ---
@@ -39,21 +36,21 @@
* If this method fails (by throwing an exception), this writing job is 
considered to have been
* failed, and the execution engine will attempt to call {@link 
#abort(WriterCommitMessage[])}.
*
-   * To support exactly-once processing, writer implementations should 
ensure that this method is
-   * idempotent. The execution engine may call commit() multiple times for 
the same epoch
-   * in some circumstances.
+   * The execution engine may call commit() multiple times for the same 
epoch in some circumstances.
--- End diff --

Somewhere in this file, add docs about what epochId means for micro-batch (MB) 
and continuous (C) execution.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993716
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -48,6 +48,9 @@
*  same task id but different attempt number, which 
means there are multiple
*  tasks with the same task id running at the same 
time. Implementations can
*  use this attempt number to distinguish writers 
of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries 
that are split in to
+   *discrete periods of execution. For queries that 
execute as a single batch, this
--- End diff --

Also, make it clear that this is the batchId for micro-batch processing and the 
epochId for continuous processing.
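
One possible wording for such a doc, expressed on a simplified stand-in for the factory interface (illustrative names only, not the text that was eventually committed):

```scala
// Illustrative stand-ins only; not the interfaces under review.
trait ExampleDataWriter[T] {
  def write(record: T): Unit
}

trait ExampleDataWriterFactory[T] extends Serializable {
  /**
   * @param epochId A monotonically increasing id for streaming queries that are split into
   *                discrete periods of execution. In micro-batch mode this is the id of the
   *                current batch; in continuous mode it is the id of the current epoch. For a
   *                query that executes as a single batch, it is always 0.
   */
  def createDataWriter(partitionId: Int, attemptNumber: Int, epochId: Long): ExampleDataWriter[T]
}
```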


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993622
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -48,6 +48,9 @@
*  same task id but different attempt number, which 
means there are multiple
*  tasks with the same task id running at the same 
time. Implementations can
*  use this attempt number to distinguish writers 
of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries 
that are split in to
+   *discrete periods of execution. For queries that 
execute as a single batch, this
--- End diff --

For non-streaming queries, this...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993559
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -36,8 +36,9 @@
  * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit 
messages from other data
  * writers. If this data writer fails(one record fails to write or {@link 
#commit()} fails), an
  * exception will be sent to the driver side, and Spark will retry this 
writing task for some times,
--- End diff --

Break this sentence; it is very long.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993519
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -36,8 +36,9 @@
  * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit 
messages from other data
  * writers. If this data writer fails(one record fails to write or {@link 
#commit()} fails), an
  * exception will be sent to the driver side, and Spark will retry this 
writing task for some times,
--- End diff --

for some times --> for a few times


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993467
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -36,8 +36,9 @@
  * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit 
messages from other data
  * writers. If this data writer fails(one record fails to write or {@link 
#commit()} fails), an
  * exception will be sent to the driver side, and Spark will retry this 
writing task for some times,
--- End diff --

Spark may retry... (in continuous processing we don't retry the task)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -172,17 +173,19 @@ object DataWritingSparkTask extends Logging {
   writeTask: DataWriterFactory[InternalRow],
   context: TaskContext,
   iter: Iterator[InternalRow]): WriterCommitMessage = {
-val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
 val epochCoordinator = EpochCoordinatorRef.get(
   
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
   SparkEnv.get)
 val currentMsg: WriterCommitMessage = null
 var currentEpoch = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
 
 do {
+  var dataWriter: DataWriter[InternalRow] = null
   // write the data and commit this writer.
   Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
 try {
+  dataWriter = writeTask.createDataWriter(
+context.partitionId(), context.attemptNumber(), currentEpoch)
   iter.foreach(dataWriter.write)
--- End diff --

Fix this! Don't use foreach.
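
A minimal sketch of what the replacement could look like, assuming the intent is an explicit loop over the iterator rather than `Iterator.foreach` (hypothetical helper, not the patch that was merged):

```scala
object WriteLoopSketch {
  // Stand-in for the data writer; illustrative only.
  trait ExampleWriter[T] { def write(record: T): Unit }

  def writeAll[T](iter: Iterator[T], dataWriter: ExampleWriter[T]): Unit = {
    // Iterate explicitly instead of going through Iterator.foreach, so the loop body stays a
    // plain method call and per-record checks (e.g. task interruption) are easy to add later.
    while (iter.hasNext) {
      dataWriter.write(iter.next())
    }
  }
}
```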


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993101
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -198,7 +201,7 @@ object DataWritingSparkTask extends Logging {
   })(catchBlock = {
 // If there is an error, abort this writer
 logError(s"Writer for partition ${context.partitionId()} is 
aborting.")
-dataWriter.abort()
+if (dataWriter != null) dataWriter.abort()
 logError(s"Writer for partition ${context.partitionId()} aborted.")
--- End diff --

nit: add a comment that the exception will be rethrown.
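
For what it's worth, the pattern with the suggested comment could look roughly like this simplified sketch (the real code routes this through `Utils.tryWithSafeFinallyAndFailureCallbacks`, which rethrows the original exception after the catch block runs):

```scala
object AbortAndRethrowSketch {
  trait ExampleWriter { def abort(): Unit }

  def runWriteTask(writer: ExampleWriter)(body: => Unit): Unit = {
    try {
      body
    } catch {
      case t: Throwable =>
        // Abort this writer to clean up partial output; the original exception is
        // rethrown afterwards so the driver still sees the task failure.
        if (writer != null) writer.abort()
        throw t
    }
  }
}
```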


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20710: [SPARK-23559][SS] Add epoch ID to DataWriterFacto...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20710#discussion_r171993066
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 ---
@@ -48,6 +48,9 @@
*  same task id but different attempt number, which 
means there are multiple
*  tasks with the same task id running at the same 
time. Implementations can
*  use this attempt number to distinguish writers 
of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries 
that are split in to
+   *discrete periods of execution. For queries that 
execute as a single batch, this
+   *id will always be zero.
*/
-  DataWriter createDataWriter(int partitionId, int attemptNumber);
+  DataWriter createDataWriter(int partitionId, int attemptNumber, long 
epochId);
--- End diff --

Add clear lifecycle semantics.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171989658
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+  require(minPartitions.isEmpty || minPartitions.get > 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `minPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}.filter(_.size > 0)
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // preferentially read from the same executor and the KafkaConsumer 
can be reused.
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  val idealRangeSize = totalSize.toDouble / minPartitions.get
+
+  offsetRanges.flatMap { range =>
+// Split the current range into subranges as close to the ideal 
range size
+val rangeSize = range.untilOffset - range.fromOffset
+val numSplitsInRange = math.round(rangeSize.toDouble / 
idealRangeSize).toInt
+
+(0 until numSplitsInRange).map { i =>
+  val splitStart = range.fromOffset + rangeSize * (i.toDouble / 
numSplitsInRange)
+  val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) 
/ numSplitsInRange)
+  KafkaOffsetRange(
+range.topicPartition, splitStart.toLong, splitEnd.toLong, 
preferredLoc = None)
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.hashCode, numExecutors)))
+} else None
+  }
+}
+
+private[kafka010] object KafkaOffsetR
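
For intuition about the `minPartitions`-based splitting described above, here is a self-contained sketch with simplified types (not the class in this diff) that divides one topic-partition's offset range into roughly equal sub-ranges:

```scala
object OffsetRangeSplitSketch {
  // Simplified stand-in for an offset range on a single topic-partition.
  case class SimpleRange(fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  /** Split one range into `numSplits` roughly equal, contiguous sub-ranges. */
  def splitRange(range: SimpleRange, numSplits: Int): Seq[SimpleRange] = {
    require(numSplits > 0, "numSplits must be positive")
    (0 until numSplits).map { i =>
      // Proportional boundaries; the final sub-range absorbs any rounding remainder.
      val start = range.fromOffset + (range.size * i.toDouble / numSplits).toLong
      val end   = range.fromOffset + (range.size * (i + 1).toDouble / numSplits).toLong
      SimpleRange(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // Offsets [0, 10) split 3 ways -> [0,3), [3,6), [6,10)
    println(splitRange(SimpleRange(0L, 10L), 3))
  }
}
```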

spark git commit: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-03-02 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 3a4d15e5d -> 707e6506d


[SPARK-23097][SQL][SS] Migrate text socket source to V2

## What changes were proposed in this pull request?

This PR moves structured streaming text socket source to V2.

Questions: do we need to remove old "socket" source?

## How was this patch tested?

Unit test and manual verification.

Author: jerryshao 

Closes #20382 from jerryshao/SPARK-23097.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/707e6506
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/707e6506
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/707e6506

Branch: refs/heads/master
Commit: 707e6506d0dbdb598a6c99d666f3c66746113b67
Parents: 3a4d15e
Author: jerryshao 
Authored: Fri Mar 2 12:27:42 2018 -0800
Committer: Tathagata Das 
Committed: Fri Mar 2 12:27:42 2018 -0800

--
 apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   5 +-
 .../spark/sql/execution/streaming/socket.scala  | 219 -
 .../execution/streaming/sources/socket.scala| 255 
 .../spark/sql/streaming/DataStreamReader.scala  |  21 +-
 .../streaming/TextSocketStreamSuite.scala   | 231 --
 .../sources/TextSocketStreamSuite.scala | 306 +++
 7 files changed, 582 insertions(+), 457 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/707e6506/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 0259c77..1fe9c09 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,6 +5,6 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
 org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
 org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2

http://git-wip-us.apache.org/repos/asf/spark/blob/707e6506/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6e1b572..35fcff6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -563,6 +564,7 @@ object DataSource extends Logging {
 val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
 val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
 val nativeOrc = classOf[OrcFileFormat].getCanonicalName
+val socket = classOf[TextSocketSourceProvider].getCanonicalName
 
 Map(
   "org.apache.spark.sql.jdbc" -> jdbc,
@@ -583,7 +585,8 @@ object DataSource extends Logging {
   "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
   "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
   "org.apache.spark.ml.source.libsvm" -> libsvm,
-  "com.databricks.spark.csv" -> csv
+  "com.databricks.spark.csv" -> csv,
+  "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> 
socket
 )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/707e6506/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
--
diff 

[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-03-02 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20382
  
LGTM. Merging to master. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171954250
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.IOException
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.channels.ServerSocketChannel
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfterEach {
+
+  override def afterEach() {
+sqlContext.streams.active.foreach(_.stop())
+if (serverThread != null) {
+  serverThread.interrupt()
+  serverThread.join()
+  serverThread = null
+}
+if (batchReader != null) {
+  batchReader.stop()
+  batchReader = null
+}
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  case class AddSocketData(data: String*) extends AddData {
+override def addData(query: Option[StreamExecution]): 
(BaseStreamingSource, Offset) = {
+  require(
+query.nonEmpty,
+"Cannot add data when there is no query for finding the active 
socket source")
+
+  val sources = query.get.logicalPlan.collect {
+case StreamingExecutionRelation(source: 
TextSocketMicroBatchReader, _) => source
+  }
+  if (sources.isEmpty) {
+throw new Exception(
+  "Could not find socket source in the StreamExecution logical 
plan to add data to")
+  } else if (sources.size > 1) {
+throw new Exception(
+  "Could not select the socket source in the StreamExecution 
logical plan as there" +
+"are multiple socket sources:\n\t" + sources.mkString("\n\t"))
+  }
+  val socketSource = sources.head
+
+  assert(serverThread != null && serverThread.port != 0)
+  val currOffset = socketSource.currentOffset
+  data.foreach(serverThread.enqueue)
+
+  val newOffset = LongOffset(currOffset.offset + data.size)
+  (socketSource, newOffset)
+}
+
+override def toString: String = s"AddSocketData(data = $data)"
+  }
+
+  test("backward compatibility with old path") {
+
DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
+  spark.sqlContext.conf).newInstance() match {
+  case ds: MicroBatchReadSupport =>
+assert(ds.isInstanceOf[TextSocketSourceProvider])
+  case _ =>
+throw new IllegalStateException("Could not find socket source")
+}
+  }
+
+  test("basic usage") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
+  val ref = spark
+  import

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171750758
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
--- End diff --

I rewrote this completely using the code from `sparkContext.parallelize` to 
make splits.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L123
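
The slicing scheme referenced above boils down to integer arithmetic over `(i * length) / numSlices`; a self-contained sketch of that idea (not the exact Spark code):

```scala
object SliceSketch {
  /** Start/end positions for dividing `length` items into `numSlices` contiguous slices. */
  def positions(length: Long, numSlices: Int): Seq[(Long, Long)] = {
    require(numSlices > 0, "numSlices must be positive")
    (0 until numSlices).map { i =>
      val start = (i * length) / numSlices
      val end   = ((i + 1) * length) / numSlices
      (start, end) // integer arithmetic keeps slice sizes within one element of each other
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 offsets into 3 slices -> (0,3), (3,6), (6,10)
    println(positions(10L, 3))
  }
}
```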


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171750580
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
--- End diff --

`fromOffsets` here will contain the initial offsets of new partitions. See how 
`fromOffsets` is set with `startOffsets + newPartitionInitialOffsets`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171750437
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
--- End diff --

Rewritten. I don't want to rely on this default value of 0, as @jose-torres 
expressed concern earlier. So I rewrote this to explicitly check whether 
minPartitions has been set or not.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171741765
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+/**
+ * Class to calculate offset ranges to process based on the the from and 
until offsets, and
+ * the configured `minPartitions`.
+ */
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
+  require(minPartitions >= 0)
+
+  import KafkaOffsetRangeCalculator._
+  /**
+   * Calculate the offset ranges that we are going to process this batch. 
If `numPartitions`
+   * is not set or is set less than or equal the number of 
`topicPartitions` that we're going to
+   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * `numPartitions` is set higher than the number of our 
`topicPartitions`, then we will split up
+   * the read tasks of the skewed partitions to multiple Spark tasks.
+   * The number of Spark tasks will be *approximately* `numPartitions`. It 
can be less or more
+   * depending on rounding errors or Kafka partitions that didn't receive 
any new data.
+   */
+  def getRanges(
+  fromOffsets: PartitionOffsetMap,
+  untilOffsets: PartitionOffsetMap,
+  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = 
{
+val partitionsToRead = 
untilOffsets.keySet.intersect(fromOffsets.keySet)
+
+val offsetRanges = partitionsToRead.toSeq.map { tp =>
+  KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc 
= None)
+}
+
+// If minPartitions not set or there are enough partitions to satisfy 
minPartitions
+if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > 
minPartitions) {
+  // Assign preferred executor locations to each range such that the 
same topic-partition is
+  // always read from the same executor and the KafkaConsumer can be 
reused
+  offsetRanges.map { range =>
+range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
+  }
+} else {
+
+  // Splits offset ranges with relatively large amount of data to 
smaller ones.
+  val totalSize = offsetRanges.map(o => o.untilOffset - 
o.fromOffset).sum
+  offsetRanges.flatMap { offsetRange =>
+val tp = offsetRange.topicPartition
+val size = offsetRange.untilOffset - offsetRange.fromOffset
+// number of partitions to divvy up this topic partition to
+val parts = math.max(math.round(size * 1.0 / totalSize * 
minPartitions), 1).toInt
+var remaining = size
+var startOffset = offsetRange.fromOffset
+(0 until parts).map { part =>
+  // Fine to do integer division. Last partition will consume all 
the round off errors
+  val thisPartition = remaining / (parts - part)
+  remaining -= thisPartition
+  val endOffset = startOffset + thisPartition
+  val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, 
preferredLoc = None)
+  startOffset = endOffset
+  offsetRange
+}
+  }
+}
+  }
+
+  private def getLocation(tp: TopicPartition, executorLocations: 
Seq[String]): Option[String] = {
+def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
+
+val numExecutors = executorLocations.length
+if (numExecutors > 0) {
+  // This allows cached KafkaConsumers in the executors to be re-used 
to read the same
+  // partition in every batch.
+  Some(executorLocations(floorMod(tp.ha

[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171732015
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader(
 // Make sure that `KafkaConsumer.poll` is only called in 
StreamExecutionThread.
 // Otherwise, interrupting a thread while running `KafkaConsumer.poll` 
may hang forever
 // (KAFKA-1894).
-assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+require(Thread.currentThread().isInstanceOf[UninterruptibleThread])
--- End diff --

Not much, really. assert throws AssertionError and require throws 
IllegalArgumentException. Just a matter of preference. I can revert this change.
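
For completeness, a tiny sketch of the difference between the two in Scala:

```scala
object CheckSketch {
  def example(minPartitions: Int, thread: Thread): Unit = {
    // require: validates an argument; throws IllegalArgumentException when false.
    require(minPartitions >= 0, s"Invalid minPartitions: $minPartitions")
    // assert: states an internal invariant; throws AssertionError when false,
    // and unlike require it can be elided by the compiler.
    assert(thread != null, "expected a live thread")
  }
}
```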


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20703: [SPARK-19185][SS] Make Kafka consumer cache configurable

2018-03-01 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20703
  
I completely agree with @zsxwing; let's understand what the issue is rather 
than covering it up with a workaround. We should not run into such an issue at 
all.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-03-01 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20382
  
The relevant test failed. Please make sure that there is no flakiness in the 
tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171510614
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.IOException
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.channels.ServerSocketChannel
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfterEach {
+
+  override def afterEach() {
+sqlContext.streams.active.foreach(_.stop())
+if (serverThread != null) {
+  serverThread.interrupt()
+  serverThread.join()
+  serverThread = null
+}
+if (batchReader != null) {
+  batchReader.stop()
+  batchReader = null
+}
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  case class AddSocketData(data: String*) extends AddData {
+override def addData(query: Option[StreamExecution]): 
(BaseStreamingSource, Offset) = {
+  require(
+query.nonEmpty,
+"Cannot add data when there is no query for finding the active 
socket source")
+
+  val sources = query.get.logicalPlan.collect {
+case StreamingExecutionRelation(source: 
TextSocketMicroBatchReader, _) => source
+  }
+  if (sources.isEmpty) {
+throw new Exception(
+  "Could not find socket source in the StreamExecution logical 
plan to add data to")
+  } else if (sources.size > 1) {
+throw new Exception(
+  "Could not select the socket source in the StreamExecution 
logical plan as there" +
+"are multiple socket sources:\n\t" + sources.mkString("\n\t"))
+  }
+  val socketSource = sources.head
+
+  assert(serverThread != null && serverThread.port != 0)
+  val currOffset = socketSource.currentOffset
+  data.foreach(serverThread.enqueue)
+
+  val newOffset = LongOffset(currOffset.offset + data.size)
+  (socketSource, newOffset)
+}
+
+override def toString: String = s"AddSocketData(data = $data)"
+  }
+
+  test("backward compatibility with old path") {
+
DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
+  spark.sqlContext.conf).newInstance() match {
+  case ds: MicroBatchReadSupport =>
+assert(ds.isInstanceOf[TextSocketSourceProvider])
+  case _ =>
+throw new IllegalStateException("Could not find socket source")
+}
+  }
+
+  test("basic usage") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
+  val ref = spark
+  import

[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2

2018-02-28 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20382
  
@jerryshao please address the above comment, then we are good to merge!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20698: [SPARK-23541][SS] Allow Kafka source to read data with g...

2018-02-28 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20698
  
@zsxwing @jose-torres 



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20698#discussion_r171441133
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+
+
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) 
{
--- End diff --

add docs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20698: [SPARK-23541][SS] Allow Kafka source to read data...

2018-02-28 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20698

[SPARK-23541][SS] Allow Kafka source to read data with greater parallelism 
than the number of topic-partitions

## What changes were proposed in this pull request?

Currently, when the Kafka source reads from Kafka, it generates as many 
tasks as there are partitions in the topic(s) to be read. In some cases, it 
may be beneficial to read the data with greater parallelism, that is, with a 
larger number of partitions/tasks. That means offset ranges must be divided 
into smaller ranges such that the number of records per partition ~= total 
records in the batch / desired partitions. This would also balance out any 
data skew between topic-partitions.

In this patch, I have added a new option called `minPartitions`, which 
allows the user to specify the desired level of parallelism.
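
A hypothetical usage sketch (the broker address, topic name, and session setup are placeholders; it also assumes the spark-sql-kafka-0-10 artifact is on the classpath):

```scala
import org.apache.spark.sql.SparkSession

object MinPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("min-partitions-sketch")
      .master("local[*]")
      .getOrCreate()

    // Ask for ~48 Spark tasks even if the subscribed topic has fewer Kafka partitions.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder address
      .option("subscribe", "events")                    // placeholder topic
      .option("minPartitions", "48")                    // option added in this PR
      .load()

    df.writeStream.format("console").start().awaitTermination()
  }
}
```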

## How was this patch tested?
New tests in KafkaMicroBatchV2SourceSuite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23541

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20698.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20698


commit ebb9b51c51a4411811a7e0e09fff8f8608faa017
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-03-01T01:28:32Z

Implemented




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171226477
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -61,13 +68,13 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
* Stored in a ListBuffer to facilitate removing committed batches.
*/
   @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
+  private val batches = new ListBuffer[(String, Timestamp)]
 
   @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
+  private[sources] var currentOffset: LongOffset = LongOffset(-1L)
--- End diff --

This does not make sense. You are directly accessing something that should 
only be accessed while synchronized on `this`.
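
A small sketch of the pattern being asked for, i.e. exposing the guarded state only through methods that synchronize on `this` (illustrative names only):

```scala
object GuardedOffsetSketch {
  class OffsetHolder {
    // Guarded by `this`: never read or written without holding the lock.
    private var currentOffset: Long = -1L

    def getCurrentOffset: Long = synchronized { currentOffset }

    def advance(n: Long): Unit = synchronized { currentOffset += n }
  }

  def main(args: Array[String]): Unit = {
    val holder = new OffsetHolder
    holder.advance(3L)
    println(holder.getCurrentOffset) // 3
  }
}
```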


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171225853
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.IOException
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.channels.ServerSocketChannel
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfterEach {
+
+  override def afterEach() {
+sqlContext.streams.active.foreach(_.stop())
+if (serverThread != null) {
+  serverThread.interrupt()
+  serverThread.join()
+  serverThread = null
+}
+if (batchReader != null) {
+  batchReader.stop()
+  batchReader = null
+}
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  case class AddSocketData(data: String*) extends AddData {
+override def addData(query: Option[StreamExecution]): 
(BaseStreamingSource, Offset) = {
+  require(
+query.nonEmpty,
+"Cannot add data when there is no query for finding the active 
socket source")
+
+  val sources = query.get.logicalPlan.collect {
+case StreamingExecutionRelation(source: 
TextSocketMicroBatchReader, _) => source
+  }
+  if (sources.isEmpty) {
+throw new Exception(
+  "Could not find socket source in the StreamExecution logical 
plan to add data to")
+  } else if (sources.size > 1) {
+throw new Exception(
+  "Could not select the socket source in the StreamExecution 
logical plan as there" +
+"are multiple socket sources:\n\t" + sources.mkString("\n\t"))
+  }
+  val socketSource = sources.head
+
+  assert(serverThread != null && serverThread.port != 0)
+  val currOffset = socketSource.currentOffset
+  data.foreach(serverThread.enqueue)
+
+  val newOffset = LongOffset(currOffset.offset + data.size)
+  (socketSource, newOffset)
+}
+
+override def toString: String = s"AddSocketData(data = $data)"
+  }
+
+  test("backward compatibility with old path") {
+
DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
+  spark.sqlContext.conf).newInstance() match {
+  case ds: MicroBatchReadSupport =>
+assert(ds.isInstanceOf[TextSocketSourceProvider])
+  case _ =>
+throw new IllegalStateException("Could not find socket source")
+}
+  }
+
+  test("basic usage") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
+  val ref = spark
+  import
