GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20382#discussion_r164944650
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
    @@ -47,130 +48,141 @@ object TextSocketSource {
      * This source will *not* work in production applications due to multiple reasons, including no
      * support for fault recovery and keeping all of the text read in memory forever.
      */
    -class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
    -  extends Source with Logging {
    -
    -  @GuardedBy("this")
    -  private var socket: Socket = null
    -
    -  @GuardedBy("this")
    -  private var readThread: Thread = null
    -
    -  /**
    -   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
    -   * Stored in a ListBuffer to facilitate removing committed batches.
    -   */
    -  @GuardedBy("this")
    -  protected val batches = new ListBuffer[(String, Timestamp)]
    -
    -  @GuardedBy("this")
    -  protected var currentOffset: LongOffset = new LongOffset(-1)
    -
    -  @GuardedBy("this")
    -  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
    +class TextSocketSource(
    +    protected val host: String,
    +    protected val port: Int,
    +    includeTimestamp: Boolean,
    +    sqlContext: SQLContext)
    +  extends Source with TextSocketReader with Logging {
     
       initialize()
     
    -  private def initialize(): Unit = synchronized {
    -    socket = new Socket(host, port)
    -    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    -    readThread = new Thread(s"TextSocketSource($host, $port)") {
    -      setDaemon(true)
    -
    -      override def run(): Unit = {
    -        try {
    -          while (true) {
    -            val line = reader.readLine()
    -            if (line == null) {
    -              // End of file reached
    -              logWarning(s"Stream closed by $host:$port")
    -              return
    -            }
    -            TextSocketSource.this.synchronized {
    -              val newData = (line,
    -                Timestamp.valueOf(
    -                  TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
    -                )
    -              currentOffset = currentOffset + 1
    -              batches.append(newData)
    -            }
    -          }
    -        } catch {
    -          case e: IOException =>
    -        }
    -      }
    -    }
    -    readThread.start()
    -  }
    -
       /** Returns the schema of the data from this source */
    -  override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
    -  else TextSocketSource.SCHEMA_REGULAR
    -
    -  override def getOffset: Option[Offset] = synchronized {
    -    if (currentOffset.offset == -1) {
    -      None
    -    } else {
    -      Some(currentOffset)
    -    }
    -  }
    +  override def schema: StructType =
    +    if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR
    +
    +  override def getOffset: Option[Offset] = getOffsetInternal.map(LongOffset(_))
     
       /** Returns the data that is between the offsets (`start`, `end`]. */
    -  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    -    val startOrdinal =
    -      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
    -    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
    -
    -    // Internal buffer only holds the batches after lastOffsetCommitted
    -    val rawList = synchronized {
    -      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
    -      batches.slice(sliceStart, sliceEnd)
    -    }
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    val rawList = getBatchInternal(start.flatMap(LongOffset.convert).map(_.offset),
    +      LongOffset.convert(end).map(_.offset))
     
         val rdd = sqlContext.sparkContext
           .parallelize(rawList)
          .map { case (v, ts) => InternalRow(UTF8String.fromString(v), ts.getTime) }
         sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
       }
     
    -  override def commit(end: Offset): Unit = synchronized {
    +  override def commit(end: Offset): Unit = {
         val newOffset = LongOffset.convert(end).getOrElse(
           sys.error(s"TextSocketStream.commit() received an offset ($end) that 
did not " +
             s"originate with an instance of this class")
         )
     
    -    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    +    commitInternal(newOffset.offset)
    +  }
     
    -    if (offsetDiff < 0) {
    -      sys.error(s"Offsets committed out of order: $lastOffsetCommitted 
followed by $end")
    -    }
    +  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
    +}
    +
    +case class TextSocketOffset(offset: Long) extends V2Offset {
    --- End diff --
    
    I would wait for my PR #20445 to go in, where I migrate LongOffset to use OffsetV2.
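
For readers following the thread: the comment refers to the ad-hoc offset type added at the end of the hunk. Below is a minimal, self-contained sketch of the point, with V2Offset written out as a stand-in for the v2 streaming Offset interface; the exact shape of the migrated LongOffset is defined in PR #20445, not here.

    // Stand-in for the v2 streaming Offset interface: offsets must be able
    // to serialize themselves (assumed shape, for illustration only).
    trait V2Offset {
      def json: String
    }

    // Before: the v2 code path needs its own wrapper around a Long.
    case class TextSocketOffset(offset: Long) extends V2Offset {
      override def json: String = offset.toString
    }

    // After the proposed migration: LongOffset itself satisfies the v2
    // interface, so the v1 and v2 paths share one offset type and the
    // wrapper above becomes redundant (hypothetical post-migration shape).
    case class LongOffset(offset: Long) extends V2Offset {
      override def json: String = offset.toString
    }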
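
Separately, for context on the refactor in the diff: the rewritten class delegates to getOffsetInternal, getBatchInternal and commitInternal on the new TextSocketReader trait, whose body falls outside this hunk. A minimal sketch of the interface those call sites imply (the method names match the diff; the signatures and doc comments are guesses):

    import java.sql.Timestamp

    // Shared buffering logic for the v1 and v2 socket sources; the real
    // implementation lives outside the quoted hunk.
    trait TextSocketReader {
      protected def host: String
      protected def port: Int

      /** Highest offset read so far, or None if no line has arrived yet. */
      def getOffsetInternal: Option[Long]

      /** Buffered (line, timestamp) pairs in the offset range (start, end]. */
      def getBatchInternal(start: Option[Long], end: Option[Long]): Seq[(String, Timestamp)]

      /** Drops buffered lines up to and including offset `end`. */
      def commitInternal(end: Long): Unit
    }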

