[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-02-20 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-464907821
 
 
   @tdas 
   I'm also thinking about how to add a matched flag to 
KeyWithIndexToValueStore, but I can't see how to make it compatible with 
existing state, specifically how an optional matched flag would work.
   
   Does that mean you're suggesting we not version the state format for 
streaming joins? The situation differs quite a bit depending on which we choose.
   
   1) Keep the state format at version 1
   
   I'm not sure it's safe to guarantee access to the `matched` flag, given 
that state would mix existing rows, which don't have the `matched` field 
(numFields doesn't count it either), with new rows that do. Even if this is 
possible via some trick that relies on knowledge of internals, I'd like to 
address this without breaking any interface or expectation, because that could 
become a blocker when we want to change InternalRow. (For example, allowing 
actual rows to have different schemas could cause crashes at runtime.)
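
To illustrate the concern, here is a minimal Python sketch. It is not Spark code: `VALUE_SCHEMA`, `read_matched` and the tuple rows are hypothetical stand-ins for the value schema and InternalRow. It only shows what can go wrong when rows written with and without a trailing `matched` field share one store:

```python
# Hypothetical stand-in for the value schema; `matched` is NOT part of it,
# mirroring the remark that numFields does not count the `matched` field.
VALUE_SCHEMA = ("id", "ts")


def read_matched(row):
    """Read the flag from the slot right after the declared fields."""
    return row[len(VALUE_SCHEMA)]


old_row = (1, 100)        # written before the flag existed
new_row = (2, 200, True)  # written with the trailing flag

print(read_matched(new_row))  # the flag is present for new rows
try:
    read_matched(old_row)
except IndexError:
    # Old rows simply have no slot to read from.
    print("old row has no matched slot")
```

In this toy version the failure is a clean exception; with a binary row format the read may not fail so politely, which is why I'd rather not rely on it.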
   
   Please shed some light (a sketched idea) on how to do it if you'd like to 
guide me here.
   
   2) Have two state formats
   
   I think refactoring is still necessary if we version the state format 
(though we could flatten some interfaces and implementations into one), because 
we'd end up with two StreamingJoinStateManagers in which most of the code is 
duplicated and only the implementation of KeyWithIndexToValueStore differs 
slightly. If we don't add an interface and only add a boolean flag to 
distinguish state formats, the state format code would end up mixed with the 
join logic, making it complicated and hard to debug.
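
A rough Python sketch of the interface-based option, to make the shape concrete. Every name here (`JoinStateManager`, `StateManagerV1`, `StateManagerV2`, `create_state_manager`) is hypothetical and the dict-backed stores are placeholders; it is not the code in this PR. Reporting `None` for the flag in format 1 is also just an assumption of the sketch:

```python
from abc import ABC, abstractmethod


class JoinStateManager(ABC):
    """Hypothetical common interface; the join logic only talks to this."""

    @abstractmethod
    def append(self, key, value, matched):
        ...

    @abstractmethod
    def get(self, key):
        """Return a list of (value, matched) pairs for the key."""


class StateManagerV1(JoinStateManager):
    """Format 1: stores the value only, so the flag cannot be recovered."""

    def __init__(self):
        self._store = {}

    def append(self, key, value, matched):
        self._store.setdefault(key, []).append(value)

    def get(self, key):
        # The flag was never written; report it as unknown (None).
        return [(value, None) for value in self._store.get(key, [])]


class StateManagerV2(JoinStateManager):
    """Format 2: stores the value together with the matched flag."""

    def __init__(self):
        self._store = {}

    def append(self, key, value, matched):
        self._store.setdefault(key, []).append((value, matched))

    def get(self, key):
        return list(self._store.get(key, []))


def create_state_manager(state_format_version):
    """Pick the implementation once, so callers never branch on the version."""
    managers = {1: StateManagerV1, 2: StateManagerV2}
    if state_format_version not in managers:
        raise ValueError(f"unsupported state format: {state_format_version}")
    return managers[state_format_version]()
```

The version check lives in one factory function, so the join logic itself has no `if version == ...` branches.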
   
   Overall I totally understand the importance of reducing the code diff, but 
I'd hope we wouldn't be too concerned about a large diff if we find that the 
additional code is worth it. (I mean I'd hope the amount of code change is not 
the first thing we consider.) To evaluate that worth, it might be best to put 
aside the length of the diff when deciding the general direction.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-02-18 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-464929292
 
 
   I've changed the approach from having a 3rd state store to adding a 
`matched` field to the 2nd state store.
   
   Please take a look at the last commit, which makes this change: 
https://github.com/apache/spark/pull/23634/commits/912a25023c86183a653c52e0fd9aef5651a6ad3d
   
   Though it doesn't roll back the refactoring I did for state versioning, it 
should give a view of the difference between a 3rd state store and a `matched` 
field in the 2nd store. (Some projections were necessary to get the original 
row from the value side.)
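
As a rough illustration of why a projection is needed once the flag lives next to the value, here is a Python sketch with plain tuples. `with_matched` and `project_original` are hypothetical helper names, not functions from the commit:

```python
def with_matched(value_row, matched):
    """Append the flag as one extra trailing field before storing."""
    return value_row + (matched,)


def project_original(stored_row):
    """Drop the trailing flag to recover the original row; return both parts."""
    return stored_row[:-1], stored_row[-1]


stored = with_matched((7, "2019-01-23 13:58:10"), False)
original, matched = project_original(stored)
print(original, matched)
```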
   
   Honestly, this commit shows why the refactoring was necessary. Applying the 
suggested approach needed fewer than 200 lines (added + removed) while keeping 
versioning for state formats. The change is confined to the state handler side 
and doesn't touch any part of the join logic.





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-02-18 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-464911031
 
 
   > So it actually makes sense to add the support for matched flag as a purely 
optional feature within the existing SymmetricHashJoinStateManager.
   
   I would still suggest having the matched flag in all cases: suppose we 
change nothing but the query, from inner join to left outer join. Then the 
result could theoretically be incorrect, because we didn't record the `matched` 
field, so all rows in the left state could be emitted as right-null even though 
they've matched before. If I'm not missing something, such a query change is 
allowed, or even if it's disallowed we don't fail the query before it gets 
processed, so it would produce no error but an incorrect result, right?
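
A toy Python sketch of that scenario. The dict-based rows and `evict_as_left_outer` are hypothetical, and treating a missing flag as "not matched" is an assumption of the sketch:

```python
def evict_as_left_outer(left_state, watermark):
    """Emit right-null rows for evicted left rows not known to have matched."""
    output = []
    for row in left_state:
        evicted = row["ts"] <= watermark
        # A row without the flag is treated as unmatched (assumption).
        if evicted and not row.get("matched", False):
            output.append((row["id"], row["ts"], None))
    return output


# State written while the query was an inner join: the row did match earlier,
# but nothing recorded that fact.
state_without_flag = [{"id": 1, "ts": 10}]
# The same row if the flag had been recorded regardless of join type.
state_with_flag = [{"id": 1, "ts": 10, "matched": True}]

print(evict_as_left_outer(state_without_flag, watermark=20))  # [(1, 10, None)]
print(evict_as_left_outer(state_with_flag, watermark=20))     # []
```

The first call emits a right-null row for a row that had already matched, with no error raised anywhere.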





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-02-17 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-464613135
 
 
   @tdas 
   I may need to get some numbers to back up my idea, but let me explain the 
rationale first.
   
   The lesson I learned from my previous work #21733 was that reducing the size 
of the state diff per batch performs better (in both size and time) despite 
needing an additional projection. I considered two approaches: 1) add a boolean 
flag to the current index-to-row store, 2) add a new state store that only 
stores the boolean flag. Comparing the two by state size, we can expect the 
following:
   
   * approach 1) changes state by `value + boolean flag` per update (the key 
doesn't need to be stored again)
   * approach 2) changes state by `key + boolean flag` per update (the value 
doesn't need to be stored again)
   
   Given that we store the row as-is for the value part, `value + boolean flag` 
would usually be bigger than `key + boolean flag` (since the value may also 
contain part or all of the key), which makes me think we can take approach 2) 
to gain a state optimization at the cost of some added complexity in the state 
codebase. 
   (Having one more state store has its own non-trivial overhead, so I wouldn't 
say approach 2) is 100% superior to approach 1). That may need to be explored 
if we'd like to see the numbers.)
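
A back-of-the-envelope Python version of that comparison. The byte sizes are made up purely for illustration (they are not measurements), and `FLAG_BYTES`, `delta_approach_1` and `delta_approach_2` are hypothetical names:

```python
FLAG_BYTES = 1  # assumed size of the boolean flag


def delta_approach_1(value_bytes):
    """Flag stored with the value: each flag update rewrites value + flag."""
    return value_bytes + FLAG_BYTES


def delta_approach_2(key_bytes):
    """Flag stored in its own store: each flag update writes key + flag."""
    return key_bytes + FLAG_BYTES


# Made-up sizes: an 8-byte key and a 40-byte value that also contains the key.
print(delta_approach_1(value_bytes=40))  # 41
print(delta_approach_2(key_bytes=8))     # 9
```

This ignores the fixed overhead of maintaining an extra store, which is exactly the part that would need real numbers.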
   
   Suppose we take approach 2): refactoring the codebase is necessary to avoid 
huge code duplication, since the current implementation doesn't seem to be 
extensible. The change refactors the code to minimize duplication so that the 
same code can be used in both places.
   
   I would be happy to take approach 1) in this PR and experiment with approach 
2) later if we have doubts about its benefit. Please let me know which one we 
prefer. Thanks in advance!





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-01-31 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-459299774
 
 
   Just applied approach 2 and removed `[WIP]`. Also added a UT which restores 
the query from previous state (2.4.0).
   
   The code may have some redundant parts: not only the state format but also 
the logic has to be versioned, so it's not trivial to abstract them all. I'm 
planning to reduce duplication, but I'd really appreciate it if someone could 
help me find where and how to clean up the code, or even restructure 
packages/traits/classes.
   
   Thanks in advance to all reviewers!





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-01-27 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-458000808
 
 
   > The rows in one of the inputs are immediately discarded while I think both 
inputs should be retained till the watermark of the operator advances so that 
the correct results can be produced.
   
   The rows were stored in state on both sides in batch 1; they were just 
evicted from state in different batches (batch 3 for the left side, batch 2 for 
the right side). The global watermark advanced in batch 2, so the right-side 
eviction in batch 2 was valid under the watermark condition. The "late" 
left-side eviction is just due to the join condition.





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-01-25 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-457789591
 
 
   > In update mode it may be ok to emit null value for one side and later when 
the matching events arrive on the other side the new rows be re-emitted.
   
   In this case it produces an incorrect result, because the matched row is 
emitted first and the null-matched row is emitted later, which may "overwrite" 
the result so that the final result is treated as null-matched.
   
   IMHO, regardless of mode, the final result per key should be the same, and 
the same between batch and streaming. If the query ran as a batch query, the 
null-matched row would never be produced. So I'm not sure this is related to 
output mode.
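
A small Python sketch of the "overwrite" effect, assuming a hypothetical sink that upserts by key (`final_table` and the sample rows are made up for illustration):

```python
def final_table(outputs):
    """Upsert each emitted (key, right_value) in order; the last write wins."""
    table = {}
    for key, right_value in outputs:
        table[key] = right_value
    return table


# Streaming with the bug: the matched row comes first, the null-matched row later.
streaming_outputs = [("k1", "r1"), ("k1", None)]
# Batch semantics: only the matched row is ever produced for this key.
batch_outputs = [("k1", "r1")]

print(final_table(streaming_outputs))  # {'k1': None}
print(final_table(batch_outputs))      # {'k1': 'r1'}
```

The two final tables disagree for the same key, which is the mismatch between batch and streaming I'm referring to.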
   
   > So it seems that there is two different watermarks here (one for each 
input) which seems wrong. Ideally the watermark should be tied to the operator 
(join) and not separate watermarks for each input so that the operator can 
compute the result based on its watermark.
   
   As I commented earlier to Jose, the watermark for late tuples is the same 
across operators. The difference is when rows are evicted from state, which I 
guess can depend on the join condition.





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-01-24 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-457405492
 
 
   > I think it's incorrect to evict R1 if we aren't also evicting all rows in 
L which R1 could have matched with.
   
   My understanding is that rows are stored in state to match against new rows, 
not to track previous status. If we have to keep rows on the right side because 
the joined left-side rows are not yet evicted (maybe also waiting 5 more 
seconds in the case above), we end up unnecessarily storing right-side rows 
(which can no longer match new left-side rows) that could simply be replaced 
with a matched flag.





[GitHub] HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

2019-01-24 Thread GitBox
HeartSaVioR edited a comment on issue #23634: [SPARK-26154][SS] Streaming 
left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/23634#issuecomment-457354283
 
 
   Seems like I need to add the query along with the edge case. (Just updated 
the description.) 
   
   > the left side of the self-join is supposed to evict records 5 seconds 
behind the watermark, but it seems to be incorrectly waiting 10 second instead.
   
   No, the left side waited 5 seconds behind the watermark, whereas the right 
side didn't wait behind the watermark at all. The join condition is not 
`ts1 = ts2` but `ts1 <= ts2 <= ts1 + interval 5 seconds`, and in this case I 
guess it is "known behavior" that the left side is expected to wait up to 5 
seconds behind the watermark in order to match rows like 
`ts2 = ts1 + 4 seconds` (equivalently `ts1 = ts2 - 4 seconds`).
   
   Here's the physical plan from one batch while running a query similar to the 
UT in spark-shell:
   
   ```
   == Physical Plan ==
   WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@758c98d2
   +- StreamingSymmetricHashJoin [fooId#4L], [barId#8L], LeftOuter, condition = [ leftOnly = null, rightOnly = null, both = ((barTime#9-T5000ms >= fooTime#5-T5000ms) && (barTime#9-T5000ms <= fooTime#5-T5000ms + interval 5 seconds)), full = ((barTime#9-T5000ms >= fooTime#5-T5000ms) && (barTime#9-T5000ms <= fooTime#5-T5000ms + interval 5 seconds)) ], state info [ checkpoint = file:/private/var/folders/wn/3hpqx8015hjbmq43hmrw78z4gn/T/temporary-455ca2eb-f4d4-44af-a8f5-5a90d30b520c/state, runId = fdc8a843-021f-4029-8be4-83f480780c43, opId = 0, ver = 2, numPartitions = 200], 1548219494841, state cleanup [ left value predicate: (fooTime#5-T5000ms <= 154821948984), right value predicate: (barTime#9-T5000ms <= 154821949484) ]
      :- Exchange hashpartitioning(fooId#4L, 200)
      :  +- EventTimeWatermark fooTime#5: timestamp, interval 5 seconds
      :     +- *(1) Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
      :        +- *(1) Project [timestamp#0, value#1L]
      :           +- *(1) ScanV2[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1$$anon$2
      +- Exchange hashpartitioning(barId#8L, 200)
         +- *(3) Filter isnotnull(barTime#9-T5000ms)
            +- EventTimeWatermark barTime#9: timestamp, interval 5 seconds
               +- *(2) Project [value#1L AS barId#8L, timestamp#0 AS barTime#9]
                  +- *(2) Filter (isnotnull(value#1L) && ((value#1L % 2) = 0))
                     +- *(2) Project [timestamp#0, value#1L]
                        +- *(2) ScanV2[timestamp#0, value#1L] class org.apache.spark.sql.execution.streaming.sources.RateStreamTable$$anon$1$$anon$2
   ```
   
   Cropped to just the join information we care about:
   
   ```
   1548219494841, 
   state cleanup [ 
     left value predicate: (fooTime#5-T5000ms <= 154821948984), 
     right value predicate: (barTime#9-T5000ms <= 154821949484) 
   ]
   ```
   
   event time watermark = '1548219494841' (2019/01/23 13:58:14.841 GMT+09:00)
   left predicate = '154821948984' (2019/01/23 13:58:09.840 GMT+09:00)
   right predicate = '154821949484' (2019/01/23 13:58:14.840 GMT+09:00)
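
For reference, a Python sketch that reproduces the wall-clock times quoted above from the watermark and the join condition `ts1 <= ts2 <= ts1 + interval 5 seconds`. The helper names (`cleanup_thresholds`, `fmt`), the millisecond unit, and the assumption that rows with a timestamp at or after the watermark can still arrive are mine; this is not how Spark derives the predicates internally:

```python
from datetime import datetime, timedelta, timezone

KST = timezone(timedelta(hours=9))  # GMT+09:00, as in the times quoted above


def cleanup_thresholds(watermark_ms, range_ms):
    """Return (left, right) thresholds: a row is evictable if ts <= threshold.

    Assumes future rows on either side have ts >= watermark_ms and the join
    condition is ts1 <= ts2 <= ts1 + range_ms.
    """
    # A right row needs a future left row with ts1 <= ts2, so ts2 >= watermark.
    right = watermark_ms - 1
    # A left row needs a future right row with ts2 <= ts1 + range,
    # so ts1 + range >= watermark.
    left = watermark_ms - range_ms - 1
    return left, right


def fmt(ms):
    """Format epoch milliseconds as wall-clock time in GMT+09:00."""
    dt = datetime.fromtimestamp(ms // 1000, tz=KST)
    return dt.strftime("%Y/%m/%d %H:%M:%S") + ".%03d" % (ms % 1000)


left, right = cleanup_thresholds(1548219494841, 5000)
print(fmt(1548219494841))  # 2019/01/23 13:58:14.841
print(fmt(left))           # 2019/01/23 13:58:09.840
print(fmt(right))          # 2019/01/23 13:58:14.840
```

So the left side is kept 5 seconds behind the watermark while the right side is not, which matches the two predicates in the plan.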
   
   > If L1 isn't evicted, that means a new row L1' should still be able to 
match with R1, and therefore R1 can't be evicted either.
   
   I think state is retained behind the watermark in order to wait for new 
rows, so other conditions (like previously joined rows) should not matter. So 
we may need to focus on the watermark itself.
   
   Suppose `L1.ts = R1.ts = L1'.ts` and R1 is evicted. This implies 
`watermark > R1.ts (= L1'.ts)`, so L1' will be dropped as late without being 
joined. Once the watermark passes, the right side has no chance to match 
against the left side via the join condition (whereas the left side still has a 
chance to match against the right side), so it looks correct to evict R1.
   
   Please let me know if I'm missing something here.

