[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 GitBox
URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518
 
 
   Yes, I think we are on the same page now.
   
   I would focus on cases `b` and `c` in this PR and open separate PRs for the
other cases in the future. In particular, for `DataConnectionException` we
might add a retry mechanism, if possible, when connecting to the server, and
throw `PartitionNotFoundException` or `DataConnectionException` after the
retries fail.
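
   A minimal Java sketch of such a retry, under stated assumptions:
`DataConnectionException` below is a stand-in for the exception proposed in
this discussion (not an existing Flink class), and `ConnectAttempt` /
`ConnectionRetry` are hypothetical names. It retries a connection attempt with
a linear backoff and throws `DataConnectionException` only once every attempt
has failed.

```java
import java.io.IOException;

// Stand-in for the proposed DataConnectionException (hypothetical, not Flink's API).
class DataConnectionException extends IOException {
    DataConnectionException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical: one attempt to establish the connection to the producer's server.
@FunctionalInterface
interface ConnectAttempt<T> {
    T connect() throws IOException;
}

final class ConnectionRetry {
    private ConnectionRetry() {}

    // Tries up to maxAttempts times, sleeping backoffMillis * attempt between tries.
    static <T> T connectWithRetry(ConnectAttempt<T> attempt, int maxAttempts, long backoffMillis)
            throws DataConnectionException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException lastFailure = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.connect();
            } catch (IOException e) {
                // Lost TM or temporary network problem: we cannot tell yet, so retry.
                lastFailure = e;
                if (i < maxAttempts) {
                    Thread.sleep(backoffMillis * i);
                }
            }
        }
        // All retries failed: surface a dedicated exception for the failover logic.
        throw new DataConnectionException(
                "Could not connect after " + maxAttempts + " attempts", lastFailure);
    }
}
```

   Keeping the retry on the consumer side means a short network hiccup never
reaches the JobMaster; only a persistent failure does.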
   
   I have already updated the code in two respects:
   
   - The received `PartitionNotFoundException` is no longer transformed on the
consumer side.
   
   - On the producer side, a `PartitionNotFoundException` is sent while creating
the reader view. If the `ResultPartition` has not been removed from the
`ResultPartitionManager`, only `SpillableSubpartition#createView` might throw
an `IOException`, which would indicate that the required data file was not
opened correctly. So I wrap that `IOException` into a
`PartitionNotFoundException` in the upper layer, which keeps the handling
uniform for all `ResultSubpartition` implementations, as well as for the new
`BoundedBlockingSubpartition` that Stephan proposed.
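
   Roughly, the producer-side wrapping described above could look like the
following Java sketch. All types are simplified, hypothetical stand-ins
(`PartitionRegistrySketch`, `SubpartitionSketch`, and a local
`PartitionNotFoundException` keyed by a plain `String` id), not Flink's actual
classes: a released partition (case `b`) and an `IOException` from creating
the view (case `c`) both surface as `PartitionNotFoundException`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Flink's PartitionNotFoundException, simplified to a String id.
class PartitionNotFoundException extends IOException {
    PartitionNotFoundException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " not found.", cause);
    }
}

// Hypothetical subpartition: creating a view may fail, e.g. if the spilled file is gone.
interface SubpartitionSketch {
    Object createReadView() throws IOException;
}

// Hypothetical upper layer performing the partition lookup.
class PartitionRegistrySketch {
    private final Map<String, SubpartitionSketch> partitions = new HashMap<>();

    void register(String partitionId, SubpartitionSketch subpartition) {
        partitions.put(partitionId, subpartition);
    }

    Object createSubpartitionView(String partitionId) throws PartitionNotFoundException {
        SubpartitionSketch subpartition = partitions.get(partitionId);
        if (subpartition == null) {
            // Case b: the partition was already released.
            throw new PartitionNotFoundException(partitionId, null);
        }
        try {
            return subpartition.createReadView();
        } catch (PartitionNotFoundException e) {
            throw e;
        } catch (IOException e) {
            // Case c: wrap once here, uniformly for every subpartition type.
            throw new PartitionNotFoundException(partitionId, e);
        }
    }
}
```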
   
   Once you confirm this approach is correct, I will add new unit tests
covering these cases. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 GitBox
URL: https://github.com/apache/flink/pull/8242#issuecomment-490011255
 
 
   Yes, a blacklist mechanism could solve the hardware problems in the future.
   
   As for `d`, you are right that the `IOException` might have multiple
causes, so we will not cover it in this PR.
   
   As for `f`, if I understand correctly, the scenario is similar to case `a`
that I mentioned above. If the TM is unreachable, the first phase, establishing
the connection, would fail with `ConnectionTimeoutException`,
`ConnectionRefusedException`, etc. I proposed `DataConnectionException` rather
than `PartitionNotFound` for `a` because the failure might be caused by a
temporary network problem, and retrying the connection later might succeed.
   
   In other words, from a `ConnectionTimeoutException` or
`ConnectionRefusedException` alone, the consumer cannot tell whether the
producer TM is really lost or there is only a temporary network issue.
   
   After the connection is established, the second phase is to send the
logical `PartitionRequest` message. If the producer TM becomes unreachable at
this point and causes network exceptions, we still cannot distinguish whether
the TM is really lost or there is only a temporary network issue. The only
determinate signal is the consumer receiving a specific
`PartitionNotFoundException` over the network, which corresponds to cases `b`
and `c`.
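
   The reasoning above amounts to a small decision rule on the consumer side.
A hedged Java sketch, with hypothetical `Phase` / `Verdict` enums and a local
stand-in `PartitionNotFoundException`: only an explicit
`PartitionNotFoundException` from the producer is treated as determinate; any
other failure while connecting is treated as retryable (case `a`), and any
other failure after the `PartitionRequest` has been sent is left undetermined.

```java
import java.io.IOException;

// Stand-in for Flink's PartitionNotFoundException (simplified, hypothetical).
class PartitionNotFoundException extends IOException {
    PartitionNotFoundException(String partitionId) {
        super("Partition " + partitionId + " not found.");
    }
}

// Hypothetical phases of a remote partition request, as described above.
enum Phase { CONNECT, PARTITION_REQUEST }

// Hypothetical outcomes for the failover decision.
enum Verdict { RETRY_CONNECTION, RESTART_PRODUCER, UNDETERMINED }

final class ConsumerFailureClassifier {
    private ConsumerFailureClassifier() {}

    static Verdict classify(Phase phase, Throwable failure) {
        if (failure instanceof PartitionNotFoundException) {
            // Cases b and c: the producer explicitly reported the partition as missing.
            return Verdict.RESTART_PRODUCER;
        }
        if (phase == Phase.CONNECT) {
            // Case a: lost TM or temporary network problem; a retry may succeed.
            return Verdict.RETRY_CONNECTION;
        }
        // Network error after the request was sent: TM loss vs. network issue is unknown.
        return Verdict.UNDETERMINED;
    }
}
```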
   
   So should we focus only on `b` and `c` for now, since they are the
determinate cases?


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-06 GitBox
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309
 
 
   @tillrohrmann thanks for the further suggestions! I agree with your overall 
ideas.
   
   -> What kind of environment/hardware issues do you have in mind that could
cause repeated failures of reading the result data?
   
   I mean disk/network hardware problems on the producer side that cannot be
fixed in a short time. In that case it is better to restart the producer on
another machine. We have encountered this corner case in production.
   
   -> Did I understand you correctly, that the `PartitionNotFoundException` is 
good enough and, thus, we don't need to introduce a new exception?
   
   The information in the current `PartitionNotFoundException` is enough for
the `JobMaster` to restart the producer, but it cannot cover all the cases. So
I would first like to list all the possible cases and confirm them with you:
   
   - a. TCP connection failure: it might be caused by the loss of the producer
TM or by a network hardware issue. We might introduce `DataConnectionException`
for this.

   - b. `PartitionNotFound`: the `ResultPartition` has been released from the
`ResultPartitionManager`, so the producer needs to be restarted immediately.

   - c. `ResultSubpartition#createReaderView` throws an `IOException`: it might
be caused by a corrupted or deleted disk file of a `BlockingResultSubpartition`.
It could also be wrapped into the existing `PartitionNotFound`.

   - d. `BlockingResultSubpartitionView#getNextBuffer` throws an `IOException`:
the cause is the same as in `c` above, so `PartitionNotFound` might also be used
here.

   - e. Network server exception while transferring data: this case is more
complicated. It might be caused by the loss of the producer TM, a temporary
network problem, a hardware/environment issue on the server, etc. The consumer,
as the client, might notice it via an inactive network channel or an
`ErrorResponse` from the server. We could introduce `DataTransferException` to
cover all of these.
   
   Cases {b, c, d} above can be determined to require restarting the producer
immediately, and the current `PartitionNotFound` could be used to cover them.

   For cases {a, e}, we might introduce new exceptions to cover them, and the
failover strategy might apply different rules to them.
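
   The case list and the two groups above can be summarized as a lookup table.
A minimal Java sketch; the enum is hypothetical, and because
`DataConnectionException` and `DataTransferException` are only proposals in
this discussion, the exception names are kept as plain strings.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical summary of cases a-e: proposed exception and whether the
// producer should be restarted immediately.
enum ConsumptionFailureCase {
    A_TCP_CONNECTION_FAILURE("DataConnectionException", false),
    B_PARTITION_RELEASED("PartitionNotFoundException", true),
    C_CREATE_READER_VIEW_IO_ERROR("PartitionNotFoundException", true),
    D_GET_NEXT_BUFFER_IO_ERROR("PartitionNotFoundException", true),
    E_DATA_TRANSFER_ERROR("DataTransferException", false);

    final String proposedException;
    final boolean restartProducerImmediately;

    ConsumptionFailureCase(String proposedException, boolean restartProducerImmediately) {
        this.proposedException = proposedException;
        this.restartProducerImmediately = restartProducerImmediately;
    }

    // The determinate group {b, c, d}; {a, e} may need different failover rules.
    static Set<ConsumptionFailureCase> restartImmediately() {
        Set<ConsumptionFailureCase> result = EnumSet.noneOf(ConsumptionFailureCase.class);
        for (ConsumptionFailureCase c : values()) {
            if (c.restartProducerImmediately) {
                result.add(c);
            }
        }
        return result;
    }
}
```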
   
   So I think there are two options:
   
   - If we want to cover all of {a, b, c, d, e} ATM, it might be necessary to
define an abstract `DataConsumptionException` as the parent of the above
`PartitionNotFoundException`, `DataConnectionException` and
`DataTransferException`.

   - Or we only focus on {b, c, d} in the first step ({a, e} might be considered
later if necessary, or handled in other ways). Then the current
`PartitionNotFound` is enough and no new exceptions are needed ATM.
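
   Option one could be sketched in Java as follows. This only illustrates the
proposed hierarchy under simplifying assumptions (a plain `String` partition
id, and the parent extending `IOException`); `FailoverRuleSketch` is
hypothetical and just shows how a failover strategy could treat the subtypes
differently.

```java
import java.io.IOException;

// Proposed abstract parent for all data consumption failures (sketch).
abstract class DataConsumptionException extends IOException {
    private final String partitionId;

    DataConsumptionException(String partitionId, String message, Throwable cause) {
        super(message, cause);
        this.partitionId = partitionId;
    }

    // Lets the JobMaster identify which producer to restart.
    String getPartitionId() {
        return partitionId;
    }
}

// Cases b, c, d.
final class PartitionNotFoundException extends DataConsumptionException {
    PartitionNotFoundException(String partitionId, Throwable cause) {
        super(partitionId, "Partition " + partitionId + " not found.", cause);
    }
}

// Case a.
final class DataConnectionException extends DataConsumptionException {
    DataConnectionException(String partitionId, Throwable cause) {
        super(partitionId, "Connection for partition " + partitionId + " failed.", cause);
    }
}

// Case e.
final class DataTransferException extends DataConsumptionException {
    DataTransferException(String partitionId, Throwable cause) {
        super(partitionId, "Transfer of partition " + partitionId + " failed.", cause);
    }
}

final class FailoverRuleSketch {
    private FailoverRuleSketch() {}

    // Only the determinate cases trigger an immediate producer restart.
    static boolean restartProducerImmediately(Throwable t) {
        return t instanceof PartitionNotFoundException;
    }

    // Any subtype identifies a failure to consume a specific partition.
    static boolean isDataConsumptionFailure(Throwable t) {
        return t instanceof DataConsumptionException;
    }
}
```

   With option two, only `PartitionNotFoundException` would exist and the
abstract parent could be added later without changing its callers.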
   
   Both options make sense to me, so I would like to hear your final opinion,
or whether you have other options. :)


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-04 GitBox
URL: https://github.com/apache/flink/pull/8242#issuecomment-489389513
 
 
   Thanks for your professional reviews and valuable suggestions,
@tillrohrmann.
   
   I think there are two main issues to confirm:
   
   1. Do we need a new, dedicated `DataConsumptionException`, or can we use the
existing `PartitionNotFoundException`?
   
   Semantically, `PartitionNotFoundException` is actually a subtype of
`DataConsumptionException`. Functionally, they currently provide the same
information, so I agree to keep only one exception type for simplicity. That
means either using `PartitionNotFoundException` directly or renaming it to
cover broader semantics.
   
   2. In which cases should we report this `PartitionNotFoundException` so
that the JobMaster restarts the producer?
   
   The cases you mentioned above are a very helpful guide, and I will try to
specify each of them further:
   
   -> TaskExecutor no longer reachable
   - TE unavailable while requesting the subpartition: a shut-down or killed
executor would cause an exception when establishing the network connection or
requesting the subpartition, which is covered by
`RemoteInputChannel#requestSubpartition` in this PR.

   - TE unavailable while transferring partition data: a shut-down or killed
executor would make the network channel inactive, which is also covered by
`RemoteInputChannel#onError` in this PR.
   
   -> TaskExecutor reachable but result partition not available
   
   This is covered by `RemoteInputChannel#failPartitionRequest` in this PR.
   
   -> TaskExecutor reachable, result partition available but sub partition view 
cannot be created (spilled file has been removed)
   
   The producer would answer the subpartition request with an `ErrorResponse`,
and the consumer would then call `RemoteInputChannel#onError` to handle this
case. Because this `ErrorResponse` is a fatal error, it would cause all the
remote input channels to report `DataConsumptionException`. So it may be better
to wrap the `DataConsumptionException` on the producer side in this case, to
avoid affecting all the remote input channels.
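
   The difference between a fatal and a channel-specific error response can be
sketched as below. This is a simplified, hypothetical model
(`ErrorMessageSketch`, `ClientHandlerSketch`, `String` channel ids) in which an
error that is not addressed to a particular channel is treated as fatal for
every channel on the connection; it is not Flink's actual Netty handler.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical server->client error message. A null receiverId marks a fatal error.
final class ErrorMessageSketch {
    final Throwable cause;
    final String receiverId;

    ErrorMessageSketch(Throwable cause, String receiverId) {
        this.cause = cause;
        this.receiverId = receiverId;
    }

    boolean isFatal() {
        return receiverId == null;
    }
}

// Hypothetical consumer-side handler shared by all remote input channels of one connection.
final class ClientHandlerSketch {
    private final Map<String, List<Throwable>> errorsByChannel = new LinkedHashMap<>();

    void registerChannel(String channelId) {
        errorsByChannel.put(channelId, new ArrayList<>());
    }

    void onError(ErrorMessageSketch message) {
        if (message.isFatal()) {
            // A fatal error fails every channel on this connection.
            for (List<Throwable> errors : errorsByChannel.values()) {
                errors.add(message.cause);
            }
        } else {
            // A targeted error (wrapped on the producer side) fails only its channel.
            List<Throwable> errors = errorsByChannel.get(message.receiverId);
            if (errors != null) {
                errors.add(message.cause);
            }
        }
    }

    int errorCount(String channelId) {
        return errorsByChannel.get(channelId).size();
    }
}
```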
   
   Besides the above three cases, there is another cause: the partition data is
corrupted or deleted during the transfer, which is currently covered by
`PartitionRequestQueue#exceptionCaught` in this PR. The consumer then receives
an `ErrorResponse` and calls `RemoteInputChannel#onError`.
   
   For this case I once considered wrapping `DataConsumptionException` directly
on the producer side, specifically in `SpilledSubpartitionView#getNextBuffer`,
to avoid reusing the general `ErrorResponse`, because you might think that some
internal network exceptions sent via `ErrorResponse` do not require restarting
the producer. My earlier thought was that some network exceptions, including
environment/hardware issues, might also justify restarting the producer on
another executor; otherwise the consumer might hit the same problem again after
restarting and consuming data from the previous executor. That is why the fatal
`ErrorResponse` is easy to reuse to represent the partition problem here.
Alternatively, we could treat network environment/hardware problems as a
separate issue and wrap the dedicated `DataConsumptionException` in specific
views to distinguish it from the existing `ErrorResponse`.
   
   

