[ 
https://issues.apache.org/jira/browse/SPARK-53784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Jerry Peng updated SPARK-53784:
--------------------------------------
    Description: 
Currently in Structured Streaming, start and end offsets are determined at the 
driver prior to running the micro-batch.  In real-time mode, end offsets are 
not known apriori. They are communicated to the driver later by the executors 
at the end of microbatch that runs for a fixed amount of time.  Thus, we need 
to add additional APIs in the source to support this kind of behavior.

The lifecycle of the new API is the following.

Driver side:
 # prepareForRealTimeMode
 ** Called during logical planning to inform the source if it's in real time 
mode
 # planInputPartitions
 ** The driver plans partitions via planPartitions but only a starting offset 
is provided (Compared to existing execution modes that require planPartitions 
to provide both a starting and end offset)
 # mergeOffsets
 ** Merge partitioned offsets coming from partitions/tasks to a single global 
offset.

 

Task side:
 # nextWithTimeout
 ** Alternative function to be called than next(), that proceed to the next 
record. The different from next() is that, if there is no more records, the 
call needs to keep waiting until the timeout
 # 
getOffset
 ## Get the offset of the next record, or the start offset if no records have 
been read. The execution engine will call this method along with get() to keep 
track of the current offset. When a task ends, the offset in each partition 
will be passed back to the driver. They will be used as the start offsets of 
the next batch.

 

  was:
Currently in Structured Streaming, start and end offsets are determined at the 
driver prior to running the micro-batch.  In real-time mode, end offsets are 
not known apriori. They
are communicated to the driver later by the executors at the end of microbatch 
that runs for a fixed amount of time.  Thus, we need to add additional APIs in 
the source to support this kind of behavior.

The lifecycle of the new API is the following
 # prepareForRealTimeMode
 ** Called during logical planning to inform the source if it's in real time 
mode
 # planInputPartitions
 ** The driver plans partitions via planPartitions but only a starting offset 
is provided (Compared to existing execution modes that require planPartitions 
to provide both a starting and end offset)
 # mergeOffsets
 ** Merge partitioned offsets coming from partitions/tasks to a single global 
offset.


> Additional Source APIs needed to support RTM execution
> ------------------------------------------------------
>
>                 Key: SPARK-53784
>                 URL: https://issues.apache.org/jira/browse/SPARK-53784
>             Project: Spark
>          Issue Type: Story
>          Components: Structured Streaming
>    Affects Versions: 4.1.0
>            Reporter: Boyang Jerry Peng
>            Priority: Major
>
> Currently in Structured Streaming, start and end offsets are determined at 
> the driver prior to running the micro-batch.  In real-time mode, end offsets 
> are not known apriori. They are communicated to the driver later by the 
> executors at the end of microbatch that runs for a fixed amount of time.  
> Thus, we need to add additional APIs in the source to support this kind of 
> behavior.
> The lifecycle of the new API is the following.
> Driver side:
>  # prepareForRealTimeMode
>  ** Called during logical planning to inform the source if it's in real time 
> mode
>  # planInputPartitions
>  ** The driver plans partitions via planPartitions but only a starting offset 
> is provided (Compared to existing execution modes that require planPartitions 
> to provide both a starting and end offset)
>  # mergeOffsets
>  ** Merge partitioned offsets coming from partitions/tasks to a single global 
> offset.
>  
> Task side:
>  # nextWithTimeout
>  ** Alternative function to be called than next(), that proceed to the next 
> record. The different from next() is that, if there is no more records, the 
> call needs to keep waiting until the timeout
>  # 
> getOffset
>  ## Get the offset of the next record, or the start offset if no records have 
> been read. The execution engine will call this method along with get() to 
> keep track of the current offset. When a task ends, the offset in each 
> partition will be passed back to the driver. They will be used as the start 
> offsets of the next batch.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to