[ https://issues.apache.org/jira/browse/SPARK-53784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Jerry Peng updated SPARK-53784:
--------------------------------------
Description:
Currently in Structured Streaming, start and end offsets are determined at the
driver before the micro-batch runs. In real-time mode, end offsets are not
known a priori. Instead, the executors report them to the driver at the end of
each micro-batch, which runs for a fixed amount of time. Thus, we need to add
additional APIs to the source to support this behavior.
The lifecycle of the new API is the following.
Driver side:
# prepareForRealTimeMode
** Called during logical planning to inform the source whether it is running
in real-time mode.
# planInputPartitions
** The driver plans partitions via planInputPartitions, but only a start offset
is provided (in contrast to existing execution modes, where planInputPartitions
receives both a start and an end offset).
# mergeOffsets
** Merges the per-partition offsets reported by the partitions/tasks into a
single global offset.
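As a rough illustration of the driver-side lifecycle, here is a minimal Python
sketch. It is not Spark's actual API: the class ToyRealTimeSource, the
snake_case method names, and modeling offsets as a map from partition id to an
integer position (Kafka-style) are all assumptions made for this example.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class PartitionOffset:
    """Hypothetical per-partition offset reported back by one task."""
    partition: int
    offset: int


@dataclass(frozen=True)
class InputPartition:
    """Hypothetical input partition: it carries only a start offset, no end offset."""
    partition: int
    start_offset: int


class ToyRealTimeSource:
    """Hypothetical driver-side source mirroring the lifecycle described above."""

    def __init__(self, num_partitions: int):
        self.num_partitions = num_partitions
        self.real_time_mode = False

    def prepare_for_real_time_mode(self) -> None:
        # Called during logical planning; this toy source just records the mode.
        self.real_time_mode = True

    def plan_input_partitions(self, start: Dict[int, int]) -> List[InputPartition]:
        # Only the start offset is known; partitions missing from `start` begin at 0.
        return [InputPartition(p, start.get(p, 0)) for p in range(self.num_partitions)]

    def merge_offsets(self, offsets: List[PartitionOffset]) -> Dict[int, int]:
        # Combine the offsets reported by individual tasks into one global offset,
        # which becomes the start offset of the next micro-batch.
        merged: Dict[int, int] = {}
        for po in offsets:
            if po.partition in merged:
                raise ValueError(f"duplicate offset for partition {po.partition}")
            merged[po.partition] = po.offset
        return merged
```

In this sketch the global offset is simply the union of the per-partition
positions; a real source may need a richer representation.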
Task side:
# nextWithTimeout
** An alternative to next() that advances to the next record. The difference
from next() is that, if no more records are available, the call keeps waiting
until the timeout expires.
# getOffset
** Returns the offset of the next record, or the start offset if no records
have been read. The execution engine calls this method along with get() to
keep track of the current offset. When a task ends, the offset of each
partition is passed back to the driver, where it is used as the start offset
of the next batch.
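A similarly hedged sketch of the task side follows. ToyRealTimeReader, run_task,
the in-memory queue standing in for the external system, and contiguous integer
offsets are all hypothetical; the loop only illustrates how an engine might call
nextWithTimeout and getOffset until a fixed batch duration expires and then
report the end offset.

```python
import queue
import time
from typing import List, Optional, Tuple


class ToyRealTimeReader:
    """Hypothetical task-side reader for one partition.

    Records arrive on a queue as (offset, value) pairs, standing in for a broker.
    Assumes contiguous integer offsets, so the next offset is the last one + 1.
    """

    def __init__(self, records: "queue.Queue[Tuple[int, str]]", start_offset: int):
        self._records = records
        self._next_offset = start_offset  # offset of the next record to read
        self._current: Optional[str] = None

    def next_with_timeout(self, timeout_s: float) -> bool:
        # Unlike a plain next(), wait up to timeout_s for a record to arrive
        # instead of returning False as soon as the queue is empty.
        try:
            offset, value = self._records.get(timeout=timeout_s)
        except queue.Empty:
            return False
        self._current = value
        self._next_offset = offset + 1
        return True

    def get(self) -> str:
        assert self._current is not None, "call next_with_timeout() first"
        return self._current

    def get_offset(self) -> int:
        # Offset of the next record, or the start offset if nothing has been read.
        return self._next_offset


def run_task(reader: ToyRealTimeReader, batch_duration_s: float) -> Tuple[List[str], int]:
    """Hypothetical engine loop: read until the batch duration elapses,
    then return the rows and the partition's offset for the driver."""
    deadline = time.monotonic() + batch_duration_s
    rows: List[str] = []
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        if reader.next_with_timeout(remaining):
            rows.append(reader.get())
    return rows, reader.get_offset()
```

The end offset returned by run_task is what the driver would combine via
mergeOffsets and use as the start offset of the next batch.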
was:
Currently in Structured Streaming, start and end offsets are determined at the
driver before the micro-batch runs. In real-time mode, end offsets are not
known a priori. Instead, the executors report them to the driver at the end of
each micro-batch, which runs for a fixed amount of time. Thus, we need to add
additional APIs to the source to support this behavior.
The lifecycle of the new API is the following.
# prepareForRealTimeMode
** Called during logical planning to inform the source whether it is running
in real-time mode.
# planInputPartitions
** The driver plans partitions via planInputPartitions, but only a start offset
is provided (in contrast to existing execution modes, where planInputPartitions
receives both a start and an end offset).
# mergeOffsets
** Merges the per-partition offsets reported by the partitions/tasks into a
single global offset.
> Additional Source APIs needed to support RTM execution
> ------------------------------------------------------
>
> Key: SPARK-53784
> URL: https://issues.apache.org/jira/browse/SPARK-53784
> Project: Spark
> Issue Type: Story
> Components: Structured Streaming
> Affects Versions: 4.1.0
> Reporter: Boyang Jerry Peng
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)