[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20382
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171954250 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.ServerSocketChannel +import java.sql.Timestamp +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach() { +sqlContext.streams.active.foreach(_.stop()) +if (serverThread != null) { + serverThread.interrupt() + serverThread.join() + serverThread = null +} +if (batchReader != null) { + batchReader.stop() + batchReader = null +} + } + + private var serverThread: ServerThread = null + private var batchReader: MicroBatchReader = null + + case class AddSocketData(data: String*) extends AddData { +override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + require( +query.nonEmpty, +"Cannot add data when there is no query for finding the active socket source") + + val sources = query.get.logicalPlan.collect { +case StreamingExecutionRelation(source: TextSocketMicroBatchReader, _) => source + } + if (sources.isEmpty) { +throw new Exception( + "Could not find socket source in the StreamExecution logical plan to add data to") + } else if (sources.size > 1) { +throw new Exception( + "Could not select the socket source in the StreamExecution logical plan as there" + +"are multiple socket sources:\n\t" + sources.mkString("\n\t")) + } + val socketSource = sources.head + + assert(serverThread != null && serverThread.port != 0) + val currOffset = socketSource.currentOffset + data.foreach(serverThread.enqueue) + + val newOffset = LongOffset(currOffset.offset + data.size) + (socketSource, newOffset) +} + +override def toString: String = s"AddSocketData(data = $data)" + } + + 
test("backward compatibility with old path") { + DataSource.lookupDataSource("org.apache.spark.sql.execution.streaming.TextSocketSourceProvider", + spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => +assert(ds.isInstanceOf[TextSocketSourceProvider]) + case _ => +throw new IllegalStateException("Could not find socket source") +} + } + + test("basic usage") { +serverThread = new ServerThread() +serverThread.start() + +withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + val ref = spark + import ref.implicits._ + + val socket = spark +.readStream +.format("socket") +.options(Map("host" -> "localhost", "port" -> serverThread.po
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171511528 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171510614 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171506698 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171469866 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171226477 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -61,13 +68,13 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo * Stored in a ListBuffer to facilitate removing committed batches. */ @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] + private val batches = new ListBuffer[(String, Timestamp)] @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) + private[sources] var currentOffset: LongOffset = LongOffset(-1L) --- End diff -- This does not make sense: you are directly accessing a field that is @GuardedBy("this") and should only be accessed while synchronized on this.
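One way to address this comment (a minimal sketch, not necessarily the change that was made in the PR) is to expose the offset through a synchronized accessor so that callers such as the test helper never read the @GuardedBy("this") field directly; the method name getCurrentOffset is hypothetical:

    // Inside TextSocketMicroBatchReader: reads of the @GuardedBy("this") field go
    // through a method that takes the same lock as the writers.
    private[sources] def getCurrentOffset(): LongOffset = synchronized {
      currentOffset // LongOffset is an immutable case class, so returning it is safe
    }

The test's AddSocketData helper would then call socketSource.getCurrentOffset() instead of touching socketSource.currentOffset.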
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171225853 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r171225732 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r170759995 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{IOException, OutputStreamWriter} +import java.net.ServerSocket +import java.sql.Timestamp +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming.LongOffset +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach() { +sqlContext.streams.active.foreach(_.stop()) +if (serverThread != null) { + serverThread.interrupt() + serverThread.join() + serverThread = null +} +if (batchReader != null) { + batchReader.stop() + batchReader = null +} + } + + private var serverThread: ServerThread = null + private var batchReader: MicroBatchReader = null + + test("V2 basic usage") { +serverThread = new ServerThread() +serverThread.start() + +val provider = new TextSocketSourceProvider +val options = new DataSourceOptions( + Map("host" -> "localhost", "port" -> serverThread.port.toString).asJava) +batchReader = provider.createMicroBatchReader(Optional.empty(), "", options) + +val schema = batchReader.readSchema() +assert(schema === StructType(StructField("value", StringType) :: Nil)) + +failAfter(streamingTimeout) { + serverThread.enqueue("hello") + batchReader.setOffsetRange(Optional.empty(), Optional.empty()) + while (batchReader.getEndOffset.asInstanceOf[LongOffset].offset == -1L) { +batchReader.setOffsetRange(Optional.empty(), Optional.empty()) +Thread.sleep(10) + } + withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { +val offset1 = batchReader.getEndOffset +val batch1 = new ListBuffer[Row] + batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r => + while (r.next()) { +batch1.append(r.get()) + } +} +assert(batch1.map(_.getAs[String](0)) === Seq("hello")) + +serverThread.enqueue("world") +while (batchReader.getEndOffset === offset1) { + 
batchReader.setOffsetRange(Optional.of(offset1), Optional.empty()) + Thread.sleep(10) +} +val offset2 = batchReader.getEndOffset +val batch2 = new ListBuffer[Row] + batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r => + while (r.next()) { +batch2.append(r.get()) + } +} +assert(batch2.map(_.getAs[String](0)) === Seq("world")) + +batchReader.setOffsetRange(Optional.empty(), Optional.of(offset2)) +val both = new ListBuffer[Row] + batchReader.createDataReaderFactories().asScala.map(_.createDataReader()).foreach { r => + while (r.next()) { +both.append(r.get()) + } +} +assert(
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r170178735 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r168363424 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -583,7 +585,8 @@ object DataSource extends Logging { "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc, "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, "org.apache.spark.ml.source.libsvm" -> libsvm, - "com.databricks.spark.csv" -> csv + "com.databricks.spark.csv" -> csv, + "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket --- End diff -- please add a test for this!
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r168138470 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{IOException, OutputStreamWriter} +import java.net.ServerSocket +import java.sql.Timestamp +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming.LongOffset +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + +class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach() { +sqlContext.streams.active.foreach(_.stop()) +if (serverThread != null) { + serverThread.interrupt() + serverThread.join() + serverThread = null +} +if (batchReader != null) { + batchReader.stop() + batchReader = null +} + } + + private var serverThread: ServerThread = null + private var batchReader: MicroBatchReader = null + + test("V2 basic usage") { --- End diff -- These updated tests are getting more complicated with the direct calling of low-level data source APIs. Can you convert these tests to the more highlevel tests like Kafka? Well if it gets too complicated to make it work with `testStream` then you can simply use `query.processAllAvailable`. Then we wont have to worry about changing APIs any more. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
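A higher-level version of this test, using the public readStream API and query.processAllAvailable() rather than driving the MicroBatchReader directly, could look roughly like the sketch below; it reuses this suite's ServerThread helper, and the memory-sink query name "socket_test" is an invented placeholder, not the code that was merged:

    // Sketch only, assuming the ServerThread helper and StreamTest utilities from this suite.
    serverThread = new ServerThread()
    serverThread.start()

    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
      val socket = spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", serverThread.port.toString)
        .load()

      // Collect the results into an in-memory table so we can assert on them.
      val query = socket.writeStream
        .format("memory")
        .queryName("socket_test")
        .start()

      try {
        serverThread.enqueue("hello")
        query.processAllAvailable()
        checkAnswer(spark.table("socket_test"), Row("hello") :: Nil)
      } finally {
        query.stop()
      }
    }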
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r168137288 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- (same diff context as quoted earlier in this thread)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167776323 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,246 @@ +/* --- End diff -- Sorry @tdas, I did it with a plain "mv" rather than "git mv". The file doesn't change much; it was only adjusted to fit the Data Source V2 API.
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167713045 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -15,40 +15,48 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.streaming +package org.apache.spark.sql.execution.streaming.sources import java.io.{BufferedReader, InputStreamReader, IOException} import java.net.Socket import java.sql.Timestamp import java.text.SimpleDateFormat -import java.util.{Calendar, Locale} +import java.util.{Calendar, List => JList, Locale, Optional} import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.execution.streaming.LongOffset +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} -import org.apache.spark.unsafe.types.UTF8String - -object TextSocketSource { +object TextSocketMicroBatchReader { val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil) val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) } /** - * A source that reads text lines through a TCP socket, designed only for tutorials and debugging. - * This source will *not* work in production applications due to multiple reasons, including no - * support for fault recovery and keeping all of the text read in memory forever. + * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and --- End diff -- Tutorials is correct here; see e.g. StructuredSessionization.scala --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167126569 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -15,40 +15,48 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.streaming +package org.apache.spark.sql.execution.streaming.sources import java.io.{BufferedReader, InputStreamReader, IOException} import java.net.Socket import java.sql.Timestamp import java.text.SimpleDateFormat -import java.util.{Calendar, Locale} +import java.util.{Calendar, List => JList, Locale, Optional} import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.execution.streaming.LongOffset +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} -import org.apache.spark.unsafe.types.UTF8String - -object TextSocketSource { +object TextSocketMicroBatchReader { val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil) val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) } /** - * A source that reads text lines through a TCP socket, designed only for tutorials and debugging. - * This source will *not* work in production applications due to multiple reasons, including no - * support for fault recovery and keeping all of the text read in memory forever. + * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and --- End diff -- nit: tutorials -> testing (i know it was like that, but lets fix it since we are changing it anyway) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
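For reference, the end-user view of the source whose doc comment is being discussed is roughly the following sketch; the host and port values are placeholders:

    // Placeholder host/port; includeTimestamp adds the extra column from SCHEMA_TIMESTAMP.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .option("includeTimestamp", true)
      .load()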
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167128485 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo } } - override def toString: String = s"TextSocketSource[host: $host, port: $port]" + override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]" } -class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging { - private def parseIncludeTimestamp(params: Map[String, String]): Boolean = { -Try(params.getOrElse("includeTimestamp", "false").toBoolean) match { - case Success(bool) => bool - case Failure(_) => -throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"") -} - } +class TextSocketSourceProvider extends DataSourceV2 + with MicroBatchReadSupport with DataSourceRegister with Logging { - /** Returns the name and schema of the source that can be used to continually read data. */ - override def sourceSchema( - sqlContext: SQLContext, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): (String, StructType) = { + private def checkParameters(params: Map[String, String]): Unit = { logWarning("The socket source should not be used for production applications! " + "It does not support recovery.") -if (!parameters.contains("host")) { +if (!params.contains("host")) { throw new AnalysisException("Set a host to read from with option(\"host\", ...).") } -if (!parameters.contains("port")) { +if (!params.contains("port")) { throw new AnalysisException("Set a port to read from with option(\"port\", ...).") } -if (schema.nonEmpty) { - throw new AnalysisException("The socket source does not support a user-specified schema.") +Try { + params.get("includeTimestamp") +.orElse(params.get("includetimestamp")) +.getOrElse("false") +.toBoolean +} match { + case Success(_) => + case Failure(_) => +throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"") } - -val sourceSchema = - if (parseIncludeTimestamp(parameters)) { -TextSocketSource.SCHEMA_TIMESTAMP - } else { -TextSocketSource.SCHEMA_REGULAR - } -("textSocket", sourceSchema) } - override def createSource( - sqlContext: SQLContext, - metadataPath: String, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): Source = { -val host = parameters("host") -val port = parameters("port").toInt -new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext) + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceOptions): MicroBatchReader = { +checkParameters(options.asMap().asScala.toMap) --- End diff -- why not check it as DataSourceOptions (which is known to be case-insensitive) rather than a map which raises questions about case sensitivity? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
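What the reviewer is suggesting could look roughly like the sketch below: validate directly against DataSourceOptions, whose lookups are case-insensitive, instead of first converting to a Scala Map (the exact method shape is an assumption, not the final code):

    // Sketch: let DataSourceOptions handle case-insensitivity rather than probing
    // both "includeTimestamp" and "includetimestamp" keys in a plain Map.
    private def checkParameters(options: DataSourceOptions): Unit = {
      logWarning("The socket source should not be used for production applications! " +
        "It does not support recovery.")
      if (!options.get("host").isPresent) {
        throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
      }
      if (!options.get("port").isPresent) {
        throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
      }
      Try {
        options.get("includeTimestamp").orElse("false").toBoolean
      } match {
        case Success(_) =>
        case Failure(_) =>
          throw new AnalysisException(
            "includeTimestamp must be set to either \"true\" or \"false\"")
      }
    }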
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167126475 --- Diff: sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister --- @@ -5,6 +5,6 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider -org.apache.spark.sql.execution.streaming.TextSocketSourceProvider org.apache.spark.sql.execution.streaming.RateSourceProvider +org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider --- End diff -- can you add a redirection in the `DataSource.backwardCompatibilityMap` for this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
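For context, DataSource.backwardCompatibilityMap maps old fully-qualified provider class names to their new locations. A sketch of the entry being requested, showing only the shape (the surrounding entries of the real map are elided):

```scala
object BackwardCompatibilitySketch {
  // New location after the move into ...execution.streaming.sources
  private val socket =
    "org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider"

  // Old name -> new name, so checkpoints and user code that reference the
  // pre-move class name keep resolving to the same source.
  val redirections: Map[String, String] = Map(
    "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
  )
}
```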
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167127039 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo readThread.start() } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR + override def setOffsetRange( + start: Optional[Offset], + end: Optional[Offset]): Unit = synchronized { +startOffset = start.orElse(LongOffset(-1L)) +endOffset = end.orElse(currentOffset) + } - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None + override def getStartOffset(): Offset = { +Option(startOffset).getOrElse(throw new IllegalStateException("start offset not set")) + } + + override def getEndOffset(): Offset = { +Option(endOffset).getOrElse(throw new IllegalStateException("end offset not set")) + } + + override def deserializeOffset(json: String): Offset = { +LongOffset(json.toLong) + } + + override def readSchema(): StructType = { +val includeTimestamp = options.getBoolean("includeTimestamp", false) --- End diff -- supernit: is there need for a variable here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
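The inlined form the nit points at would be roughly the following sketch; SCHEMA_TIMESTAMP, SCHEMA_REGULAR and options are the ones from the diff, and the method lives inside TextSocketMicroBatchReader:

```scala
override def readSchema(): StructType = {
  if (options.getBoolean("includeTimestamp", false)) {
    TextSocketMicroBatchReader.SCHEMA_TIMESTAMP
  } else {
    TextSocketMicroBatchReader.SCHEMA_REGULAR
  }
}
```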
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167128652 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -177,11 +177,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo Optional.ofNullable(userSpecifiedSchema.orNull), Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath, options) +val schema = tempReader.readSchema() +// Stop tempReader to avoid side-affect thing --- End diff -- nit: side-affect -> side-effect. good catch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167128044 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo } } - override def toString: String = s"TextSocketSource[host: $host, port: $port]" + override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]" --- End diff -- This shows up in the StreamingQueryProgressEvent as description, so it may be better to have it as "TextSocket[..." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
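In other words, keep the short user-facing name in the description. A sketch of the suggested change:

```scala
// Shows up as the source description in StreamingQueryProgress, so keep the
// familiar "TextSocket[...]" prefix rather than the internal reader class name.
override def toString: String = s"TextSocket[host: $host, port: $port]"
```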
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167128861 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -0,0 +1,246 @@ +/* --- End diff -- Why does this show up as a new file? Was this not a "git mv"? Something went wrong; I would prefer to see a simple diff. Not much should change in the tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167128691 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -177,11 +177,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo Optional.ofNullable(userSpecifiedSchema.orNull), Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath, options) +val schema = tempReader.readSchema() +// Stop tempReader to avoid side-affect thing +tempReader.stop() --- End diff -- I feel like this needs a try-finally approach as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
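A sketch of the try-finally variant being suggested, so the temporary reader is stopped even when readSchema() throws; tempReader comes from the diff above, and the comment also picks up the side-effect spelling fix from the earlier nit:

```scala
val schema = try {
  tempReader.readSchema()
} finally {
  // Stop the temporary reader to avoid side effects, even if readSchema() fails.
  tempReader.stop()
}
```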
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167127081 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -103,23 +111,40 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo readThread.start() } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR + override def setOffsetRange( + start: Optional[Offset], --- End diff -- nit: won't this fit on a single line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
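The single-line form of the signature that the nit refers to, as a sketch with the body unchanged from the diff:

```scala
override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = synchronized {
  startOffset = start.orElse(LongOffset(-1L))
  endOffset = end.orElse(currentOffset)
}
```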
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r167126686 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala --- @@ -15,40 +15,48 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.streaming +package org.apache.spark.sql.execution.streaming.sources import java.io.{BufferedReader, InputStreamReader, IOException} import java.net.Socket import java.sql.Timestamp import java.text.SimpleDateFormat -import java.util.{Calendar, Locale} +import java.util.{Calendar, List => JList, Locale, Optional} import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.execution.streaming.LongOffset +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, MicroBatchReadSupport} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} -import org.apache.spark.unsafe.types.UTF8String - -object TextSocketSource { +object TextSocketMicroBatchReader { val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil) val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) } /** - * A source that reads text lines through a TCP socket, designed only for tutorials and debugging. - * This source will *not* work in production applications due to multiple reasons, including no - * support for fault recovery and keeping all of the text read in memory forever. + * A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This MicroBatchReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery and keeping all of the text read in memory + * forever. --- End diff -- this does not keep it forever. so remove this reason, just keep "no support for fault recover". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
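Folding in both documentation nits (tutorials -> testing from the earlier comment, and dropping the in-memory-forever reason here), the class comment would read roughly like this sketch:

```scala
/**
 * A MicroBatchReader that reads text lines through a TCP socket, designed only for testing
 * and debugging. This MicroBatchReader will *not* work in production applications due to
 * multiple reasons, including no support for fault recovery.
 */
```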
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164944650 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( +protected val host: String, +protected val port: Int, +includeTimestamp: Boolean, +sqlContext: SQLContext) + extends Source with TextSocketReader with Logging { initialize() - private def initialize(): Unit = synchronized { -socket = new Socket(host, port) -val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) -readThread = new Thread(s"TextSocketSource($host, $port)") { - setDaemon(true) - - override def run(): Unit = { -try { - while (true) { -val line = reader.readLine() -if (line == null) { - // End of file reached - logWarning(s"Stream closed by $host:$port") - return -} -TextSocketSource.this.synchronized { - val newData = (line, -Timestamp.valueOf( - TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) -) - currentOffset = currentOffset + 1 - batches.append(newData) -} - } -} catch { - case e: IOException => -} - } -} -readThread.start() - } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR - - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None -} else { - Some(currentOffset) -} - } + override def schema: StructType = +if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + + override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_)) /** Returns the data that is between the offsets (`start`, `end`]. 
*/ - override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { -val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 -val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 - -// Internal buffer only holds the batches after lastOffsetCommitted -val rawList = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - batches.slice(sliceStart, sliceEnd) -} + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { +val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset), + LongOffset.convert(end).map(_.offset)) val rdd = sqlContext.sparkContext .parallelize(rawList) .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) } sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: Offset): Unit = { val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") ) -val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt +commitInternal(newOffset.offset) + } -if (offsetDiff
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164944362 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TextSocketReader.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.util.Calendar +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.internal.Logging + +trait TextSocketReader extends Logging { --- End diff -- Please add docs!! This is a base interface used by two source implementations --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
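A sketch of the kind of trait-level documentation being asked for, assuming (from the diff) that TextSocketReader factors out the socket handling shared by the V1 source and the V2 reader:

```scala
import org.apache.spark.internal.Logging

/**
 * Shared logic for reading text lines from a TCP socket: opening the connection, buffering
 * incoming lines together with their arrival timestamps, and tracking the current and last
 * committed offsets. This is not a Source by itself; it is mixed into both the V1
 * TextSocketSource and the V2 micro-batch implementation.
 */
trait TextSocketReader extends Logging {
  // connection handling, in-memory line buffer and offset bookkeeping live here
}
```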
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164944324 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( --- End diff -- Please update docs accordingly!! This is not a source, but a base interface used by two source implementations --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164944276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( +protected val host: String, +protected val port: Int, +includeTimestamp: Boolean, +sqlContext: SQLContext) + extends Source with TextSocketReader with Logging { initialize() - private def initialize(): Unit = synchronized { -socket = new Socket(host, port) -val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) -readThread = new Thread(s"TextSocketSource($host, $port)") { - setDaemon(true) - - override def run(): Unit = { -try { - while (true) { -val line = reader.readLine() -if (line == null) { - // End of file reached - logWarning(s"Stream closed by $host:$port") - return -} -TextSocketSource.this.synchronized { - val newData = (line, -Timestamp.valueOf( - TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) -) - currentOffset = currentOffset + 1 - batches.append(newData) -} - } -} catch { - case e: IOException => -} - } -} -readThread.start() - } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR - - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None -} else { - Some(currentOffset) -} - } + override def schema: StructType = +if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + + override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_)) /** Returns the data that is between the offsets (`start`, `end`]. 
*/ - override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { -val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 -val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 - -// Internal buffer only holds the batches after lastOffsetCommitted -val rawList = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - batches.slice(sliceStart, sliceEnd) -} + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { +val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset), + LongOffset.convert(end).map(_.offset)) val rdd = sqlContext.sparkContext .parallelize(rawList) .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) } sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: Offset): Unit = { val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") ) -val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt +commitInternal(newOffset.offset) + } -if (offset
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164943885 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( +protected val host: String, +protected val port: Int, +includeTimestamp: Boolean, +sqlContext: SQLContext) + extends Source with TextSocketReader with Logging { initialize() - private def initialize(): Unit = synchronized { -socket = new Socket(host, port) -val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) -readThread = new Thread(s"TextSocketSource($host, $port)") { - setDaemon(true) - - override def run(): Unit = { -try { - while (true) { -val line = reader.readLine() -if (line == null) { - // End of file reached - logWarning(s"Stream closed by $host:$port") - return -} -TextSocketSource.this.synchronized { - val newData = (line, -Timestamp.valueOf( - TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) -) - currentOffset = currentOffset + 1 - batches.append(newData) -} - } -} catch { - case e: IOException => -} - } -} -readThread.start() - } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR - - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None -} else { - Some(currentOffset) -} - } + override def schema: StructType = +if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + + override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_)) /** Returns the data that is between the offsets (`start`, `end`]. 
*/ - override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { -val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 -val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 - -// Internal buffer only holds the batches after lastOffsetCommitted -val rawList = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - batches.slice(sliceStart, sliceEnd) -} + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { +val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset), + LongOffset.convert(end).map(_.offset)) val rdd = sqlContext.sparkContext .parallelize(rawList) .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) } sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: Offset): Unit = { val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") ) -val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt +commitInternal(newOffset.offset) + } -if (offs
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164937540 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( +protected val host: String, +protected val port: Int, +includeTimestamp: Boolean, +sqlContext: SQLContext) + extends Source with TextSocketReader with Logging { initialize() - private def initialize(): Unit = synchronized { -socket = new Socket(host, port) -val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) -readThread = new Thread(s"TextSocketSource($host, $port)") { - setDaemon(true) - - override def run(): Unit = { -try { - while (true) { -val line = reader.readLine() -if (line == null) { - // End of file reached - logWarning(s"Stream closed by $host:$port") - return -} -TextSocketSource.this.synchronized { - val newData = (line, -Timestamp.valueOf( - TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) -) - currentOffset = currentOffset + 1 - batches.append(newData) -} - } -} catch { - case e: IOException => -} - } -} -readThread.start() - } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR - - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None -} else { - Some(currentOffset) -} - } + override def schema: StructType = +if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + + override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_)) /** Returns the data that is between the offsets (`start`, `end`]. 
*/ - override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { -val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 -val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 - -// Internal buffer only holds the batches after lastOffsetCommitted -val rawList = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - batches.slice(sliceStart, sliceEnd) -} + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { +val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset), + LongOffset.convert(end).map(_.offset)) val rdd = sqlContext.sparkContext .parallelize(rawList) .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) } sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: Offset): Unit = { val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") ) -val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt +commitInternal(newOffset.offset) + } -if (offsetDiff
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164934753 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala --- @@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging { println("---") // scalastyle:off println spark - .createDataFrame(spark.sparkContext.parallelize(rows), schema) + .createDataFrame(rows.toList.asJava, schema) --- End diff -- OK, I will create a separate PR for this small fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
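For readers skimming the thread, the one-line change under discussion replaces sparkContext.parallelize with the createDataFrame overload that takes a local java.util.List, so rows already collected on the driver back a local relation instead of an RDD. A self-contained sketch; the object and column names are made up for illustration:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object LocalRowsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("local-rows-sketch").getOrCreate()
    val schema = StructType(StructField("value", StringType) :: Nil)
    val rows = Seq(Row("a"), Row("b"), Row("c"))

    // Driver-side rows become a local DataFrame directly, without first being
    // turned into an RDD via sparkContext.parallelize.
    spark.createDataFrame(rows.asJava, schema).show()

    spark.stop()
  }
}
```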
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164933597 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( +protected val host: String, +protected val port: Int, +includeTimestamp: Boolean, +sqlContext: SQLContext) + extends Source with TextSocketReader with Logging { initialize() - private def initialize(): Unit = synchronized { -socket = new Socket(host, port) -val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) -readThread = new Thread(s"TextSocketSource($host, $port)") { - setDaemon(true) - - override def run(): Unit = { -try { - while (true) { -val line = reader.readLine() -if (line == null) { - // End of file reached - logWarning(s"Stream closed by $host:$port") - return -} -TextSocketSource.this.synchronized { - val newData = (line, -Timestamp.valueOf( - TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) -) - currentOffset = currentOffset + 1 - batches.append(newData) -} - } -} catch { - case e: IOException => -} - } -} -readThread.start() - } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR - - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None -} else { - Some(currentOffset) -} - } + override def schema: StructType = +if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + + override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_)) /** Returns the data that is between the offsets (`start`, `end`]. 
*/ - override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { -val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 -val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 - -// Internal buffer only holds the batches after lastOffsetCommitted -val rawList = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - batches.slice(sliceStart, sliceEnd) -} + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { +val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset), + LongOffset.convert(end).map(_.offset)) val rdd = sqlContext.sparkContext .parallelize(rawList) .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) } sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: Offset): Unit = { val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") ) -val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt +commitInternal(newOffset.offset) + } -if (offset
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164930581 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala --- @@ -47,130 +48,141 @@ object TextSocketSource { * This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging { - - @GuardedBy("this") - private var socket: Socket = null - - @GuardedBy("this") - private var readThread: Thread = null - - /** - * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. - * Stored in a ListBuffer to facilitate removing committed batches. - */ - @GuardedBy("this") - protected val batches = new ListBuffer[(String, Timestamp)] - - @GuardedBy("this") - protected var currentOffset: LongOffset = new LongOffset(-1) - - @GuardedBy("this") - protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) +class TextSocketSource( +protected val host: String, +protected val port: Int, +includeTimestamp: Boolean, +sqlContext: SQLContext) + extends Source with TextSocketReader with Logging { initialize() - private def initialize(): Unit = synchronized { -socket = new Socket(host, port) -val reader = new BufferedReader(new InputStreamReader(socket.getInputStream)) -readThread = new Thread(s"TextSocketSource($host, $port)") { - setDaemon(true) - - override def run(): Unit = { -try { - while (true) { -val line = reader.readLine() -if (line == null) { - // End of file reached - logWarning(s"Stream closed by $host:$port") - return -} -TextSocketSource.this.synchronized { - val newData = (line, -Timestamp.valueOf( - TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) -) - currentOffset = currentOffset + 1 - batches.append(newData) -} - } -} catch { - case e: IOException => -} - } -} -readThread.start() - } - /** Returns the schema of the data from this source */ - override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP - else TextSocketSource.SCHEMA_REGULAR - - override def getOffset: Option[Offset] = synchronized { -if (currentOffset.offset == -1) { - None -} else { - Some(currentOffset) -} - } + override def schema: StructType = +if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + + override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_)) /** Returns the data that is between the offsets (`start`, `end`]. 
*/ - override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { -val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 -val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 - -// Internal buffer only holds the batches after lastOffsetCommitted -val rawList = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - batches.slice(sliceStart, sliceEnd) -} + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { +val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset), + LongOffset.convert(end).map(_.offset)) val rdd = sqlContext.sparkContext .parallelize(rawList) .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) } sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: Offset): Unit = { val newOffset = LongOffset.convert(end).getOrElse( sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + s"originate with an instance of this class") ) -val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt +commitInternal(newOffset.offset) + } -if (offsetDiff
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r164930523 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala --- @@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging { println("---") // scalastyle:off println spark - .createDataFrame(spark.sparkContext.parallelize(rows), schema) + .createDataFrame(rows.toList.asJava, schema) --- End diff -- this fix should go into 2.3 branch. thanks for catching this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163755819 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util._ +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask} +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object TextSocketSourceProviderV2 { + val HOST = "host" + val PORT = "port" + val INCLUDE_TIMESTAMP = "includeTimestamp" + val NUM_PARTITIONS = "numPartitions" + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: +StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +class TextSocketSourceProviderV2 extends DataSourceV2 +with MicroBatchReadSupport with DataSourceRegister with Logging { --- End diff -- I see, thanks for the clarify. Let me change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163755636 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util._ +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask} +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object TextSocketSourceProviderV2 { + val HOST = "host" + val PORT = "port" + val INCLUDE_TIMESTAMP = "includeTimestamp" + val NUM_PARTITIONS = "numPartitions" + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: +StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +class TextSocketSourceProviderV2 extends DataSourceV2 +with MicroBatchReadSupport with DataSourceRegister with Logging { --- End diff -- The idea is that the existing TextSocketSourceProvider will have the MicroBatchReadSupport implementation here, in addition to the StreamSourceProvider implementation it already has. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
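A rough sketch of the shape being described: a single provider registered under the existing "socket" short name that exposes both the V1 and V2 entry points. It is declared abstract here so the method bodies can be elided; the V2 package names follow the diff quoted above and moved around between revisions of this PR:

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport

// One provider class, two read paths: sourceSchema/createSource for the V1 engine
// and createMicroBatchReader for the V2 engine. Bodies are elided, hence `abstract`.
abstract class CombinedTextSocketProviderSketch extends DataSourceV2
  with StreamSourceProvider
  with MicroBatchReadSupport
  with DataSourceRegister
  with Logging {

  override def shortName(): String = "socket"
}
```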
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163753088 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util._ +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask} +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object TextSocketSourceProviderV2 { + val HOST = "host" + val PORT = "port" + val INCLUDE_TIMESTAMP = "includeTimestamp" + val NUM_PARTITIONS = "numPartitions" + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: +StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +class TextSocketSourceProviderV2 extends DataSourceV2 +with MicroBatchReadSupport with DataSourceRegister with Logging { --- End diff -- @jose-torres , you mean that instead of creating a new V2 socket source, modifying current V1 socket source to make it work with V2, am I understanding correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163730153 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util._ +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask} +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object TextSocketSourceProviderV2 { + val HOST = "host" + val PORT = "port" + val INCLUDE_TIMESTAMP = "includeTimestamp" + val NUM_PARTITIONS = "numPartitions" + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: +StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +class TextSocketSourceProviderV2 extends DataSourceV2 +with MicroBatchReadSupport with DataSourceRegister with Logging { + override def shortName(): String = "socketv2" + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): MicroBatchReader = { +logWarning("The socket source should not be used for production applications! 
" + + "It does not support recovery.") +if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) { + throw new AnalysisException("Set a host to read from with option(\"host\", ...).") +} +if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) { + throw new AnalysisException("Set a port to read from with option(\"port\", ...).") +} +if (schema.isPresent) { + throw new AnalysisException("The socket source does not support a user-specified schema.") +} + +if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) { + Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match { +case Success(bool) => +case Failure(_) => + throw new AnalysisException( +"includeTimestamp must be set to either \"true\" or \"false\"") + } +} + +new TextSocketStreamMicroBatchReader(options) + } +} + +case class TextSocketStreamOffset(offset: Long) extends Offset { + override def json(): String = offset.toString +} + +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options) + extends MicroBatchReader with Logging { + + import TextSocketSourceProviderV2._ + + private var start: TextSocketStreamOffset = _ + private var end: TextSocketStreamOffset = _ + + private val host = options.get(HOST).get() + private val port = options.get(PORT).get().toInt + private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false) + private val numPartitions = options.getInt(NUM_PARTITIONS, 1) +
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163725096 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util._ +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask} +import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object TextSocketSourceProviderV2 { + val HOST = "host" + val PORT = "port" + val INCLUDE_TIMESTAMP = "includeTimestamp" + val NUM_PARTITIONS = "numPartitions" + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: +StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US) +} + +class TextSocketSourceProviderV2 extends DataSourceV2 +with MicroBatchReadSupport with DataSourceRegister with Logging { + override def shortName(): String = "socketv2" + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): MicroBatchReader = { +logWarning("The socket source should not be used for production applications! 
" + + "It does not support recovery.") +if (!options.get(TextSocketSourceProviderV2.HOST).isPresent) { + throw new AnalysisException("Set a host to read from with option(\"host\", ...).") +} +if (!options.get(TextSocketSourceProviderV2.PORT).isPresent) { + throw new AnalysisException("Set a port to read from with option(\"port\", ...).") +} +if (schema.isPresent) { + throw new AnalysisException("The socket source does not support a user-specified schema.") +} + +if (options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).isPresent) { + Try(options.get(TextSocketSourceProviderV2.INCLUDE_TIMESTAMP).get().toBoolean) match { +case Success(bool) => +case Failure(_) => + throw new AnalysisException( +"includeTimestamp must be set to either \"true\" or \"false\"") + } +} + +new TextSocketStreamMicroBatchReader(options) + } +} + +case class TextSocketStreamOffset(offset: Long) extends Offset { + override def json(): String = offset.toString +} + +class TextSocketStreamMicroBatchReader(options: DataSourceV2Options) + extends MicroBatchReader with Logging { + + import TextSocketSourceProviderV2._ + + private var start: TextSocketStreamOffset = _ + private var end: TextSocketStreamOffset = _ + + private val host = options.get(HOST).get() + private val port = options.get(PORT).get().toInt + private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false) + private val numPartitions = options.getInt(NUM_PARTITIONS, 1) +
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163614088 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- (quoted diff context identical to the excerpt above)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163613655 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- (quoted diff context identical to the excerpt above, anchored at: private val numPartitions = options.getInt(NUM_PARTITIONS, 1))
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163616762 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala --- (quoted diff context identical to the excerpt above)
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163612181

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSourceV2.scala ---
(quoted diff context identical to the excerpt above; the comment is anchored at the provider declaration)
+class TextSocketSourceProviderV2 extends DataSourceV2
+    with MicroBatchReadSupport with DataSourceRegister with Logging {
--- End diff --

The intent is for the V2 and V1 source to live in the same register, so existing queries can start using the V2 source with no change needed. This also allows the V2 implementation to be validated by passing all the old tests.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
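To make the register-sharing idea concrete: the rough shape would be a single provider class whose DataSourceRegister short name is the existing "socket" string, so format("socket") resolves old queries to the V2 path. The following is only an illustrative sketch under that assumption -- this revision of the patch still registers "socketv2", and the class name here is a placeholder rather than the code in the PR.

import java.util.Optional

import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader
import org.apache.spark.sql.types.StructType

// Hypothetical provider name; the point is only the shared short name.
class TextSocketSourceProvider extends DataSourceV2
    with MicroBatchReadSupport with DataSourceRegister with Logging {

  // Same short name the old V1 source registered, so existing
  // spark.readStream.format("socket") queries pick up the V2 reader unchanged.
  override def shortName(): String = "socket"

  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceV2Options): MicroBatchReader = {
    new TextSocketStreamMicroBatchReader(options) // reader introduced by this patch
  }
}

The short name is discovered through the META-INF/services entry for DataSourceRegister, so no query-side change is needed to opt into V2, which is also what would let the existing V1 test suite exercise the new implementation.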
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20382#discussion_r163487603

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
     println("---")
     // scalastyle:off println
     spark
-      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+      .createDataFrame(rows.toList.asJava, schema)
--- End diff --

Changed here to avoid triggering a new distributed job.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
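For context on why this avoids a job: createDataFrame over a local java.util.List produces a LocalRelation that the console sink can print without launching tasks, whereas parallelize() first turns the already-collected rows back into an RDD. The following is a sketch of the two paths, not the exact ConsoleWriter code; `rows` and `schema` are assumed to be the batch collected to the driver and its schema.

import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Sketch only. Assumes the rows were already collected to the driver for this batch.
def printBatch(spark: SparkSession, rows: Array[Row], schema: StructType): Unit = {
  // Old path (for comparison): the rows are shipped back out as an RDD, so the
  // show() would have to run another Spark job just to bring them back for printing.
  val viaRdd = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

  // New path: a local list becomes a LocalRelation, so printing needs no new job.
  val viaLocalList = spark.createDataFrame(rows.toList.asJava, schema)

  viaLocalList.show(20, truncate = false)
}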
[GitHub] spark pull request #20382: [SPARK-23097][SQL][SS] Migrate text socket source...
GitHub user jerryshao opened a pull request:

    https://github.com/apache/spark/pull/20382

    [SPARK-23097][SQL][SS] Migrate text socket source to V2

## What changes were proposed in this pull request?

This PR moves the structured streaming text socket source to the DataSource V2 API.

Question: do we need to remove the old "socket" source?

## How was this patch tested?

Unit tests and manual verification.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerryshao/apache-spark SPARK-23097

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20382.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20382

commit 629243d94835e8eef379122d0c40d5c0b20ea351
Author: jerryshao
Date: 2018-01-24T08:22:07Z

    Move text socket source to V2

commit 8f3b54824d92123a0e7d468d42db30dba72cded1
Author: jerryshao
Date: 2018-01-24T09:04:44Z

    Rename to V2

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
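Since the migration is intended to be transparent to users, the socket source keeps its usual test-only usage pattern. A typical smoke test against it, assuming a listener started with nc -lk 9999 on the driver host, is sketched below; the extra V2 reader options (includeTimestamp, numPartitions) validated in the provider would be passed the same way as host and port.

import org.apache.spark.sql.SparkSession

// Simple streaming word count over the socket source; behavior should be the
// same before and after the V2 migration. The source still does not support
// recovery, so it remains suitable only for testing and demos.
object SocketSourceSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SocketSourceSmokeTest").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    val counts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}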