akalash commented on pull request #17238:
URL: https://github.com/apache/flink/pull/17238#issuecomment-921681480


   > I think technically it knows if upstream is ready or not, because retrying 
logic is on the downstream's RemoteInputChannel.
   
   I don't see that. The retrying logic just sends the message blindly and hopes 
it will succeed. If the request fails, the downstream receives the error message 
asynchronously some time later and schedules the resend. So, unless I am missing 
something, I cannot say that `if we don't receive an error message for some time, 
the upstream is highly likely ready` tells us anything reliable about the 
upstream's readiness.
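
A minimal sketch of the fire-and-forget retry pattern described above. `BlindTransport` and `BlindRetryChannel` are hypothetical stand-ins, not Flink classes. It shows that once `requestPartition()` returns, the channel's state is the same whether the upstream will accept or reject the request. Only the later error callback carries any information.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fire-and-forget transport: send() reports nothing about the outcome. */
class BlindTransport {
    final List<String> sent = new ArrayList<>();

    void send(String message) {
        sent.add(message); // only enqueued; failure, if any, is reported later
    }
}

/** Hypothetical downstream channel that retries only when an error arrives. */
class BlindRetryChannel {
    private final BlindTransport transport;
    private int attempt;

    BlindRetryChannel(BlindTransport transport) {
        this.transport = transport;
    }

    void requestPartition() {
        // Sent blindly: nothing here tells us whether the upstream is ready.
        transport.send("PartitionRequest(attempt=" + attempt + ")");
    }

    /** Invoked asynchronously, some time later, when the upstream rejects the request. */
    void onPartitionRequestError() {
        attempt++;
        requestPartition(); // a real implementation would schedule this with a backoff
    }

    int attempt() {
        return attempt;
    }
}
```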
   
   > But in my idea downstream doesn't necessarily need to know if the new 
buffer size announcement worked or not. If we can assume that the only way how 
new buffer size announcement can be overtaken by a partition request is if 
previous partition request failed and we are re-trying, then we can just cache 
the latest buffer size in the RemoteInputChannel and attach it to the 
PartitionRequest retry message.
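
The quoted proposal can be sketched roughly as follows. The assumption is that the channel remembers the most recently announced size and that a retried partition request carries it. `CachingChannel`, `RetryRequest`, `announceBufferSize` and `buildRetryRequest` are hypothetical names for illustration, not the actual RemoteInputChannel API.

```java
/** Hypothetical retry message that piggybacks the latest known buffer size. */
final class RetryRequest {
    final int attempt;
    final int bufferSize;

    RetryRequest(int attempt, int bufferSize) {
        this.attempt = attempt;
        this.bufferSize = bufferSize;
    }
}

/** Hypothetical channel that caches the last announced size for use in retries. */
class CachingChannel {
    // volatile: announcements and retries may run on different threads
    private volatile int latestBufferSize;

    CachingChannel(int initialBufferSize) {
        this.latestBufferSize = initialBufferSize;
    }

    void announceBufferSize(int size) {
        latestBufferSize = size; // cache first; the announcement itself would be sent here
    }

    RetryRequest buildRetryRequest(int attempt) {
        // The retry carries the newest cached size, so a dropped announcement is not lost.
        return new RetryRequest(attempt, latestBufferSize);
    }
}
```

A `volatile` field only makes the cached value visible across threads. It does not make "read the cache" and "send the retry" a single atomic step, and that gap is where the interleavings discussed in the reply come from.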
   
   I see a possible data race here:
   
   - Sending PartitionRequest
   - Checking that there is no error
   - Sending new buffer size
   - Receiving the error for PartitionRequest
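
A single-threaded replay of the steps above, assuming the naive rule "cache the new size only if a PartitionRequest error is already known". `CacheOnErrorChannel` is hypothetical. The sizes are arbitrary example values.

```java
/** Hypothetical channel that caches a new size only if a request error was already seen. */
class CacheOnErrorChannel {
    private boolean errorSeen;
    private Integer cachedBufferSize; // null: nothing to attach to a retry

    void announceBufferSize(int size) {
        // Flawed check-then-act: not coordinated with the thread that handles errors
        if (errorSeen) {
            cachedBufferSize = size;
        }
        // The announcement would be sent here; the upstream is not ready, so it is dropped.
    }

    void onPartitionRequestError() {
        errorSeen = true;
    }

    Integer sizeForRetry() {
        return cachedBufferSize;
    }

    /** Replays the interleaving: the new size is sent before the error arrives. */
    static Integer replay() {
        CacheOnErrorChannel ch = new CacheOnErrorChannel();
        ch.announceBufferSize(8192); // no error yet, so the size is sent but not cached
        ch.onPartitionRequestError(); // the error for the earlier PartitionRequest arrives
        return ch.sizeForRetry();
    }
}
```

`replay()` returns `null`, so the retried PartitionRequest has no new size to carry, even though the announcement never reached the upstream.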
   
   In this case, we don't cache the value because there was no error yet. This 
means we should always cache the value before sending:
   
   - Sending PartitionRequest
   - Caching the new buffer size
   - Receiving the error for PartitionRequest
   - Reading the cached buffer size to send with the retried PartitionRequest
   - Caching another new buffer size
   - Sending the newest buffer size (this request fails)
   - Only now sending the retried PartitionRequest with the old cached value
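
The interleaving above can be replayed deterministically. This is a minimal single-threaded sketch: in the real system these steps run on different threads. `SplitRetryChannel` is hypothetical. It assumes the upstream drops size announcements until it has accepted a PartitionRequest. The failed first request and its error matter only through their timing, so they are not modelled. The sizes are arbitrary example values.

```java
/** Hypothetical replay of "always cache, but read and send the retry as separate steps". */
class SplitRetryChannel {
    private int cachedBufferSize;
    private boolean upstreamReady;      // true once a PartitionRequest has been accepted
    private Integer upstreamBufferSize; // the size the upstream actually uses

    SplitRetryChannel(int initialBufferSize) {
        this.cachedBufferSize = initialBufferSize;
    }

    void cacheBufferSize(int size) {
        cachedBufferSize = size;
    }

    int readCachedSize() {
        return cachedBufferSize;
    }

    /** A size announcement is dropped if the upstream has not accepted a request yet. */
    void sendBufferSize(int size) {
        if (upstreamReady) {
            upstreamBufferSize = size;
        }
    }

    /** The retried request is accepted and carries whatever size was read earlier. */
    void sendPartitionRequest(int sizeReadEarlier) {
        upstreamReady = true;
        upstreamBufferSize = sizeReadEarlier;
    }

    Integer upstreamBufferSize() {
        return upstreamBufferSize;
    }

    static SplitRetryChannel replay() {
        SplitRetryChannel ch = new SplitRetryChannel(32768);
        ch.cacheBufferSize(16384);          // caching the new buffer size
        int forRetry = ch.readCachedSize(); // after the error: read the cache for the retry
        ch.cacheBufferSize(8192);           // caching another new value
        ch.sendBufferSize(8192);            // dropped: the upstream is not ready yet
        ch.sendPartitionRequest(forRetry);  // the retry carries the stale 16384
        return ch;
    }
}
```

After `replay()`, the upstream uses 16384 while the cache already holds 8192. The newest value is never delivered.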
   
   In this case, the newest value would be lost. So we would need to add some 
synchronization here, since the sending happens from different threads. That is 
possible, but it doesn't look like the cleanest solution to me. I will think 
about it a bit more, but I still think the cleanest solution is to somehow find 
out which channel accepted the result. We can do that as I described above, or we 
can react to a new error in the same way the PartitionRequest retry logic does.
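
One way to add the synchronization mentioned above is a single lock. It makes "update the cache and send the announcement" and "read the cache and send the retry" atomic with respect to each other. This is a minimal sketch under two assumptions: messages issued under the lock reach the upstream in issue order (as on one ordered connection), and the upstream drops size announcements that arrive before it has accepted a partition request. `LockedChannel` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical channel where cache updates and the sends that use them share one lock. */
class LockedChannel {
    static final int REQUEST = 0;
    static final int BUFFER_SIZE = 1;

    private final Object lock = new Object();
    private int cachedBufferSize;
    // Stands in for an ordered connection; each entry is {kind, size}.
    private final List<int[]> wire = new ArrayList<>();

    LockedChannel(int initialBufferSize) {
        this.cachedBufferSize = initialBufferSize;
    }

    void announceBufferSize(int size) {
        synchronized (lock) {
            cachedBufferSize = size;               // cache and send as one step
            wire.add(new int[] {BUFFER_SIZE, size});
        }
    }

    void retryPartitionRequest() {
        synchronized (lock) {
            wire.add(new int[] {REQUEST, cachedBufferSize}); // read and send as one step
        }
    }

    /** Replays the wire against an upstream that drops sizes sent before a request. */
    Integer upstreamBufferSize() {
        synchronized (lock) {
            boolean ready = false;
            Integer applied = null;
            for (int[] msg : wire) {
                if (msg[0] == REQUEST) {
                    ready = true;
                    applied = msg[1];
                } else if (ready) {
                    applied = msg[1];
                }
            }
            return applied;
        }
    }
}
```

Whichever of the two locked operations runs first, the upstream ends up with the newest size. If the retry goes first, the later announcement is applied. If the announcement goes first, it is dropped, but the retry carries the same value. The cost is holding a lock across a send on every announcement and retry.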


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.