Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20382#discussion_r164944650

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -47,130 +48,141 @@ object TextSocketSource {
  * This source will *not* work in production applications due to multiple reasons, including no
  * support for fault recovery and keeping all of the text read in memory forever.
  */
-class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
-  extends Source with Logging {
-
-  @GuardedBy("this")
-  private var socket: Socket = null
-
-  @GuardedBy("this")
-  private var readThread: Thread = null
-
-  /**
-   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
-   * Stored in a ListBuffer to facilitate removing committed batches.
-   */
-  @GuardedBy("this")
-  protected val batches = new ListBuffer[(String, Timestamp)]
-
-  @GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
-
-  @GuardedBy("this")
-  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+class TextSocketSource(
+    protected val host: String,
+    protected val port: Int,
+    includeTimestamp: Boolean,
+    sqlContext: SQLContext)
+  extends Source with TextSocketReader with Logging {
 
   initialize()
 
-  private def initialize(): Unit = synchronized {
-    socket = new Socket(host, port)
-    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
-    readThread = new Thread(s"TextSocketSource($host, $port)") {
-      setDaemon(true)
-
-      override def run(): Unit = {
-        try {
-          while (true) {
-            val line = reader.readLine()
-            if (line == null) {
-              // End of file reached
-              logWarning(s"Stream closed by $host:$port")
-              return
-            }
-            TextSocketSource.this.synchronized {
-              val newData = (line,
-                Timestamp.valueOf(
-                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-              )
-              currentOffset = currentOffset + 1
-              batches.append(newData)
-            }
-          }
-        } catch {
-          case e: IOException =>
-        }
-      }
-    }
-    readThread.start()
-  }
-
   /** Returns the schema of the data from this source */
-  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
-  else TextSocketSource.SCHEMA_REGULAR
-
-  override def getOffset: Option[Offset] = synchronized {
-    if (currentOffset.offset == -1) {
-      None
-    } else {
-      Some(currentOffset)
-    }
-  }
+  override def schema: StructType =
+    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
+
+  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
-  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
-    val startOrdinal =
-      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
-
-    // Internal buffer only holds the batches after lastOffsetCommitted
-    val rawList = synchronized {
-      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
-      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
-      batches.slice(sliceStart, sliceEnd)
-    }
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
+      LongOffset.convert(end).map(_.offset))
 
     val rdd = sqlContext.sparkContext
       .parallelize(rawList)
      .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }
 
-  override def commit(end: Offset): Unit = synchronized {
+  override def commit(end: Offset): Unit = {
    val newOffset = LongOffset.convert(end).getOrElse(
      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
        s"originate with an instance of this class")
    )
 
-    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+    commitInternal(newOffset.offset)
+  }
 
-    if (offsetDiff < 0) {
-      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-    }
+  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
+}
+
+case class TextSocketOffset(offset: Long) extends V2Offset {
--- End diff --

I would wait for my PR #20445 to go in, where I migrate LongOffset to use OffsetV2.
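For context on that last diff line: the suggestion is to reuse one shared long-valued offset type (LongOffset, once it implements the v2 Offset interface per PR #20445) rather than introducing a source-specific TextSocketOffset. Below is a minimal, self-contained sketch of that pattern; the OffsetV2 trait here is a local stand-in for the real interface in Spark's sources.v2 package, and the names are illustrative rather than Spark's actual API.

// Sketch only: OffsetV2 stands in for Spark's v2 streaming Offset interface,
// which requires a JSON representation so offsets can be logged and recovered.
trait OffsetV2 {
  def json: String
}

// A single reusable long-valued offset, as suggested instead of a
// source-specific TextSocketOffset case class.
case class LongOffset(offset: Long) extends OffsetV2 {
  override def json: String = offset.toString

  def +(increment: Long): LongOffset = LongOffset(offset + increment)
}

object LongOffsetSketch {
  def main(args: Array[String]): Unit = {
    val start = LongOffset(-1)   // the socket source's "no data yet" sentinel
    val next  = start + 1        // advanced once per line read from the socket
    println(next.json)           // prints "0": the serialized form kept in the offset log
  }
}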