[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518

Yes, I think we are on the same page now. I will focus on `b, c` in this PR and open separate PRs for the other cases in the future. In particular, for `DataConnectionException` we might add a retry mechanism while connecting to the server, and throw `PartitionNotFoundException` or `DataConnectionException` once the retries fail.

I have already updated the code in two respects:
- Do not transform the received `PartitionNotFoundException` on the consumer side.
- Send the `PartitionNotFoundException` from the producer side in the process of creating the reader view. If the `ResultPartition` has not been removed from the `ResultPartitionManager`, only `SpillableSubpartition#createView` might throw an `IOException`, which indicates the required data file could not be opened correctly. So I wrap it into a `PartitionNotFoundException` in the upper layer, which is uniform across all `ResultSubpartition` implementations except the new `BoundedBlockingSubpartition` Stephan proposed.

Once you confirm this approach is correct, I will add new unit tests covering these cases. :)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
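The "wrap in the upper layer" idea above can be sketched as follows. This is a minimal, hypothetical illustration with stand-in types, not Flink's real classes: the low-level `IOException` from view creation is caught one layer up and rethrown with partition-not-found semantics, so every subpartition implementation reports the failure uniformly.

```java
import java.io.IOException;

public class ViewWrapSketch {
    // Stand-in for Flink's PartitionNotFoundException.
    static class PartitionNotFoundException extends IOException {
        PartitionNotFoundException(String partitionId, Throwable cause) {
            super("Partition " + partitionId + " not found", cause);
        }
    }

    // Stand-in for a ResultSubpartition whose view creation may fail.
    interface Subpartition {
        Object createReadView() throws IOException;
    }

    // The "upper layer": catches the low-level IOException (e.g. the spilled
    // data file cannot be opened) and rethrows it as a partition-not-found
    // error, uniformly for all subpartition implementations.
    static Object createSubpartitionView(String partitionId, Subpartition sub)
            throws PartitionNotFoundException {
        try {
            return sub.createReadView();
        } catch (IOException e) {
            throw new PartitionNotFoundException(partitionId, e);
        }
    }

    public static void main(String[] args) {
        Subpartition broken = () -> { throw new IOException("data file missing"); };
        try {
            createSubpartitionView("rp-1", broken);
        } catch (PartitionNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```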
[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#issuecomment-490011255

Yes, the blacklist could solve the hardware problems in the future.

As for `d`, you are right that the `IOException` might be caused by multiple factors, so we do not cover it in this PR.

As for `f`, if I understand correctly, the scenario is similar to case `a` I mentioned above. If the TM is unreachable, the first phase of establishing the connection would fail with a `ConnectionTimeoutException`, `ConnectionRefusedException`, etc. I proposed `DataConnectionException` rather than `PartitionNotFound` for `a` because the failure might be caused by a temporary network problem, and retrying the connection later might succeed. In other words, from the `ConnectionTimeoutException` or `ConnectionRefusedException` received on the consumer side, we cannot tell whether the producer TM is really lost or it is only a temporary network issue. After the connection is established, the second phase sends the logical `PartitionRequest` message; if the producer TM becomes unreachable at this point and causes network exceptions, we still cannot distinguish a really lost TM from a temporary network issue. Only when the consumer receives a specific `PartitionNotFoundException` via the network do we know for sure, which corresponds to cases b, c. So we might only focus on b, c, which are the deterministic ones currently?
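The retry idea for case `a` can be sketched like this. All names here (`DataConnectionException`, `Connector`) are proposals/stand-ins from this discussion, not existing Flink classes: retry the connection a few times to ride out a transient network glitch, and only surface a terminal exception after the retries are exhausted.

```java
public class ConnectRetrySketch {
    // Proposed exception from the discussion; not an existing Flink class.
    static class DataConnectionException extends Exception {
        DataConnectionException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Stand-in for the connection-establishing step.
    interface Connector {
        void connect() throws Exception;
    }

    // Retry up to maxRetries extra attempts; a temporary network problem is
    // survived, while a persistent one ends in DataConnectionException.
    static void connectWithRetry(Connector c, int maxRetries)
            throws DataConnectionException {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                c.connect();
                return; // connected successfully
            } catch (Exception e) {
                last = e; // e.g. connection timed out / refused
            }
        }
        throw new DataConnectionException(
            "Connecting to producer failed after " + (maxRetries + 1) + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds: models a temporary network problem.
        Connector flaky = () -> {
            if (calls[0]++ < 2) throw new java.io.IOException("connection refused");
        };
        connectWithRetry(flaky, 3);
        System.out.println("connected after " + calls[0] + " attempts");
    }
}
```

The point of the sketch is the split in failure semantics: before the retries are exhausted nothing is reported, so the `JobMaster` only ever sees the terminal exception.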
[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309

@tillrohrmann thanks for the further suggestions! I agree with your overall ideas.

-> What kind of environment/hardware issues do you have in mind that could cause repeated failures of reading the result data?

I mean disk/network hardware problems on the producer side that cannot be fixed in a short time, so it is better to restart the producer on another machine. We have encountered this corner case in production.

-> Did I understand you correctly, that the `PartitionNotFoundException` is good enough and, thus, we don't need to introduce a new exception?

The information in the current `PartitionNotFoundException` is enough for the `JobMaster` to restart the producer, but it cannot cover all the cases. So I would like to list all the possible cases to confirm with you first:
- a. TCP connection failure: it might be caused by a lost producer TM or a network hardware issue. We might introduce `DataConnectionException` for this.
- b. `PartitionNotFound`: the `ResultPartition` has been released from the `ResultPartitionManager`, which requires restarting the producer immediately.
- c. `ResultSubpartition#createReaderView` throws `IOException`: for a `BlockingResultSubpartition` this might be caused by a corrupt or deleted disk file. It could also be wrapped into the existing `PartitionNotFound`.
- d. `BlockingResultSubpartitionView#getNextBuffer` throws `IOException`: the reason is the same as in c above; `PartitionNotFound` might also be used here.
- e. Network server exception while transferring data: this one is more complicated. It might be caused by a lost producer TM, a temporary network problem, a server hardware/environment issue, etc. The consumer, as the client, can detect it via an inactive network channel or an `ErrorResponse` from the server. We could introduce `DataTransferException` to cover all of these.
Cases {b, c, d} above can be determined to require restarting the producer immediately, and the current `PartitionNotFound` could cover them. For cases {a, e}, we might introduce new exceptions, and the failover strategy might apply different rules to them. So I think there are two options:
- If we want to cover all of {a, b, c, d, e} ATM, it might be necessary to define an abstract `DataConsumptionException` as the parent of the above `PartitionNotFoundException`, `DataConnectionException`, and `DataTransferException`.
- Or we only address {b, c, d} in a first step ({a, e} could be considered later or in other ways if necessary); then the current `PartitionNotFound` is enough and no new exceptions are needed ATM.

Both options make sense to me, so I would like to hear your final opinion, or you may have other options. :)
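The first option above, the abstract parent type, can be sketched as follows. Only `PartitionNotFoundException` exists in Flink today; `DataConsumptionException`, `DataConnectionException`, and `DataTransferException` are proposals from this discussion, and the `decide` helper is a hypothetical illustration of how a failover strategy could branch on the concrete subtype.

```java
public class ExceptionHierarchySketch {
    // Proposed abstract parent carrying the affected partition id.
    static abstract class DataConsumptionException extends Exception {
        final String partitionId;
        DataConsumptionException(String partitionId, String msg) {
            super(msg);
            this.partitionId = partitionId;
        }
    }

    // Cases b, c, d: the partition or its backing data is gone.
    static class PartitionNotFoundException extends DataConsumptionException {
        PartitionNotFoundException(String id) {
            super(id, "Partition " + id + " not found");
        }
    }

    // Case a: the TCP connection could not be established (possibly transient).
    static class DataConnectionException extends DataConsumptionException {
        DataConnectionException(String id) {
            super(id, "Cannot connect for partition " + id);
        }
    }

    // Case e: failure while transferring data over an established connection.
    static class DataTransferException extends DataConsumptionException {
        DataTransferException(String id) {
            super(id, "Transfer failed for partition " + id);
        }
    }

    // A failover strategy can apply different rules per subtype.
    static String decide(DataConsumptionException e) {
        if (e instanceof PartitionNotFoundException) {
            return "restart producer immediately";
        }
        return "maybe retry first, then restart producer";
    }

    public static void main(String[] args) {
        System.out.println(decide(new PartitionNotFoundException("rp-1")));
        System.out.println(decide(new DataConnectionException("rp-1")));
    }
}
```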
[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#issuecomment-489389513

Thanks for your professional review and valuable suggestions @tillrohrmann. I think there are mainly two issues to confirm:

1. Do we need a new, special `DataConsumptionException`, or can we use the existing `PartitionNotFoundException`? Semantically, `PartitionNotFoundException` is actually a subtype of `DataConsumptionException`. Functionally they provide the same information at the moment, so I agree to maintain only one exception type for simplification. That means using `PartitionNotFoundException` directly, or renaming it to cover more semantics.

2. In which cases should we report this `PartitionNotFoundException` so the JobMaster restarts the producer? The cases you mentioned above are very helpful, and I will try to further specify each one:

-> TaskExecutor no longer reachable
- TE unavailable while requesting the subpartition: an executor shutdown/kill would cause an exception while establishing the network connection or requesting the subpartition, which is covered by `RemoteInputChannel#requestSubpartition` in the PR.
- TE unavailable while transferring partition data: an executor shutdown/kill would make the network channel inactive, which is covered by `RemoteInputChannel#onError` in the PR.

-> TaskExecutor reachable but result partition not available
This is covered by `RemoteInputChannel#failPartitionRequest` in the PR.

-> TaskExecutor reachable, result partition available, but the subpartition view cannot be created (spilled file has been removed)
The producer would respond with an `ErrorResponse` to the subpartition request, and the consumer would call `RemoteInputChannel#onError` to cover this case. Because this `ErrorResponse` is a fatal error, it would cause all the remote input channels to report `DataConsumptionException`.
So it may be better to wrap `DataConsumptionException` on the producer side in this case, to avoid affecting all the remote input channels.

Besides the above three cases, there is another cause: partition data corrupted/deleted while transferring, which is covered by `PartitionRequestQueue#exceptionCaught` in the PR ATM. The consumer would then receive an `ErrorResponse` and call `RemoteInputChannel#onError`. For this case I once considered wrapping `DataConsumptionException` directly on the producer side, in the specific `SpilledSubpartitionView#getNextBuffer`, to avoid reusing the general `ErrorResponse`, because you might think some internal network exceptions delivered via `ErrorResponse` do not need a producer restart. My previous thought was that some network exceptions, including environment/hardware issues, might also justify restarting the producer on other executors; otherwise the consumer might hit the same problem again after restarting and consuming data from the previous executor. So the `ErrorResponse` with the fatal flag is easy to reuse to represent the partition problem here. Alternatively, we could treat the network environment/hardware as a separate issue to be solved, and wrap the special `DataConsumptionException` in the specific views to distinguish it from the existing `ErrorResponse`.
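The fatal vs. partition-specific distinction discussed above can be sketched as follows. The types here are simplified stand-ins for Flink's netty protocol classes, not the real API: a fatal error takes down all remote input channels of the connection, while a partition-specific error fails only the channel it belongs to.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorResponseSketch {
    // Stand-in for the server's error message; fatal means whole-connection.
    static class ErrorResponse {
        final boolean fatal;
        final String partitionId; // null for fatal, whole-connection errors
        ErrorResponse(boolean fatal, String partitionId) {
            this.fatal = fatal;
            this.partitionId = partitionId;
        }
    }

    // Stand-in for a RemoteInputChannel that can be failed via onError.
    static class RemoteInputChannel {
        final String partitionId;
        boolean failed;
        RemoteInputChannel(String partitionId) { this.partitionId = partitionId; }
        void onError() { failed = true; }
    }

    // Dispatch as described in the comment: fatal -> fail every channel on the
    // connection; otherwise fail only the channel whose partition is named.
    static void dispatch(ErrorResponse err, List<RemoteInputChannel> channels) {
        for (RemoteInputChannel ch : channels) {
            if (err.fatal || ch.partitionId.equals(err.partitionId)) {
                ch.onError();
            }
        }
    }

    public static void main(String[] args) {
        List<RemoteInputChannel> channels = new ArrayList<>();
        channels.add(new RemoteInputChannel("rp-1"));
        channels.add(new RemoteInputChannel("rp-2"));
        // A partition-specific error fails only the matching channel.
        dispatch(new ErrorResponse(false, "rp-1"), channels);
        System.out.println(channels.get(0).failed + " " + channels.get(1).failed);
    }
}
```

This is why wrapping the exception on the producer side for a single missing spilled file is attractive: it lets the failure stay partition-specific instead of riding on the fatal, whole-connection path.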