[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#issuecomment-491760194 Great, I have no other concerns now, then I could continue on this PR. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-491258890

@tillrohrmann thanks for the further great suggestions! I fully agree with the points above; the new exception category makes the semantics clearer. To double-check that my understanding is correct:

- The newly proposed `PartitionException` has three subtypes: `PartitionNotFoundException`, `PartitionCorruptionException` and `PartitionLookupException`. The job master checks for `PartitionException` to decide whether to restart the producer. For `PartitionLookupException` in particular, the producer might need to be restarted on another machine.
- Since the current `SpilledSubpartitionView` will be replaced by Stephan's new `BoundedBlockingSubpartition`, do you think it is necessary to throw `PartitionCorruptionException` in the current code, or should we address it after `BoundedBlockingSubpartition` is merged?
- The producer could send a `PartitionException` if it fails to create the subpartition view. On the consumer side, if the exception is an instance of `PartitionNotFoundException`, the consumer keeps the current logic of retrying the partition request. If it is an instance of `PartitionCorruptionException`, the consumer could fail directly.
- I already created a separate JIRA ticket for `PartitionLookupException`; a connection that is lost after being established will not be considered in this issue.
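A minimal sketch of the hierarchy described above, to make the proposed checks concrete. All classes here are illustrative stand-ins written for this note, not Flink's actual code. The consumer-side handling of anything other than `PartitionNotFoundException` (fail the task) and the helper names `onConsumer`, `shouldRestartProducer` and `preferOtherMachine` are assumptions.

```java
import java.io.IOException;

// Hypothetical stand-ins for the exception types discussed above; not Flink's actual classes.
class PartitionException extends IOException {
    private final String partitionId;

    PartitionException(String message, String partitionId) {
        super(message);
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

class PartitionNotFoundException extends PartitionException {
    PartitionNotFoundException(String partitionId) {
        super("Partition " + partitionId + " not found.", partitionId);
    }
}

class PartitionCorruptionException extends PartitionException {
    PartitionCorruptionException(String partitionId) {
        super("Partition " + partitionId + " is corrupted.", partitionId);
    }
}

class PartitionLookupException extends PartitionException {
    PartitionLookupException(String partitionId) {
        super("Partition " + partitionId + " could not be looked up.", partitionId);
    }
}

enum ConsumerAction { RETRY_PARTITION_REQUEST, FAIL_TASK }

final class PartitionFailureHandling {
    private PartitionFailureHandling() {}

    // Consumer side: keep retrying on "not found"; fail the task otherwise (assumption).
    static ConsumerAction onConsumer(PartitionException e) {
        return e instanceof PartitionNotFoundException
                ? ConsumerAction.RETRY_PARTITION_REQUEST
                : ConsumerAction.FAIL_TASK;
    }

    // Job master side: any PartitionException in the cause chain hints at a producer restart.
    static boolean shouldRestartProducer(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof PartitionException) {
                return true;
            }
        }
        return false;
    }

    // Job master side: a lookup failure additionally hints at restarting on another machine.
    static boolean preferOtherMachine(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof PartitionLookupException) {
                return true;
            }
        }
        return false;
    }
}
```

With a single parent type, the job master needs only one `instanceof` check, while the consumer can still branch on the concrete subtype.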
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490529494

Thanks for the further review @tillrohrmann. Let me try to explain the two concerns above.

- Whether to always transform `IOException` into `PartitionNotFoundException`? Actually, I would prefer to do this wrapping in the constructor of `SpilledSubpartitionView`, which might throw `IOException` while opening the disk file. Based on the current code, this is the only step that might throw `IOException` while creating a `ResultSubpartitionView`. But considering that `SpillableSubpartition` and its reader view will soon be removed by Stephan's new `BoundedBlockingSubpartition`, I do the wrapping the way it is in the PR, which might also suit the new `BoundedBlockingSubpartition`. Strictly speaking, it would be better to transform the exception in the specific step instead of covering the whole process. If you have concerns about this, I could move it into the internal step even though it will be removed soon. Or we leave case `c` aside until `BoundedBlockingSubpartition` is merged.
- Whether to check that the file exists on the producer side and throw `PartitionNotFoundException` there? I think the check can only be done on the producer side while opening the file. There may be other options, but I have not thought of any yet. However, the producer does not have to throw `PartitionNotFoundException` after finding that the file does not exist. Another option is for the producer to throw a separate exception called `PartitionOpenException`, which would be sent to the consumer via `ErrorResponse` over the network. The consumer would fail directly and wrap it into `PartitionNotFoundException` for the JM. That way we could avoid the partition state check on the consumer side when it receives `PartitionNotFoundException`. But we would need another new exception definition.

My current approach in the PR is simple, and for a blocking partition the consumer could also fail directly after receiving `PartitionNotFoundException`, as I mentioned before. I could create a JIRA ticket for the connection issue later. I have not focused on the unit tests yet, because they depend on the process we want to implement. After you approve the current approach, I will fix the existing tests and add new tests to cover these cases.
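To illustrate the "transform in the specific step" option, here is a small sketch that wraps the `IOException` exactly where the spill file is opened, rather than around the whole view-creation path. `SpilledViewSketch` and this simplified `PartitionNotFoundException` are hypothetical stand-ins, not the classes in the PR, and the cause-carrying constructor is an assumption.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical, simplified stand-in; assumes the exception can carry a cause.
class PartitionNotFoundException extends IOException {
    PartitionNotFoundException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " not found.", cause);
    }
}

// Hypothetical reader view over a spilled partition file.
final class SpilledViewSketch implements AutoCloseable {
    private final FileChannel channel;

    SpilledViewSketch(String partitionId, Path spillFile) throws PartitionNotFoundException {
        try {
            // Only this step is wrapped: a missing or unreadable file fails here.
            this.channel = FileChannel.open(spillFile, StandardOpenOption.READ);
        } catch (IOException e) {
            // Keep the original exception as the cause for debugging.
            throw new PartitionNotFoundException(partitionId, e);
        }
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Wrapping this narrowly means an `IOException` from any unrelated step is not silently reported as a missing partition.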
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490377541

Yes, I think we are on the same page now. I will focus on `b, c` in this PR and open separate PRs for the other cases in the future. For `DataConnectionException` in particular, we might add a retry mechanism, if possible, while connecting to the server, and throw `PartitionNotFoundException` or `DataConnectionException` after the retries fail.

I already updated the code for the following points:

- Extend `PartitionNotFoundException` to carry an internal `Throwable`. This exception might be caused by an `IOException` while opening the blocking result partition file, so the internal throwable can be used for tracing/debugging the root problem.
- Do not transform the received `PartitionNotFoundException` into `IOException` on the consumer side.
- Catch the parent `IOException` instead of `PartitionNotFoundException` in `LocalInputChannel` or `PartitionRequestServerHandler` while requesting the partition. `IOException` can only happen for a blocking result partition while opening the disk file, which might indicate that the file is corrupted or deleted. So cases `b, c` above are unified in this process.

But there might be concerns here:

1. In pipelined mode, the consumer might ask the JM for the partition state and re-trigger the request after receiving `PartitionNotFoundException`.
2. In blocking mode, the consumer should fail directly because the partition state is already known to be `FINISHED`.

So I think we should adjust the logic of `triggerPartitionProducerStateCheck` when receiving `PartitionNotFoundException` on the consumer side. Otherwise this PR might introduce unnecessary retries when the exception is caused by a corrupted partition rather than a released one. After you confirm this approach is correct, I will add new unit tests to cover these cases. :)
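The two concerns above amount to a small decision on the consumer side, sketched here. `PartitionMode`, `ConsumerReaction` and `PartitionNotFoundReaction` are hypothetical names introduced for illustration; the real change would live in the `triggerPartitionProducerStateCheck` path, which this sketch does not reproduce.

```java
import java.io.IOException;

// Hypothetical stand-in: PartitionNotFoundException extended with a root cause.
class PartitionNotFoundException extends IOException {
    private final String partitionId;

    PartitionNotFoundException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " not found.", cause);
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

enum PartitionMode { PIPELINED, BLOCKING }

enum ConsumerReaction { CHECK_PRODUCER_STATE_AND_RETRY, FAIL_DIRECTLY }

final class PartitionNotFoundReaction {
    private PartitionNotFoundReaction() {}

    static ConsumerReaction react(PartitionMode mode, PartitionNotFoundException e) {
        switch (mode) {
            case PIPELINED:
                // The producer may still be running, so ask the JM and re-request.
                return ConsumerReaction.CHECK_PRODUCER_STATE_AND_RETRY;
            case BLOCKING:
                // The partition is already FINISHED; retrying cannot bring the data back.
                return ConsumerReaction.FAIL_DIRECTLY;
            default:
                throw new IllegalStateException(
                        "Unknown mode " + mode + " for partition " + e.getPartitionId());
        }
    }
}
```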
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518

Yes, I think we are on the same page now. I will focus on `b, c` in this PR and open separate PRs for the other cases in the future. For `DataConnectionException` in particular, we might add a retry mechanism, if possible, while connecting to the server, and throw `PartitionNotFoundException` or `DataConnectionException` after the retries fail.

I already updated the code in two respects:

- Do not transform the received `PartitionNotFoundException` on the consumer side.
- Send the `PartitionNotFoundException` from the producer side while creating the reader view. If the `ResultPartition` has not been removed from the `ResultPartitionManager`, only `SpillableSubpartition#createView` might throw `IOException`, which could indicate that the required data file could not be opened correctly. So I wrap it into `PartitionNotFoundException` in the upper layer, which seems uniform for all the `ResultSubpartition` instances as well as for the new `BoundedBlockingSubpartition` Stephan proposed.

After you confirm this approach is correct, I will add new unit tests to cover these cases. :)
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490011255

Yes, the blacklist could solve the hardware problems in the future. As for d, you are right that the `IOException` might be caused by multiple factors, so we do not cover it in this PR.

As for f, if I understand correctly, the scenario is similar to case `a` that I mentioned above. If the TM is unreachable, the first phase of establishing the connection would fail with `ConnectionTimeoutException`, `ConnectionRefusedException`, etc. I proposed `DataConnectionException` rather than `PartitionNotFound` for `a`, because the failure might be caused by a temporary network problem, and retrying the connection later might succeed. In other words, from a `ConnectionTimeoutException` or `ConnectionRefusedException` the consumer cannot tell whether the producer TM is really lost or the network has a temporary issue.

After the connection is established, the second phase is to send the logical `PartitionRequest` message. If the producer TM becomes unreachable at this point and causes a network exception, we still cannot distinguish a really lost TM from a temporary network issue, unless the consumer receives a specific `PartitionNotFoundException` over the network, which is cases b, c. So we might only focus on b, c for now?
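A rough sketch of the first phase with a bounded retry, assuming that a connection failure is only reported as `DataConnectionException` once the retries are exhausted. The `Connector` interface, `ConnectRetrySketch`, and the `maxAttempts` and `backoffMillis` parameters are hypothetical; none of them exist in the PR.

```java
import java.io.IOException;

// Hypothetical exception for case `a`: the connection could not be established.
class DataConnectionException extends IOException {
    DataConnectionException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical abstraction over "open a connection to the producer".
interface Connector<T> {
    T connect() throws IOException;
}

final class ConnectRetrySketch {
    private ConnectRetrySketch() {}

    static <T> T connectWithRetry(Connector<T> connector, int maxAttempts, long backoffMillis)
            throws DataConnectionException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException lastFailure = null;
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                return connector.connect();
            } catch (IOException e) {
                // Could be a lost TM or a temporary network problem; we cannot tell yet.
                lastFailure = e;
            }
            if (attempts < maxAttempts && backoffMillis > 0) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // stop retrying if the task is being cancelled
                }
            }
        }
        throw new DataConnectionException(
                "Could not connect after " + attempts + " attempt(s).", lastFailure);
    }
}
```

The retry only hides temporary problems. It still cannot prove that the producer TM is lost, which is why the failure stays a separate exception type instead of being reported as `PartitionNotFound`.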
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309

@tillrohrmann thanks for the further suggestions! I agree with your overall ideas.

-> What kind of environment/hardware issues do you have in mind that could cause repeated failures of reading the result data?

I mean disk/network hardware problems on the producer side that cannot be recovered in a short time, so it is better to restart the producer on another machine. We have encountered this corner case in production.

-> Did I understand you correctly, that the PartitionNotFoundException is good enough and, thus, we don't need to introduce a new exception?

The information in the current `PartitionNotFoundException` is enough for the `JobMaster` to restart the producer, but it cannot cover all the cases. So I would like to list all the possible cases to confirm with you first:

- a. TCP connection failure: it might be caused by a lost producer TM or a network hardware issue. We might introduce `DataConnectionException` for this.
- b. PartitionNotFound: the `ResultPartition` has been released from the `ResultPartitionManager`, which requires restarting the producer immediately.
- c. `ResultSubpartition#createReaderView` throws `IOException`: it might be caused by a corrupted/deleted disk file for `BlockingResultSubpartition`. It could also be wrapped into the existing `PartitionNotFound`.
- d. `BlockingResultSubpartitionView#getNextBuffer` throws `IOException`: the reason is the same as in c above. `PartitionNotFound` might also be used here.
- e. Network server exception while transferring data: this seems more complicated. It might be caused by a lost producer TM, a temporary network problem, a server hardware/environment issue, etc. The consumer, as the client, might notice it via an inactive network channel or an `ErrorResponse` from the server. We could introduce `DataTransferException` to cover all of these.

Cases {b, c, d} might call for restarting the producer immediately, and the current `PartitionNotFound` could be used to cover them. For cases a and e, we might introduce new exceptions, and the failover strategy might apply different rules to them. So I think there are two options to consider:

- If we want to cover all of {a, b, c, d, e} ATM, it might be necessary to define an abstract `DataConsumptionException` as the parent of `PartitionNotFoundException`, `DataConnectionException` and `DataTransferException`.
- Or we only address {b, c, d} in the first step (a and e might be done in the future or in other ways); then the current `PartitionNotFound` is enough and no new exceptions are needed ATM.

Both options make sense to me, so I would like to hear your final opinion, or whether you have other options. :)
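The case list can be summarised as a small lookup table, sketched below. The enum and its constant names are hypothetical. The exception names are the ones proposed in this comment, and the flag records only whether the case is one of those that "might call for restarting the producer immediately".

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical summary of cases a-e; names and fields are illustrative only.
enum ConsumptionFailureCase {
    A_CONNECTION_FAILED("DataConnectionException", false),
    B_PARTITION_RELEASED("PartitionNotFoundException", true),
    C_VIEW_CREATION_FAILED("PartitionNotFoundException", true),
    D_READ_FAILED("PartitionNotFoundException", true),
    E_TRANSFER_FAILED("DataTransferException", false);

    private final String proposedException;
    private final boolean restartProducerImmediately;

    ConsumptionFailureCase(String proposedException, boolean restartProducerImmediately) {
        this.proposedException = proposedException;
        this.restartProducerImmediately = restartProducerImmediately;
    }

    String proposedException() {
        return proposedException;
    }

    boolean restartProducerImmediately() {
        return restartProducerImmediately;
    }

    // Cases that the first option ({b, c, d} only) would cover.
    static Set<ConsumptionFailureCase> immediateRestartCases() {
        Set<ConsumptionFailureCase> result = EnumSet.noneOf(ConsumptionFailureCase.class);
        for (ConsumptionFailureCase c : values()) {
            if (c.restartProducerImmediately) {
                result.add(c);
            }
        }
        return result;
    }
}
```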
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489389513

Thanks for your professional reviews and valuable suggestions @tillrohrmann. I think there are mainly two issues to be confirmed:

1. Do we need a new, dedicated `DataConsumptionException`, or why not use the existing `PartitionNotFoundException`?

Semantically/conceptually, `PartitionNotFoundException` is actually a subtype of `DataConsumptionException`. Functionally they currently provide the same information, so I agree to maintain only one exception type for simplicity. That means using `PartitionNotFoundException` directly, or renaming it to cover broader semantics.

2. In which cases should we report this `PartitionNotFoundException` so that the JobMaster restarts the producer?

The cases you mentioned above are a very helpful guide, and I am trying to specify each case further:

-> TaskExecutor no longer reachable

- TE unavailable while requesting the subpartition: an executor shutdown/kill would cause an exception while establishing the network connection or requesting the subpartition, which is covered by `RemoteInputChannel#requestSubpartition` in the PR.
- TE unavailable while transferring partition data: an executor shutdown/kill would make the network channel inactive, which is also covered by `RemoteInputChannel#onError` in the PR.

-> TaskExecutor reachable but result partition not available

It is covered by `RemoteInputChannel#failPartitionRequest` in the PR.

-> TaskExecutor reachable, result partition available but sub partition view cannot be created (spilled file has been removed)

The producer would respond with an `ErrorResponse` to the subpartition request, and the consumer would then call `RemoteInputChannel#onError` to cover this case. Because this `ErrorResponse` is a fatal error, it would make all the remote input channels report `DataConsumptionException`.
So maybe it is better to wrap `DataConsumptionException` on the producer side in this case, to avoid affecting all the remote input channels.

Besides the three cases above, there is another cause: partition data corrupted/deleted during transfer, which is covered by `PartitionRequestQueue#exceptionCaught` in the PR ATM. The consumer would then receive an `ErrorResponse` and call `RemoteInputChannel#onError`. For this case I once considered wrapping `DataConsumptionException` directly on the producer side, in `SpilledSubpartitionView#getNextBuffer` specifically, to avoid reusing the general `ErrorResponse`, because you might think some internal network exceptions delivered via `ErrorResponse` do not require restarting the producer. My previous thought was that some network exceptions, including environment/hardware issues, might also warrant restarting the producer on other executors; otherwise the consumer might hit the same problem again after restarting and consuming data from the previous executor. So the `ErrorResponse` with the fatal property is easy to reuse to represent the partition problem here. Or we could treat network environment/hardware problems as a separate issue and wrap the dedicated `DataConsumptionException` in the specific views to distinguish it from the existing `ErrorResponse`.
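The concern about a fatal `ErrorResponse` touching every remote input channel can be seen in a simplified model like the one below. This is not the Netty handler code from Flink. `ErrorResponseSketch` and `ClientHandlerSketch` are hypothetical, and the convention that a missing receiver id marks the error as fatal is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified error message: no receiver id means "fatal for the connection".
final class ErrorResponseSketch {
    final Throwable cause;
    final String receiverId;

    ErrorResponseSketch(Throwable cause, String receiverId) {
        this.cause = cause;
        this.receiverId = receiverId;
    }

    boolean isFatal() {
        return receiverId == null;
    }
}

// Hypothetical client-side handler that multiplexes several input channels on one connection.
final class ClientHandlerSketch {
    private final Map<String, List<Throwable>> errorsByChannel = new LinkedHashMap<>();

    void register(String channelId) {
        errorsByChannel.put(channelId, new ArrayList<>());
    }

    void onErrorResponse(ErrorResponseSketch response) {
        if (response.isFatal()) {
            // Fatal: every channel sharing the connection is notified.
            for (List<Throwable> errors : errorsByChannel.values()) {
                errors.add(response.cause);
            }
        } else {
            // Targeted: only the channel that requested the broken partition is notified.
            List<Throwable> errors = errorsByChannel.get(response.receiverId);
            if (errors != null) {
                errors.add(response.cause);
            }
        }
    }

    int errorCount(String channelId) {
        List<Throwable> errors = errorsByChannel.get(channelId);
        return errors == null ? 0 : errors.size();
    }
}
```

Wrapping the exception on the producer side and addressing it to one receiver keeps the failure local to the affected channel.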
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-485687952

@flinkbot attention @tillrohrmann
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-485687507

Some thoughts on the implementation:

1. The `DataConsumptionException` should be general for both streaming and batch jobs, although at the moment we always restart the whole job in pipelined mode regardless of this `DataConsumptionException`.
2. We decided to wrap this exception on the receiver side, so the sender is not aware of it. The advantage is that it covers all the cases and reflects the semantics from the consumer's perspective. This exception is involved not only in `ResultSubpartitionView#getNextBuffer` on the sender side, but also in network connection, partition request and data transport.
3. This exception is just a hint for the `JobMaster` to decide whether to restart the upstream executions. In the future the `JobMaster` could also query the partition state from the `ShuffleMaster` and then consider both factors to make the final decision.
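A sketch of points 2 and 3, under stated assumptions: the receiver wraps whatever failure it sees into a `DataConsumptionException` that names the partition, and the `JobMaster` treats it as a hint. The rule for combining the hint with the partition state (restart only if the partition is no longer reported as available) is an assumption made for this example, and `ReceiverSideWrapping` with its methods is hypothetical.

```java
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for the exception proposed in this PR.
class DataConsumptionException extends IOException {
    private final String partitionId;

    DataConsumptionException(String partitionId, Throwable cause) {
        super("Failed to consume partition " + partitionId + ".", cause);
        this.partitionId = Objects.requireNonNull(partitionId);
    }

    String getPartitionId() {
        return partitionId;
    }
}

final class ReceiverSideWrapping {
    private ReceiverSideWrapping() {}

    // Receiver side: wrap connection, request and transfer failures alike, but only once.
    static DataConsumptionException wrap(String partitionId, Throwable cause) {
        if (cause instanceof DataConsumptionException) {
            return (DataConsumptionException) cause;
        }
        return new DataConsumptionException(partitionId, cause);
    }

    // JobMaster side: extract the hinted partition from the task failure, if any.
    static Optional<String> hintedPartition(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof DataConsumptionException) {
                return Optional.of(((DataConsumptionException) t).getPartitionId());
            }
        }
        return Optional.empty();
    }

    // Assumed rule: restart upstream only if hinted AND the partition is not available.
    static boolean shouldRestartProducer(Throwable failure, Predicate<String> partitionAvailable) {
        return hintedPartition(failure)
                .map(id -> !partitionAvailable.test(id))
                .orElse(false);
    }
}
```

The `Predicate` stands in for a future partition-state query. Until that query exists, a caller could pass `id -> false` so that the hint alone decides.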