Thanks, Jungtaek, for curating this list. It covers a lot of important fixes
and performance improvements in Structured Streaming.

Hi Devs

What is missing, from a process perspective, for getting these PRs merged?
Apart from this list, is there any other forum where we can request
attention for such important PRs? Is the lack of reviews limited to
Structured Streaming, or are there other areas of Spark that suffer from
similar neglect? Does the community feel that we need a better turnaround
for PRs, to make sure that we don't miss out on important contributions and
to encourage newbies like me?

Thanks
Vikram

On Tue, Jul 16, 2019 at 10:41 AM Jungtaek Lim <kabh...@gmail.com> wrote:

> Hi devs,
>
> As we make progress on some minor PRs for structured streaming, I'd like to
> draw attention to major PRs in the SS area so they get more chances to be
> reviewed.
>
> Please note that I only include existing PRs, so something not yet
> discussed, like queryable state, is not included in the curated list. Also,
> I've excluded PRs on continuous processing, as I'm not fully sure about the
> current direction and vision for this feature. Minor PRs are mostly excluded
> unless they were proposed a long time ago. Lastly, I could be biased in
> curating the list.
>
> Let's get started!
>
> ----
> A. File Source/Sink
>
> 1. [SPARK-20568][SS] Provide option to clean up completed files in
> streaming query
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-20568
> PR: https://github.com/apache/spark/pull/22952
>
> By the nature of a "stream", the input data grows indefinitely, and end
> users want a clear way to clean up completed files. Unlike a batch query,
> structured streaming doesn't require all input files to remain present:
> once they've been committed (that is, processing has completed), the query
> won't read them again.
>
> This patch automatically cleans up input files when they're committed,
> with three options: 1) keep them as they are, 2) archive (move) them to
> another directory, 3) delete them.
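
The three options can be pictured with a rough sketch in plain Python rather than Spark code. The function name, the mode names ("off", "archive", "delete") and the archive_dir parameter are assumptions made up for illustration; they are not taken from the PR.

```python
import shutil
from pathlib import Path


def clean_committed_file(path, mode="off", archive_dir=None):
    """Apply one cleanup policy to an input file the query has committed.

    Hypothetical modes: "off" keeps the file, "archive" moves it under
    archive_dir, "delete" removes it. Returns the file's new location,
    or None if it was deleted.
    """
    src = Path(path)
    if mode == "off":
        return src
    if mode == "archive":
        if archive_dir is None:
            raise ValueError("archive mode needs archive_dir")
        dest_dir = Path(archive_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest = dest_dir / src.name
        shutil.move(str(src), str(dest))
        return dest
    if mode == "delete":
        src.unlink()
        return None
    raise ValueError(f"unknown mode: {mode}")
```

The point of the sketch is only that cleanup is triggered per committed file, never for files the query may still need to read.
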
>
> 2. [SPARK-27188][SS] FileStreamSink: provide a new option to have
> retention on output files
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-27188
> PR: https://github.com/apache/spark/pull/24128
>
> The file sink writes metadata that records the list of output files, so
> that a file source reads only the correct files, which helps achieve
> end-to-end exactly-once. But the file sink has no idea when output files
> will no longer be accessed by downstream queries, so the metadata just
> grows indefinitely and output files cannot be removed safely.
>
> This patch lets end users provide a TTL on output files, so that the
> metadata eventually excludes expired output files and end users can
> remove those output files safely.
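
A minimal sketch of the TTL idea, assuming each metadata entry carries a path and a modification time. The entry layout and the function name are hypothetical, not the PR's actual data structures.

```python
def retain_unexpired(entries, now_ms, ttl_ms):
    """Split sink metadata entries into (kept, expired) by a TTL.

    Each entry is a hypothetical dict with "path" and "modified_ms" keys.
    An entry expires once it is older than ttl_ms.
    """
    kept, expired = [], []
    for entry in entries:
        if now_ms - entry["modified_ms"] > ttl_ms:
            expired.append(entry)
        else:
            kept.append(entry)
    return kept, expired
```

Only the files behind the expired entries become safe to delete, because no reader that trusts the metadata will list them any more.
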
>
>
> B. Kafka Source/Sink
>
> 1. [SPARK-21869][SS] A cached Kafka producer should not be closed if any
> task is using it - adds inuse tracking.
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-21869
> PR: https://github.com/apache/spark/pull/19096
>
> This is a long-standing bug (around 2 years since the JIRA issue was
> filed): if a task uses a cached Kafka producer for longer than 10 minutes,
> the pool will treat it as "timed out" and just close it. After the close,
> undefined behavior will occur on the task side.
>
> This patch adds "in-use" tracking on the producer to address this. Please
> note that the Kafka producer is thread-safe (whereas the Kafka consumer is
> not) and we allow using it concurrently, so we can't adopt Commons Pool to
> pool producers. (Though we could still leverage Commons Pool if we are OK
> with not sharing producers between threads.)
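
To make the "in-use" tracking concrete, here is a small sketch of a shared cache that refuses to close an entry while any task still holds it. The class and method names are invented for illustration and the producer is any object with a close() method; this is not the code in the PR.

```python
import threading
import time


class TrackedProducerCache:
    """Cache of shared producers that are closed only when idle and expired."""

    def __init__(self, factory, timeout_s=600.0, clock=time.monotonic):
        self._factory = factory      # hypothetical: builds a producer for a key
        self._timeout_s = timeout_s  # 10 minutes, as in the description above
        self._clock = clock
        self._lock = threading.Lock()
        self._entries = {}           # key -> [producer, in_use_count, last_released]

    def acquire(self, key):
        with self._lock:
            entry = self._entries.get(key)
            if entry is None:
                entry = [self._factory(key), 0, self._clock()]
                self._entries[key] = entry
            entry[1] += 1            # several tasks may share one producer
            return entry[0]

    def release(self, key):
        with self._lock:
            entry = self._entries[key]
            entry[1] -= 1
            entry[2] = self._clock()

    def evict_expired(self):
        now = self._clock()
        with self._lock:
            expired = [
                key for key, (_, in_use, last) in self._entries.items()
                if in_use == 0 and now - last > self._timeout_s
            ]
            closed = [self._entries.pop(key)[0] for key in expired]
        for producer in closed:      # close outside the lock
            producer.close()
        return expired
```

The timeout clock starts at the last release rather than at creation, so a long-running task can never have its producer closed underneath it.
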
>
> 2. [SPARK-23539][SS] Add support for Kafka headers in Structured Streaming
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-23539
> PR: https://github.com/apache/spark/pull/22282
>
> As there's a great doc that explains the need for supporting Kafka
> headers, I'll just let the doc explain it.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
>
> Please note that end users have commented on the issue asking about
> availability, which also shows the need on the end users' side.
>
> 3. [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-25151
> PR: https://github.com/apache/spark/pull/22138
>
> The Kafka source has its own pooling logic for consumers, but judging from
> some JIRA issues regarding pooling, we seem to agree that we would like to
> replace it with a known pool implementation that provides advanced
> configuration, detailed metrics, etc.
>
> This patch adopts Apache Commons Pool (which brings the above advantages)
> as a connection pool for consumers, while respecting current behavior
> whenever possible. It also separates pooling for consumers from pooling for
> fetched data, which maximizes the efficiency of pooling consumers and also
> addresses the bug of unnecessary re-fetching on self-join. (The result of
> the experiment is in the PR's description.)
>
> 4. [SPARK-26848][SQL] Introduce new option to Kafka source: offset by
> timestamp (starting/ending) SQL
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-26848
> PR: https://github.com/apache/spark/pull/23747
>
> When end users want to replay their records in a Kafka topic, they don't
> remember the exact offsets for each partition, but Spark requires them to
> provide those; otherwise it just starts from the earliest offset. We as
> human beings are much more familiar with time: when we want to replay some
> records, we know the timestamp of the records we should start from.
>
> This patch lets end users provide offsets by timestamp (either starting
> or ending, or both), which will be transparently passed on to Kafka when
> requesting.
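
The timestamp-to-offset lookup itself can be sketched as below. It mirrors the rule Kafka's offsetsForTimes applies (the earliest offset whose timestamp is greater than or equal to the requested one), but runs over an in-memory list instead of a broker. The function name and the assumption that timestamps within a partition are non-decreasing are mine.

```python
from bisect import bisect_left


def offset_for_timestamp(records, target_ts):
    """Return the earliest offset whose timestamp is >= target_ts.

    records: list of (timestamp_ms, offset) pairs for one partition,
    assumed sorted by timestamp. Returns None if no such record exists.
    """
    timestamps = [ts for ts, _ in records]
    i = bisect_left(timestamps, target_ts)
    return records[i][1] if i < len(records) else None
```

With such a lookup per partition, a user only has to supply one timestamp instead of one offset per partition.
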
>
>
> C. State
>
> 1. [SPARK-27237][SS] Introduce State schema validation among query restart
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-27237
> PR: https://github.com/apache/spark/pull/24173
>
> Spark doesn't have an explicit mechanism to prevent end users from changing
> their query in an incompatible way. We documented the rules describing
> which changes make the query incompatible, but it's not easy to apply the
> rules by oneself, and an unfriendly error message will be thrown if end
> users violate a rule. In fact, undefined behavior will occur.
>
> This patch introduces state schema validation, which verifies the schema
> compatibility of state across query changes, and provides an informative
> error message to end users that indicates the previous schema and the
> current schema of the state.
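
As a sketch of what such a check might look like: record the state schema on the first run, compare on every restart, and fail with both schemas in the message. The class name, the tuple representation of a schema, and the strict equality rule are all assumptions; the actual PR may accept some schema changes.

```python
class StateSchemaChecker:
    """Remembers the state schema of the first run and checks later runs."""

    def __init__(self):
        # Stands in for a schema file kept next to the checkpoint (assumption).
        self._stored = None

    def validate(self, current):
        """current: list of (field_name, type_name) pairs for the state."""
        current = list(current)
        if self._stored is None:
            self._stored = current   # first run: nothing to compare against
            return
        if self._stored != current:
            raise ValueError(
                "State schema is not compatible with the existing state.\n"
                f"  previous schema: {self._stored}\n"
                f"  current schema:  {current}"
            )
```
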
>
> This is also a baseline for the new data source - state, as we can leverage
> the state schema information and not require end users to input the schema.
>
> 2. [SPARK-28191][SS] New data source - state - reader part
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-28191
> PR: https://github.com/apache/spark/pull/24990
>
> Please read the JIRA issue below for the rationale behind the state data
> source, as the issue description contains the cases where the state data
> source can be used (e.g. schema evolution on state, offline rescale on
> state, etc.).
> https://issues.apache.org/jira/browse/SPARK-28190
>
> This patch deals with the source part: it enables reading the state of a
> structured streaming query from a batch query.
>
> 3. [SPARK-28120][SS] Rocksdb state storage implementation
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-28120
> PR: https://github.com/apache/spark/pull/24922
>
> Memory has been a huge limitation on state size. As structured streaming
> loads two versions of state in the executor by default, memory pressure
> becomes a real problem when dealing with large state. Scaling up executors
> may work, but it wastes resources unnecessarily, and it can't help when
> the number of executors exceeds the number of partitions. (The state data
> source will eventually help with repartitioning, but it requires an offline
> batch query.)
>
> A state store that resides outside of memory is mandatory for structured
> streaming to deal with large state, and this patch tries to address this
> by introducing a RocksDB state store provider.
>
>
> D. Structured Streaming
>
> 1. [SPARK-24634][SS] Add a new metric regarding number of rows later than
> watermark
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-24634
> PR: https://github.com/apache/spark/pull/24936
>
> Spark doesn't provide any information on late rows, which could be dropped
> by stateful operators.
>
> This patch adds a metric counting late rows so that end users are made
> aware of them. Please note that the issue was originally meant to provide
> the number of rows dropped due to lateness, but Spark does pre-aggregation
> in streaming aggregation, so it can't provide the correct number. The
> current approach is less informative than the original intention but still
> brings value, for example, in determining whether the query is affected by
> SPARK-28074.
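
Counting late rows at the input of a stateful operator could be sketched as follows. This assumes a row counts as late when its event time is at or before the watermark, which may not match Spark's exact predicate, and the metric name "numLateRows" is made up.

```python
def split_late_rows(rows, watermark_ms):
    """Separate on-time rows from late ones and report a count metric.

    rows: list of dicts with an "event_time_ms" key (hypothetical layout).
    Assumption: a row is late if event_time_ms <= watermark_ms.
    """
    on_time = [r for r in rows if r["event_time_ms"] > watermark_ms]
    late_count = len(rows) - len(on_time)
    return on_time, {"numLateRows": late_count}
```

Because the count is taken on input rows before any aggregation, it says how many rows arrived late, not how many aggregated results were dropped.
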
>
> 2. [SPARK-26154][SS] Streaming left/right outer join should not return
> outer nulls for already matched rows
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-26154
> PR: https://github.com/apache/spark/pull/23634
>
> This is a long-standing correctness issue, and multiple end users
> (including me) have reported the behavior. It occurs in an edge case, but
> the edge case is not hard to reproduce, and is even close to the example
> query we provide for streaming outer join.
>
> This patch addresses the correctness issue by changing the join state:
> it introduces a "matched" flag.
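
A toy model of the "matched" flag for a left outer join, heavily simplified: it buffers only left rows, keys state by join key, and leaves out time-range conditions and right-side buffering. All names are hypothetical and this is not the state format used in the PR.

```python
class LeftOuterJoinState:
    """Buffers left rows and remembers whether each one was ever matched."""

    def __init__(self):
        self._left = {}  # key -> list of [row, matched]

    def add_left(self, key, row):
        self._left.setdefault(key, []).append([row, False])

    def on_right(self, key, right_row):
        """Join an arriving right row against buffered left rows."""
        out = []
        for entry in self._left.get(key, []):
            entry[1] = True                 # mark as matched
            out.append((entry[0], right_row))
        return out

    def evict(self, key):
        """Called when the watermark lets this key's state be dropped."""
        entries = self._left.pop(key, [])
        # Emit an outer (null-padded) row only for rows never matched.
        return [(row, None) for row, matched in entries if not matched]
```

Without the flag, eviction cannot tell a matched row from an unmatched one, which is how a row can end up in the output both joined and null-padded.
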
>
> 3. [SPARK-26655][SS] Support multiple aggregates in append mode
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-26655
> PR: https://github.com/apache/spark/pull/23576
>
> Multiple streaming aggregations have been a concern for end users: from
> their perspective, it sounds like an essential thing to support, but Spark
> doesn't support it. There are many SO questions as well as mail threads
> asking for this feature, but we still haven't dealt with it.
>
> If we only think about append mode, technically the feature is bound to a
> proper definition of watermark. We haven't considered watermark calculation
> (and/or propagation) for multiple stages of stateful operations, but as
> there's a widely used concept of watermarks across multiple stages, we can
> leverage it and focus on how to apply it to Spark.
> (For update mode, retraction is needed, which would require huge effort to
> adopt, so let's ignore it for now.)
>
> Please keep in mind that the lack of a watermark definition for multiple
> stateful stages is a problem not only for multiple streaming aggregations
> but also for multiple stateful operations (including streaming join,
> flatMapGroupsWithState, deduplication, etc.), which Spark does not
> technically restrict. SPARK-28074 points out this problem.
>
> This patch tries to address multiple aggregates. The patch itself may not
> be valid, but there's a design doc from which we can move forward and
> update the implementation.
>
> 4. [SPARK-27330][SS] support task abort in foreach writer
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-27330
> PR: https://github.com/apache/spark/pull/24382
>
> The foreach writer could leak resources when a task is aborted, as Spark
> doesn't call writer.close() in that case. If the task throws an exception
> while processing in the foreach writer, or succeeds in committing, Spark
> properly calls close(), but in other cases the call to close() is missing
> because abort is not handled properly.
>
> This patch fixes the bug.
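
The contract at stake can be sketched in a few lines: whatever path the task takes, close() runs exactly once and receives the error, if any. The run_task function and the should_abort hook are invented for illustration and simplify the writer interface (open() takes no arguments here); this is not how Spark's writer task is actually structured.

```python
def run_task(writer, rows, should_abort=lambda: False):
    """Drive a foreach-style writer so that close() is called on every path."""
    error = None
    try:
        if writer.open():
            for row in rows:
                if should_abort():          # hypothetical abort signal
                    raise RuntimeError("task aborted")
                writer.process(row)
    except Exception as e:
        error = e                           # remember it for close()
        raise
    finally:
        writer.close(error)                 # runs on success, failure and abort
```

Putting the close() call in a finally block is what makes the abort path indistinguishable from the failure path as far as resource cleanup goes.
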
>
> 5. [SPARK-28074][DOC][SS] Document caveats on using multiple stateful
> operations in single query
>
> ISSUE: https://issues.apache.org/jira/browse/SPARK-28074
> PR: https://github.com/apache/spark/pull/24890
>
> As I mentioned in SPARK-26655, Spark doesn't restrict using multiple
> stateful operations in a single query (except streaming aggregations), even
> though the concept of watermark is not properly covered for multiple
> stateful stages.
>
> I've explained this issue with an example on the dev mailing list earlier,
> so you can refer to the link for the rationale behind the issue.
>
> https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E
>
> We've not decided how to let end users avoid the issue (dealing with
> SPARK-26655 is the best, but in the meantime...), and this patch tries
> to establish (or discuss) how to guide end users.
>
> SPARK-24634 would help end users determine whether their query is
> affected by this issue, as in append mode intermediate output should not be
> later than the watermark.
> ----
>
> Please chime in and share your curation if I'm missing something.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
