[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20382


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171954250
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.IOException
+import java.net.InetSocketAddress
+import java.nio.ByteBuffer
+import java.nio.channels.ServerSocketChannel
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
+
+  override def afterEach() {
+    sqlContext.streams.active.foreach(_.stop())
+    if (serverThread != null) {
+      serverThread.interrupt()
+      serverThread.join()
+      serverThread = null
+    }
+    if (batchReader != null) {
+      batchReader.stop()
+      batchReader = null
+    }
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  case class AddSocketData(data: String*) extends AddData {
+    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active socket source")
+
+      val sources = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source
+      }
+      if (sources.isEmpty) {
+        throw new Exception(
+          "Could not find socket source in the StreamExecution logical plan to add data to")
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the socket source in the StreamExecution logical plan as there" +
+            "are multiple socket sources:\n\t" + sources.mkString("\n\t"))
+      }
+      val socketSource = sources.head
+
+      assert(serverThread != null && serverThread.port != 0)
+      val currOffset = socketSource.currentOffset
+      data.foreach(serverThread.enqueue)
+
+      val newOffset = LongOffset(currOffset.offset + data.size)
+      (socketSource, newOffset)
+    }
+
+    override def toString: String = s"AddSocketData(data = $data)"
+  }
+
+  test("backward compatibility with old path") {
+    DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
+      spark.sqlContext.conf).newInstance() match {
+      case ds: MicroBatchReadSupport =>
+        assert(ds.isInstanceOf[TextSocketSourceProvider])
+      case _ =>
+        throw new IllegalStateException("Could not find socket source")
+    }
+  }
+
+  test("basic usage") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
+      val ref = spark
+      import ref.implicits._
+
+      val socket = spark
+        .readStream
+        .format("socket")
+        .options(Map("host" -> "localhost", "port" -> serverThread.po

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-01 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171511528
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171510614
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-03-01 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171506698
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171469866
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171226477
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -61,13 +68,13 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
    * Stored in a ListBuffer to facilitate removing committed batches.
    */
   @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
+  private val batches = new ListBuffer[(String, Timestamp)]
 
   @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
+  private[sources] var currentOffset: LongOffset = LongOffset(-1L)
--- End diff --

This does not make sense. You are directly accessing a field that should only be accessed while synchronized on `this`.
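
For illustration, a minimal, self-contained sketch of the pattern being asked for: reads of a `@GuardedBy("this")` field go through a method that takes the same lock. `OffsetHolder` and its members are made up for the example; only the locking pattern carries over to `TextSocketMicroBatchReader.currentOffset`.

```scala
import javax.annotation.concurrent.GuardedBy

class OffsetHolder {
  @GuardedBy("this")
  private var currentOffset: Long = -1L

  // Callers (e.g. test helpers) read the guarded field only under the lock,
  // instead of touching the var directly.
  def getCurrentOffset: Long = synchronized { currentOffset }

  def advance(n: Long): Unit = synchronized { currentOffset += n }
}
```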


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171225853
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-28 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r171225732
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r170759995
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{IOException, OutputStreamWriter}
+import java.net.ServerSocket
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach {
+
+  override def afterEach() {
+    sqlContext.streams.active.foreach(_.stop())
+    if (serverThread != null) {
+      serverThread.interrupt()
+      serverThread.join()
+      serverThread = null
+    }
+    if (batchReader != null) {
+      batchReader.stop()
+      batchReader = null
+    }
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  test("V2 basic usage") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    val provider = new TextSocketSourceProvider
+    val options = new DataSourceOptions(
+      Map("host" -> "localhost", "port" -> serverThread.port.toString).asJava)
+    batchReader = provider.createMicroBatchReader(Optional.empty(), "", options)
+
+    val schema = batchReader.readSchema()
+    assert(schema === StructType(StructField("value", StringType) :: Nil))
+
+    failAfter(streamingTimeout) {
+      serverThread.enqueue("hello")
+      batchReader.setOffsetRange(Optional.empty(), Optional.empty())
+      while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) {
+        batchReader.setOffsetRange(Optional.empty(), Optional.empty())
+        Thread.sleep(10)
+      }
+      withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
+        val offset1 = batchReader.getEndOffset
+        val batch1 = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            batch1.append(r.get())
+          }
+        }
+        assert(batch1.map(_.getAs[String](0)) === Seq("hello"))
+
+        serverThread.enqueue("world")
+        while (batchReader.getEndOffset === offset1) {
+          batchReader.setOffsetRange(Optional.of(offset1), Optional.empty())
+          Thread.sleep(10)
+        }
+        val offset2 = batchReader.getEndOffset
+        val batch2 = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            batch2.append(r.get())
+          }
+        }
+        assert(batch2.map(_.getAs[String](0)) === Seq("world"))
+
+        batchReader.setOffsetRange(Optional.empty(), Optional.of(offset2))
+        val both = new ListBuffer[Row]
+        batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r =>
+          while (r.next()) {
+            both.append(r.get())
+          }
+        }
+        assert(

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-22 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r170178735
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r168363424
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -583,7 +585,8 @@ object DataSource extends Logging {
       "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
-      "com.databricks.spark.csv" -> csv
+      "com.databricks.spark.csv" -> csv,
+      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
--- End diff --

please add a test for this!
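
A rough sketch of the requested test, assuming the suite's `SharedSQLContext` setup; a later revision of the PR adds essentially this as the "backward compatibility with old path" test quoted in the newer messages above.

```scala
test("old provider name is redirected to the socket source") {
  // lookupDataSource should resolve the pre-move class name through
  // DataSource.backwardCompatibilityMap to the relocated provider.
  val ds = DataSource.lookupDataSource(
    "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
    spark.sqlContext.conf).newInstance()
  assert(ds.isInstanceOf[TextSocketSourceProvider])
}
```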


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r168138470
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{IOException, OutputStreamWriter}
+import java.net.ServerSocket
+import java.sql.Timestamp
+import java.util.Optional
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+class TextSocketStreamSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfterEach {
+
+  override def afterEach() {
+sqlContext.streams.active.foreach(_.stop())
+if (serverThread != null) {
+  serverThread.interrupt()
+  serverThread.join()
+  serverThread = null
+}
+if (batchReader != null) {
+  batchReader.stop()
+  batchReader = null
+}
+  }
+
+  private var serverThread: ServerThread = null
+  private var batchReader: MicroBatchReader = null
+
+  test("V2 basic usage") {
--- End diff --

These updated tests are getting more complicated with the direct calls into low-level data source APIs. Can you convert them to higher-level tests like the Kafka ones?
If it gets too complicated to make them work with `testStream`, you can simply use `query.processAllAvailable`; then we won't have to worry about changing APIs any more.
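
A minimal sketch of the `processAllAvailable` style being suggested, assuming the suite's `ServerThread` helper (its `port` and `enqueue` members) and a memory sink; everything else is illustrative rather than the PR's final test.

```scala
withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
  // Drive the source through a real streaming query and block on
  // processAllAvailable() instead of calling the reader APIs directly.
  val socket = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", serverThread.port.toString)
    .load()

  val query = socket.writeStream
    .format("memory")
    .queryName("socket_test")
    .start()

  serverThread.enqueue("hello")
  query.processAllAvailable()
  assert(spark.table("socket_test").collect().map(_.getString(0)).toSeq === Seq("hello"))
  query.stop()
}
```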


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r168137288
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167776323
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,246 @@
+/*
--- End diff --

Sorry @tdas, I did it with a plain "mv", not "git mv". It doesn't change much, just adapts the suite to the Data Source V2 API.


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167713045
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---

Tutorials is correct here; see e.g. StructuredSessionization.scala


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167126569
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -15,40 +15,48 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming
+package org.apache.spark.sql.execution.streaming.sources
 
 import java.io.{BufferedReader, InputStreamReader, IOException}
 import java.net.Socket
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Locale}
+import java.util.{Calendar, List => JList, Locale, Optional}
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
-import org.apache.spark.unsafe.types.UTF8String
 
-
-object TextSocketSource {
+object TextSocketMicroBatchReader {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
     StructField("timestamp", TimestampType) :: Nil)
   val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
 }
 
 /**
- * A source that reads text lines through a TCP socket, designed only for tutorials and debugging.
- * This source will *not* work in production applications due to multiple reasons, including no
- * support for fault recovery and keeping all of the text read in memory forever.
+ * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
--- End diff --

nit: tutorials -> testing (I know it was like that, but let's fix it since we are changing it anyway)


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167128485
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
     }
   }
 
-  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
+  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
 }
 
-class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
-  private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {
-    Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
-      case Success(bool) => bool
-      case Failure(_) =>
-        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
-    }
-  }
+class TextSocketSourceProvider extends DataSourceV2
+  with MicroBatchReadSupport with DataSourceRegister with Logging {
 
-  /** Returns the name and schema of the source that can be used to continually read data. */
-  override def sourceSchema(
-      sqlContext: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): (String, StructType) = {
+  private def checkParameters(params: Map[String, String]): Unit = {
     logWarning("The socket source should not be used for production applications! " +
       "It does not support recovery.")
-    if (!parameters.contains("host")) {
+    if (!params.contains("host")) {
       throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
     }
-    if (!parameters.contains("port")) {
+    if (!params.contains("port")) {
       throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
     }
-    if (schema.nonEmpty) {
-      throw new AnalysisException("The socket source does not support a user-specified schema.")
+    Try {
+      params.get("includeTimestamp")
+        .orElse(params.get("includetimestamp"))
+        .getOrElse("false")
+        .toBoolean
+    } match {
+      case Success(_) =>
+      case Failure(_) =>
+        throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
     }
-
-    val sourceSchema =
-      if (parseIncludeTimestamp(parameters)) {
-        TextSocketSource.SCHEMA_TIMESTAMP
-      } else {
-        TextSocketSource.SCHEMA_REGULAR
-      }
-    ("textSocket", sourceSchema)
   }
 
-  override def createSource(
-      sqlContext: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
-    val host = parameters("host")
-    val port = parameters("port").toInt
-    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
+  override def createMicroBatchReader(
+      schema: Optional[StructType],
+      checkpointLocation: String,
+      options: DataSourceOptions): MicroBatchReader = {
+    checkParameters(options.asMap().asScala.toMap)
--- End diff --

why not check it as DataSourceOptions (which is known to be 
case-insensitive) rather than as a map, which raises questions about case 
sensitivity?
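
A minimal sketch of that direction, assuming the 2.3-era DataSourceOptions API 
(get returns a java.util.Optional and key lookup is case-insensitive), so the 
validation never has to lower-case keys by hand; illustrative only, not the 
merged code:

    private def checkParameters(params: DataSourceOptions): Unit = {
      logWarning("The socket source should not be used for production applications! " +
        "It does not support recovery.")
      if (!params.get("host").isPresent) {
        throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
      }
      if (!params.get("port").isPresent) {
        throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
      }
      // Optional.orElse supplies the default; toBoolean throws on anything
      // other than "true"/"false", which is what the Try is guarding against.
      Try {
        params.get("includeTimestamp").orElse("false").toBoolean
      } match {
        case Success(_) =>
        case Failure(_) =>
          throw new AnalysisException(
            "includeTimestamp must be set to either \"true\" or \"false\"")
      }
    }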


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167126475
  
--- Diff: 
sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 ---
@@ -5,6 +5,6 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
 org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
--- End diff --

can you add a redirection in the `DataSource.backwardCompatibilityMap` for 
this?
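
For readers following along, a hedged sketch of what such a redirection entry 
could look like inside `object DataSource` (surrounding entries elided; the 
exact shape of the map construction is an assumption):

    // maps the old, pre-move class name to the relocated provider
    val socket = classOf[TextSocketSourceProvider].getCanonicalName
    val backwardCompatibilityMap: Map[String, String] = Map(
      // ... existing redirections ...
      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
    )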


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167127039
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
 readThread.start()
   }
 
-  /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
+  override def setOffsetRange(
+  start: Optional[Offset],
+  end: Optional[Offset]): Unit = synchronized {
+startOffset = start.orElse(LongOffset(-1L))
+endOffset = end.orElse(currentOffset)
+  }
 
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
+  override def getStartOffset(): Offset = {
+Option(startOffset).getOrElse(throw new IllegalStateException("start 
offset not set"))
+  }
+
+  override def getEndOffset(): Offset = {
+Option(endOffset).getOrElse(throw new IllegalStateException("end 
offset not set"))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+LongOffset(json.toLong)
+  }
+
+  override def readSchema(): StructType = {
+val includeTimestamp = options.getBoolean("includeTimestamp", false)
--- End diff --

supernit: is there a need for a variable here?
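
i.e. the option lookup could be inlined directly (sketch only):

    override def readSchema(): StructType = {
      if (options.getBoolean("includeTimestamp", false)) {
        TextSocketMicroBatchReader.SCHEMA_TIMESTAMP
      } else {
        TextSocketMicroBatchReader.SCHEMA_REGULAR
      }
    }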


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167128652
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -177,11 +177,14 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   Optional.ofNullable(userSpecifiedSchema.orNull),
   Utils.createTempDir(namePrefix = 
s"temporaryReader").getCanonicalPath,
   options)
+val schema = tempReader.readSchema()
+// Stop tempReader to avoid side-affect thing
--- End diff --

nit: side-affect -> side-effect.

good catch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167128044
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
 }
   }
 
-  override def toString: String = s"TextSocketSource[host: $host, port: 
$port]"
+  override def toString: String = s"TextSocketMicroBatchReader[host: 
$host, port: $port]"
--- End diff --

This shows up in the StreamingQueryProgressEvent as description, so it may 
be better to have it as "TextSocket[..."
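
i.e. something along these lines (sketch):

    override def toString: String = s"TextSocket[host: $host, port: $port]"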


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167128861
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -0,0 +1,246 @@
+/*
--- End diff --

why does this show up as a new file? was this not a "git mv"? something 
went wrong, i would prefer to be able to see a simple diff. Not much should change 
in the tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167128691
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -177,11 +177,14 @@ final class DataStreamReader 
private[sql](sparkSession: SparkSession) extends Lo
   Optional.ofNullable(userSpecifiedSchema.orNull),
   Utils.createTempDir(namePrefix = 
s"temporaryReader").getCanonicalPath,
   options)
+val schema = tempReader.readSchema()
+// Stop tempReader to avoid side-affect thing
+tempReader.stop()
--- End diff --

i feel like this needs a try finally approach as well.
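
A sketch of the try/finally shape being suggested, so the temporary reader is 
stopped even when readSchema() throws (variable names follow the surrounding 
diff):

    val schema = try {
      tempReader.readSchema()
    } finally {
      // stop tempReader to avoid side effects from a leftover reader
      tempReader.stop()
    }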


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167127081
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, 
includeTimestamp: Boolean, sqlCo
 readThread.start()
   }
 
-  /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
+  override def setOffsetRange(
+  start: Optional[Offset],
--- End diff --

nit: won't this fit on a single line?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-02-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r167126686
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
 ---
@@ -15,40 +15,48 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.streaming
+package org.apache.spark.sql.execution.streaming.sources
 
 import java.io.{BufferedReader, InputStreamReader, IOException}
 import java.net.Socket
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Locale}
+import java.util.{Calendar, List => JList, Locale, Optional}
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.{DataSourceRegister, 
StreamSourceProvider}
+import org.apache.spark.sql.execution.streaming.LongOffset
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
-import org.apache.spark.unsafe.types.UTF8String
 
-
-object TextSocketSource {
+object TextSocketMicroBatchReader {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
 StructField("timestamp", TimestampType) :: Nil)
   val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
 }
 
 /**
- * A source that reads text lines through a TCP socket, designed only for 
tutorials and debugging.
- * This source will *not* work in production applications due to multiple 
reasons, including no
- * support for fault recovery and keeping all of the text read in memory 
forever.
+ * A MicroBatchReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This MicroBatchReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery and keeping all of the 
text read in memory
+ * forever.
--- End diff --

this does not keep it forever. so remove this reason, just keep "no support 
for fault recovery".
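
i.e. the revised class doc could read roughly (sketch):

    /**
     * A MicroBatchReader that reads text lines through a TCP socket, designed only
     * for testing and debugging. It will *not* work in production applications
     * because it does not support fault recovery.
     */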


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944650
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offsetDiff 

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944362
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TextSocketReader.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.util.Calendar
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.internal.Logging
+
+trait TextSocketReader extends Logging {
--- End diff --

Please add docs!! This is a base interface used by two source 
implementations 
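
One possible shape for the requested docs, assuming the trait keeps the role 
described above (illustrative only):

    /**
     * Shared logic for reading text lines from a TCP socket and buffering them,
     * together with their arrival timestamps, until they are committed. This is
     * not a user-facing source by itself; it is a base trait reused by both the
     * V1 TextSocketSource and the V2 micro-batch reader.
     */
    trait TextSocketReader extends Logging {
      // ...
    }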


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944324
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
--- End diff --

Please update docs accordingly!! This is not a source, but a base interface 
used by two source implementations 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offset

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164943885
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offs

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164937540
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offsetDiff 

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164934753
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 ---
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
 println("---")
 // scalastyle:off println
 spark
-  .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+  .createDataFrame(rows.toList.asJava, schema)
--- End diff --

OK, I will create a separate PR for this small fix.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164933597
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offset

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164930581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala 
---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple 
reasons, including no
  * support for fault recovery and keeping all of the text read in memory 
forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, 
sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, 
inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+protected val host: String,
+protected val port: Int,
+includeTimestamp: Boolean,
+sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-socket = new Socket(host, port)
-val reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream))
-readThread = new Thread(s"TextSocketSource($host, $port)") {
-  setDaemon(true)
-
-  override def run(): Unit = {
-try {
-  while (true) {
-val line = reader.readLine()
-if (line == null) {
-  // End of file reached
-  logWarning(s"Stream closed by $host:$port")
-  return
-}
-TextSocketSource.this.synchronized {
-  val newData = (line,
-Timestamp.valueOf(
-  
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-)
-  currentOffset = currentOffset + 1
-  batches.append(newData)
-}
-  }
-} catch {
-  case e: IOException =>
-}
-  }
-}
-readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) 
TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-if (currentOffset.offset == -1) {
-  None
-} else {
-  Some(currentOffset)
-}
-  }
+  override def schema: StructType =
+if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else 
TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = 
getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = 
synchronized {
-val startOrdinal =
-  
start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-val endOrdinal = 
LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-// Internal buffer only holds the batches after lastOffsetCommitted
-val rawList = synchronized {
-  val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-  val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-  batches.slice(sliceStart, sliceEnd)
-}
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+val rawList = 
getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+  LongOffset.convert(end).map(_.offset))
 
 val rdd = sqlContext.sparkContext
   .parallelize(rawList)
   .map { case (v, ts) => InternalRow(UTF8String.fromString(v), 
ts.getTime) }
 sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
 val newOffset = LongOffset.convert(end).getOrElse(
   sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
 s"originate with an instance of this class")
 )
 
-val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+commitInternal(newOffset.offset)
+  }
 
-if (offsetDiff 

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164930523
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 ---
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
 println("---")
 // scalastyle:off println
 spark
-  .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+  .createDataFrame(rows.toList.asJava, schema)
--- End diff --

this fix should go into the 2.3 branch. thanks for catching this. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163755819
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util._
+import java.util.{List => JList}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+
+object TextSocketSourceProviderV2 {
+  val HOST = "host"
+  val PORT = "port"
+  val INCLUDE_TIMESTAMP = "includeTimestamp"
+  val NUM_PARTITIONS = "numPartitions"
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+class TextSocketSourceProviderV2 extends DataSourceV2
+with MicroBatchReadSupport with DataSourceRegister with Logging {
--- End diff --

I see, thanks for the clarification. Let me change it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163755636
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util._
+import java.util.{List => JList}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+
+object TextSocketSourceProviderV2 {
+  val HOST = "host"
+  val PORT = "port"
+  val INCLUDE_TIMESTAMP = "includeTimestamp"
+  val NUM_PARTITIONS = "numPartitions"
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+class TextSocketSourceProviderV2 extends DataSourceV2
+with MicroBatchReadSupport with DataSourceRegister with Logging {
--- End diff --

The idea is that the existing TextSocketSourceProvider will have the 
MicroBatchReadSupport implementation here, in addition to the 
StreamSourceProvider implementation it already has.
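
A sketch of that direction, with the single provider exposing both code paths 
(interface names as of the V2 API at this point in time; not necessarily the 
shape that was finally merged):

    class TextSocketSourceProvider extends DataSourceV2
      with MicroBatchReadSupport   // V2 path: createMicroBatchReader(...)
      with StreamSourceProvider    // V1 path: sourceSchema(...) / createSource(...)
      with DataSourceRegister with Logging {
      // ...
    }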


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163753088
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util._
+import java.util.{List => JList}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+
+object TextSocketSourceProviderV2 {
+  val HOST = "host"
+  val PORT = "port"
+  val INCLUDE_TIMESTAMP = "includeTimestamp"
+  val NUM_PARTITIONS = "numPartitions"
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+class TextSocketSourceProviderV2 extends DataSourceV2
+with MicroBatchReadSupport with DataSourceRegister with Logging {
--- End diff --

@jose-torres, you mean that instead of creating a new V2 socket source, 
we should modify the current V1 socket source to make it work with V2, am I understanding 
correctly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163730153
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util._
+import java.util.{List => JList}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+
+
+object TextSocketSourceProviderV2 {
+  val HOST = "host"
+  val PORT = "port"
+  val INCLUDE_TIMESTAMP = "includeTimestamp"
+  val NUM_PARTITIONS = "numPartitions"
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
+StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+class TextSocketSourceProviderV2 extends DataSourceV2
+with MicroBatchReadSupport with DataSourceRegister with Logging {
+  override def shortName(): String = "socketv2"
+
+  override def createMicroBatchReader(
+  schema: Optional[StructType],
+  checkpointLocation: String,
+  options: DataSourceV2Options): MicroBatchReader = {
+logWarning("The socket source should not be used for production 
applications! " +
+  "It does not support recovery.")
+if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) {
+  throw new AnalysisException("Set a host to read from with 
option(\"host\", ...).")
+}
+if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) {
+  throw new AnalysisException("Set a port to read from with 
option(\"port\", ...).")
+}
+if (schema.isPresent) {
+  throw new AnalysisException("The socket source does not support a 
user-specified schema.")
+}
+
+if 
(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) {
+  
Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) 
match {
+case Success(bool) =>
+case Failure(_) =>
+  throw new AnalysisException(
+"includeTimestamp must be set to either \"true\" or \"false\"")
+  }
+}
+
+new TextSocketStreamMicroBatchReader(options)
+  }
+}
+
+case class TextSocketStreamOffset(offset: Long) extends Offset {
+  override def json(): String = offset.toString
+}
+
+class TextSocketStreamMicroBatchReader(options: DataSourceV2Options)
+  extends MicroBatchReader with Logging {
+
+  import TextSocketSourceProviderV2._
+
+  private var start: TextSocketStreamOffset = _
+  private var end: TextSocketStreamOffset = _
+
+  private val host = options.get(HOST).get()
+  private val port = options.get(PORT).get().toInt
+  private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, 
false)
+  private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
+
 

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163725096
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala
 ---
   

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163614088
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
 

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163613655
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
---

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163616762
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
 

[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163612181
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
+class TextSocketSourceProviderV2 extends DataSourceV2
+  with MicroBatchReadSupport with DataSourceRegister with Logging {
--- End diff --

The intent is for the V2 and V1 sources to live under the same register name, so existing queries can start using the V2 source with no change needed. This also allows the V2 implementation to be validated by passing all the old tests.
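A sketch of what sharing the register means in practice, assuming the V2 provider eventually reuses the existing short name (the version in this diff still registers "socketv2"); the query below is illustrative:

    // DataSourceRegister implementations are discovered by short name via java.util.ServiceLoader,
    // so if the V2 provider answered with the old name ...
    override def shortName(): String = "socket"

    // ... an unchanged user query would silently pick up the V2 implementation:
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()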


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r163487603
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
 println("---")
 // scalastyle:off println
 spark
-  .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+  .createDataFrame(rows.toList.asJava, schema)
--- End diff --

Changed here to avoid triggering a new distributed job.
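For context, a sketch of the difference, assuming a rows: Seq[Row] and a matching schema as in the surrounding code; the variable names are illustrative:

    import scala.collection.JavaConverters._

    // RDD-backed DataFrame: printing it schedules tasks on the cluster.
    val distributed = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

    // List-backed DataFrame: planned as a LocalRelation, so the console sink can
    // print each batch on the driver without launching a job.
    val local = spark.createDataFrame(rows.toList.asJava, schema)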


---




[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...

2018-01-24 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/20382

[SPARK-23097][SQL][SS] Migrate text socket source to V2

## What changes were proposed in this pull request?

This PR migrates the Structured Streaming text socket source to the DataSource V2 API.

Questions: do we need to remove old "socket" source?

## How was this patch tested?

Unit tests and manual verification.
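A sketch of the manual verification path, assuming a local netcat listener and a spark-shell session; all names and ports are illustrative:

    // In a separate terminal, start a line server first: nc -lk 9999
    val query = spark.readStream
      .format("socketv2")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .writeStream
      .format("console")
      .start()
    // Lines typed into the nc session should appear in the console sink per micro-batch.
    query.awaitTermination()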


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-23097

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20382


commit 629243d94835e8eef379122d0c40d5c0b20ea351
Author: jerryshao 
Date:   2018-01-24T08:22:07Z

Move text socket source to V2

commit 8f3b54824d92123a0e7d468d42db30dba72cded1
Author: jerryshao 
Date:   2018-01-24T09:04:44Z

Rename to V2




---
