[ 
https://issues.apache.org/jira/browse/SPARK-31724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108513#comment-17108513
 ] 

Xingcan Cui edited comment on SPARK-31724 at 5/15/20, 6:02 PM:
---------------------------------------------------------------

Copy some comments from [https://github.com/apache/spark/pull/28523]
----
xccui commented 23 hours ago

 

Hi all, I am not quite familiar with the version history of the streaming sink, 
but would like to share pieces of my thought here. Please correct me if I 
misunderstood.

I think SupportsStreamingUpdate should NOT be a sink specific feature. We 
concentrate on sink now because the current SS implementation doesn't allow 
chaining operators (other than the sink) which produce updating results.
 The only reason we provide update mode should be to produce the "correct" 
result table, i.e., to make the result table identical with the one produced by 
applying the same query on the materialized input rows.
 The semantics of update mode we provided lacks deleting support, which makes 
it unreliable sometimes. I suppose we all agree on offering a better design in 
the future. But for now, it's better to keep it unchanged with previous 
versions (and also with the lowest risk and efforts).
 I'll try to make some improvements to the related issues.
----
HeartSaVioR commented 17 hours ago

 

Thanks for the great input, @xccui.

Basically I agree with your input - that's the same as my understanding as I 
commented before (#28523 (comment)).

To summarize my previous comment, I also don't know how the streaming output 
mode was designed, but from my understanding it's effective only on result 
table for stateful aggregation operators. It's not even applied for all 
stateful operators, e.g. the mode doesn't affect stream-stream join. It doesn't 
guarantee the final output is respecting the semantic, and then there's no 
meaning of applying the same on the sink side.

Another concern comes into my mind is complete mode. The complete mode is also 
effective on the result table. It may sound making sense to support complete 
mode in sink as truncate and insert, but it leads to data loss for the case the 
result table is being union to other stream which is not creating "result 
table". (I haven't had such query but it's technically possible.) The complete 
mode will not care about the other stream and in every batch the previous 
output from the other stream will be lost. I think complete mode is weird one 
for streaming and better to discontinue supporting; I wouldn't expect any 
production query to use this mode, but please let me know if there is.

Anyway I think the streaming update mode technically doesn't couple with the 
availability of sink. It should be left as it is, though we'll probably have to 
fix guide doc as the guide doc says it's for result table "as well as" for the 
sink. Description of the streaming output mode in sink should be corrected as 
well - they're not dependent on streaming output mode, and as of now only 
append is possible.

ps. We may need to revisit the operators and streaming output modes to see any 
flaw, similarly I went through via discussion thread and #24890. One thing 
would be flatMapGroupsWithState with append mode.
----
 xccui commented 12 hours ago

 

@HeartSaVioR Yes. It seems the output mode option was mainly designed for 
stateful aggregations, which means it actually works in a restricted way.

Ideally, to support complete mode, all the operators must be capable of 
outputting the "complete" result seen so far for each epoch. Personally, I'm in 
favor of removing this mode in a future version. But for now, I propose to add 
more restrictions while doing the plan check (e.g., disallowing the union 
situation you mentioned) and also a note to the documentation.

IMO, the mode of the result table should only be decided by the operators in 
the plan and it could either be "append" or "update" (including the current 
"complete" mode). Basically, the designated sink should match the mode of the 
result table. Usually, supporting "update" needs more effort and that means 
only part of the sinks could be chosen for a plan containing an aggregation or 
some kind of joins.
----
HeartSaVioR commented 5 hours ago

 

I'm curious how SS committers think about these comments upon - if they agree 
about the comments then the issue is rather about the design issue of streaming 
output mode, and I think the right way to fix is decoupling streaming output 
mode with sink.

Would it break backward compatibility? If you look back branch-2.4, you'll find 
it very surprised that most built-in sink implementations "ignore" the output 
mode. The output mode provided by StreamWriteSupport.createStreamWriter is not 
used at all, with only one exception.

The only exception for built-in sink is memory sink, and it doesn't deal with 
the difference between append mode and update mode. It's only used to truncate 
the memory for complete mode. Given that the sink exists most likely for 
testing, it only helps to make tests be easier to implement, nothing else. Also 
I'm strongly in favor of dropping complete mode, as I already provided data 
loss issue, and I don't think it's production-wise.

I don't even think custom sinks have been respecting the output mode, as the 
API lacks of information on update mode, and complete mode is not 
production-wise.

The major problem is the time. I totally understand such change may feel a bit 
huge to go with the release which has already done in RC1 though...

I hope we address it in the major release (that's a good rationalization), but 
if we really want to minimize the changes for now, what about adding 
SupportsStreamingTruncate as internal trait as well, so that we avoid coupling 
SupportTruncate with complete mode and let streaming write go different path 
with the batch one?
 (Even better if we simply don't support complete mode or restrict to private 
right now, but...)

As both SupportsStreamingUpdate and SupportsStreamingTruncate would be internal 
one, we will have time to revisit the streaming path and change without making 
effect of public API.

That would make custom sinks only be able to append as we won't expose these 
abilities, but that's what I expect so far. Even with the new DSv2 implementing 
truncate in streaming sink looks to be very limited, as truncation should take 
place when committing, which means write tasks cannot write to the destination 
directly, not scalable.

Does my proposal make sense?

cc. to @tdas @zsxwing @jose-torres @brkyvz @jerryshao @gaborgsomogyi to hear 
their voices as well. Please cc. to more ppl if you have anyone to get some 
help taking a look at.

 


was (Author: xccui):
Copy some comments from [https://github.com/apache/spark/pull/28523]
----
xccui commented 23 hours ago

 

Hi all, I am not quite familiar with the version history of the streaming sink, 
but would like to share pieces of my thought here. Please correct me if I 
misunderstood.

I think SupportsStreamingUpdate should NOT be a sink specific feature. We 
concentrate on sink now because the current SS implementation doesn't allow 
chaining operators (other than the sink) which produce updating results.
 The only reason we provide update mode should be to produce the "correct" 
result table, i.e., to make the result table identical with the one produced by 
applying the same query on the materialized input rows.
 The semantics of update mode we provided lacks deleting support, which makes 
it unreliable sometimes. I suppose we all agree on offering a better design in 
the future. But for now, it's better to keep it unchanged with previous 
versions (and also with the lowest risk and efforts).
 I'll try to make some improvements to the related issues.
----
HeartSaVioR commented 17 hours ago

 

Thanks for the great input, @xccui.

Basically I agree with your input - that's the same as my understanding as I 
commented before (#28523 (comment)).

To summarize my previous comment, I also don't know how the streaming output 
mode was designed, but from my understanding it's effective only on result 
table for stateful aggregation operators. It's not even applied for all 
stateful operators, e.g. the mode doesn't affect stream-stream join. It doesn't 
guarantee the final output is respecting the semantic, and then there's no 
meaning of applying the same on the sink side.

Another concern comes into my mind is complete mode. The complete mode is also 
effective on the result table. It may sound making sense to support complete 
mode in sink as truncate and insert, but it leads to data loss for the case the 
result table is being union to other stream which is not creating "result 
table". (I haven't had such query but it's technically possible.) The complete 
mode will not care about the other stream and in every batch the previous 
output from the other stream will be lost. I think complete mode is weird one 
for streaming and better to discontinue supporting; I wouldn't expect any 
production query to use this mode, but please let me know if there is.

Anyway I think the streaming update mode technically doesn't couple with the 
availability of sink. It should be left as it is, though we'll probably have to 
fix guide doc as the guide doc says it's for result table "as well as" for the 
sink. Description of the streaming output mode in sink should be corrected as 
well - they're not dependent on streaming output mode, and as of now only 
append is possible.

ps. We may need to revisit the operators and streaming output modes to see any 
flaw, similarly I went through via discussion thread and #24890. One thing 
would be flatMapGroupsWithState with append mode.
----
 
 xccui commented 12 hours ago

 

@HeartSaVioR Yes. It seems the output mode option was mainly designed for 
stateful aggregations, which means it actually works in a restricted way.

Ideally, to support complete mode, all the operators must be capable of 
outputting the "complete" result seen so far for each epoch. Personally, I'm in 
favor of removing this mode in a future version. But for now, I propose to add 
more restrictions while doing the plan check (e.g., disallowing the union 
situation you mentioned) and also a note to the documentation.

IMO, the mode of the result table should only be decided by the operators in 
the plan and it could either be "append" or "update" (including the current 
"complete" mode). Basically, the designated sink should match the mode of the 
result table. Usually, supporting "update" needs more effort and that means 
only part of the sinks could be chosen for a plan containing an aggregation or 
some kind of joins.
----
HeartSaVioR commented 5 hours ago


 I'm curious how SS committers think about these comments upon - if they agree 
about the comments then the issue is rather about the design issue of streaming 
output mode, and I think the right way to fix is decoupling streaming output 
mode with sink.

Would it break backward compatibility? If you look back branch-2.4, you'll find 
it very surprised that most built-in sink implementations "ignore" the output 
mode. The output mode provided by StreamWriteSupport.createStreamWriter is not 
used at all, with only one exception.

The only exception for built-in sink is memory sink, and it doesn't deal with 
the difference between append mode and update mode. It's only used to truncate 
the memory for complete mode. Given that the sink exists most likely for 
testing, it only helps to make tests be easier to implement, nothing else. Also 
I'm strongly in favor of dropping complete mode, as I already provided data 
loss issue, and I don't think it's production-wise.

I don't even think custom sinks have been respecting the output mode, as the 
API lacks of information on update mode, and complete mode is not 
production-wise.

The major problem is the time. I totally understand such change may feel a bit 
huge to go with the release which has already done in RC1 though...

I hope we address it in the major release (that's a good rationalization), but 
if we really want to minimize the changes for now, what about adding 
SupportsStreamingTruncate as internal trait as well, so that we avoid coupling 
SupportTruncate with complete mode and let streaming write go different path 
with the batch one?
 (Even better if we simply don't support complete mode or restrict to private 
right now, but...)

As both SupportsStreamingUpdate and SupportsStreamingTruncate would be internal 
one, we will have time to revisit the streaming path and change without making 
effect of public API.

That would make custom sinks only be able to append as we won't expose these 
abilities, but that's what I expect so far. Even with the new DSv2 implementing 
truncate in streaming sink looks to be very limited, as truncation should take 
place when committing, which means write tasks cannot write to the destination 
directly, not scalable.

Does my proposal make sense?

cc. to @tdas @zsxwing @jose-torres @brkyvz @jerryshao @gaborgsomogyi to hear 
their voices as well. Please cc. to more ppl if you have anyone to get some 
help taking a look at.

 

> Improve the output mode for structured streaming
> ------------------------------------------------
>
>                 Key: SPARK-31724
>                 URL: https://issues.apache.org/jira/browse/SPARK-31724
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Xingcan Cui
>            Priority: Major
>
> The current design of output mode in structured streaming is restricted and 
> needs some improvements. This umbrella issue is used to track all the updates 
> we are going to make.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to