[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-13 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-491760194
 
 
   Great, I have no other concerns now, so I can continue with this PR. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-10 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-491258890
 
 
   @tillrohrmann thanks for your further great suggestions!
   
   I totally agree with the above points, and the new exception category would 
make the semantics clearer. 
   
   To double-check that my understanding is correct:
   
   - The newly proposed `PartitionException` has three subtypes: 
`PartitionNotFoundException`, `PartitionCorruptionException` and 
`PartitionLookupException`. The job master would check for `PartitionException` 
to decide whether to restart the producer. In particular, for a 
`PartitionLookupException` the producer might need to be restarted on another 
machine.
   
   - Considering that the current `SpilledSubpartitionView` will be replaced by 
Stephan's new `BoundedBlockingSubpartition`, do you think it is necessary to 
throw `PartitionCorruptionException` in the current code, or should we focus on 
it after `BoundedBlockingSubpartition` is merged?
   
   - The producer could send a `PartitionException` if it fails to create the 
subpartition view. On the consumer side, if the exception is an instance of 
`PartitionNotFoundException`, it would keep the current logic of retrying the 
partition request. If it is an instance of `PartitionCorruptionException`, the 
consumer could fail directly.
   
   - I have already created a separate Jira issue to cover 
`PartitionLookupException`; a connection that is lost after being established 
would not be considered in this issue.
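
As a rough sketch of the hierarchy and the two reactions described in the points above: the class shapes, constructors, the `ConsumerAction` enum and the `PartitionFailureHandling` helper are illustrative assumptions, not the code in the PR or in Flink itself.

```java
import java.io.IOException;

// Hypothetical stand-ins for the proposed hierarchy; constructors are assumptions.
abstract class PartitionException extends IOException {
    PartitionException(String message, Throwable cause) {
        super(message, cause);
    }
}

// The partition is not (or not yet) registered at the producer.
class PartitionNotFoundException extends PartitionException {
    PartitionNotFoundException(String partitionId) {
        super("Partition " + partitionId + " not found.", null);
    }
}

// The partition data could not be read, e.g. the spilled file is damaged.
class PartitionCorruptionException extends PartitionException {
    PartitionCorruptionException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " is corrupted.", cause);
    }
}

// Assumed meaning: the producer could not be reached to look the partition up.
class PartitionLookupException extends PartitionException {
    PartitionLookupException(String partitionId, Throwable cause) {
        super("Lookup of partition " + partitionId + " failed.", cause);
    }
}

enum ConsumerAction { RETRY_REQUEST, FAIL }

final class PartitionFailureHandling {
    // Consumer side: only a missing partition keeps the existing retry logic.
    static ConsumerAction onProducerError(Throwable error) {
        return error instanceof PartitionNotFoundException
                ? ConsumerAction.RETRY_REQUEST
                : ConsumerAction.FAIL;
    }

    // Job master side: any PartitionException is a hint to restart the producer.
    static boolean shouldRestartProducer(Throwable failureCause) {
        return failureCause instanceof PartitionException;
    }
}
```

With this shape the job master only needs a single `instanceof PartitionException` check, while the consumer can still tell the retryable case apart from the others.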
   
   




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-08 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490529494
 
 
   Thanks for the further review, @tillrohrmann. Let me try to explain the two 
concerns above.
   
   - Whether to always transform `IOException` into `PartitionNotFoundException`? 
Actually I would like to do this wrapping in the constructor of 
`SpilledSubpartitionView`, which might throw an `IOException` while opening the 
disk file. Based on the current code, this is the only step that might throw an 
`IOException` while creating a `ResultSubpartitionView`. But considering that 
`SpillableSubpartition` and its reader view will soon be removed by Stephan's 
new `BoundedBlockingSubpartition`, I do the wrapping the way it is in the PR, 
which might also be suitable for the new `BoundedBlockingSubpartition`. Strictly 
speaking, it would be better to transform in the specific step instead of 
covering the whole process. If you have concerns about this, I could move it 
into the internal step even though that code would be removed soon. Or we do not 
focus on case `c` until `BoundedBlockingSubpartition` is merged.
   
   - Whether to check that the file exists on the producer side and throw 
`PartitionNotFoundException` there? I think the check can only be done on the 
producer side while opening the file. Maybe there are other options, but I have 
not thought of any yet. However, the producer does not have to throw 
`PartitionNotFoundException` after finding that the file does not exist. Another 
option is that the producer throws another special exception called 
`PartitionOpenException`, which would be sent to the consumer via 
`ErrorResponse` over the network. The consumer would fail directly and wrap it 
into a `PartitionNotFoundException` for the JM. That way we could avoid checking 
the partition state on the consumer side when receiving 
`PartitionNotFoundException`, but we would need another new exception 
definition. My current way in the PR is simple, and for a blocking partition the 
consumer could actually also fail directly after receiving 
`PartitionNotFoundException`, as I mentioned before.
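
To make the "transform in the specific step" variant from the first point concrete, here is a minimal sketch. `SpilledViewSketch` and this simplified `PartitionNotFoundException` constructor are hypothetical stand-ins, not the real `SpilledSubpartitionView` or the exception in Flink; only the file-open call is wrapped, and the original `IOException` is kept as the cause.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical, simplified exception; the real class has a different constructor.
class PartitionNotFoundException extends IOException {
    PartitionNotFoundException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " not found.", cause);
    }
}

// Hypothetical reader view over a spilled partition file.
final class SpilledViewSketch implements AutoCloseable {
    private final FileChannel channel;

    SpilledViewSketch(String partitionId, Path spillFile) throws PartitionNotFoundException {
        try {
            // Opening the file is the only step wrapped here; a missing or
            // unreadable file surfaces as an IOException from FileChannel.open.
            this.channel = FileChannel.open(spillFile, StandardOpenOption.READ);
        } catch (IOException e) {
            // Keep the root cause so it can be traced on the job master side.
            throw new PartitionNotFoundException(partitionId, e);
        }
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

Wrapping at this narrow point means an unrelated `IOException` elsewhere in view creation would not be reported as a missing partition.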
   
   I could create a Jira ticket for the connection issue later.
   
   I have not focused on the unit tests yet, because they depend on the process 
we want to implement. After you approve the current way, I will fix the 
existing tests and add new tests to cover the cases.




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-08 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490377541
 
 
   Yes, I think we are on the same page now.
   
   I would focus on `b, c` in this PR, and launch separate PRs for the other 
cases in the future. For `DataConnectionException` in particular, we might add a 
retry mechanism, if possible, when connecting to the server, and throw 
`PartitionNotFoundException` or `DataConnectionException` after the retries fail.
   
   I already updated the code for the following points:
   
   - Extend `PartitionNotFoundException` to carry an internal `Throwable`. This 
exception might be caused by some `IOException` while opening the blocking 
result partition file, so the internal throwable can be used for 
tracing/debugging the root problem.
   
   - Do not transform the received `PartitionNotFoundException` into 
`IOException` on the consumer side.
   
   - Catch the parent `IOException` instead of `PartitionNotFoundException` in 
`LocalInputChannel` or `PartitionRequestServerHandler` when requesting a 
partition. An `IOException` can only happen for a blocking result partition 
while opening the disk file, which might indicate that the file is 
corrupted/deleted. So the above cases `b, c` are unified in this process. But 
there might be concerns here: 
   
   1. For pipelined mode, the consumer might ask the JM for the partition state 
to retrigger the request after receiving `PartitionNotFoundException`. 
   2. For blocking mode, the consumer should fail directly because the 
partition state is already known to be `FINISHED`. 
   
   So I think we should adjust the logic of 
`triggerPartitionProducerStateCheck` when receiving 
`PartitionNotFoundException` on the consumer side. Otherwise this PR might 
introduce unnecessary retries if the exception is caused by a corrupted 
partition rather than a released one.
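
The adjustment suggested above could be sketched as follows. The `PartitionType` and `Reaction` enums and the `PartitionNotFoundHandling` helper are hypothetical names used only for illustration; they are not the existing `triggerPartitionProducerStateCheck` code.

```java
// Hypothetical, reduced view of the two partition modes discussed above.
enum PartitionType { PIPELINED, BLOCKING }

// What the consumer does after receiving a PartitionNotFoundException.
enum Reaction { CHECK_PRODUCER_STATE, FAIL_CONSUMER }

final class PartitionNotFoundHandling {
    static Reaction onPartitionNotFound(PartitionType type) {
        if (type == PartitionType.BLOCKING) {
            // The producer is already FINISHED, so the partition will not show
            // up later; asking the JM and retrying would only delay the failure.
            return Reaction.FAIL_CONSUMER;
        }
        // In pipelined mode the producer may simply not be ready yet, so the
        // consumer asks the JM for the producer state and may retrigger the request.
        return Reaction.CHECK_PRODUCER_STATE;
    }
}
```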
   
   After you confirm this way is correct, I will add new unit tests to cover 
these cases. :)
   




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518
 
 
   Yes, I think we are on the same page now.
   
   I would focus on `b, c` in this PR, and launch separate PRs for the other 
cases in the future. For `DataConnectionException` in particular, we might add a 
retry mechanism, if possible, when connecting to the server, and throw 
`PartitionNotFoundException` or `DataConnectionException` after the retries fail.
   
   I already updated the code in two aspects:
   
   - Do not transform the received `PartitionNotFoundException` on the consumer 
side.
   
   - Send the `PartitionNotFoundException` on the producer side in the process 
of creating the reader view. If the `ResultPartition` has not been removed from 
`ResultPartitionManager`, only `SpillableSubpartition#createView` might throw 
an `IOException`, which could indicate that the required data file could not be 
opened correctly. So I wrap it into `PartitionNotFoundException` in the upper 
layer, which seems unified for all the `ResultSubpartition` instances besides 
the new `BoundedBlockingSubpartition` Stephan proposed.
   
   After you confirm this way is correct, I will add new unit tests to cover 
these cases. :) 




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490011255
 
 
   Yes, the blacklist could solve the hardware problems in the future.
   
   As for d, you are right that the `IOException` might be caused by multiple 
factors, so we will not cover it in this PR.
   
   As for f, if I understand correctly, the scenario is similar to case `a` 
I mentioned above. 
   If the TM is unreachable, the first phase of establishing the connection 
would fail with `ConnectionTimeoutException` or `ConnectionRefusedException` 
etc. I proposed `DataConnectionException` rather than `PartitionNotFound` in 
`a`, because it might be caused by a temporary network problem. Retrying the 
connection later might succeed. 
   
   In other words, from the `ConnectionTimeoutException` or 
`ConnectionRefusedException` received by the consumer, it cannot tell whether 
the producer TM is really lost or there is a temporary network issue. 
   
   After the connection is established, the second phase is to send the logical 
`PartitionRequest` message. If at this time the producer TM is unreachable and 
causes any network exception, we still cannot distinguish whether it is caused 
by the TM being really lost or by a temporary network issue, unless the 
consumer receives a specific `PartitionNotFoundException` via the network, 
which is the case for b, c.
   
   So we might only focus on b, c currently?




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-06 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489901309
 
 
   @tillrohrmann thanks for the further suggestions! I agree with your overall 
ideas.
   
   ->What kind of environment/hardware issues do you have in mind that could 
cause repeated failures of reading the result data? 
   
   I mean disk/network hardware problems on the producer side which cannot be 
recovered in a short time. So it is better to restart the producer on another 
machine. We once encountered this corner case in production.
   
   -> Did I understand you correctly, that the PartitionNotFoundException is 
good enough and, thus, we don't need to introduce a new exception?
   
   The information in the current `PartitionNotFoundException` is enough for 
the `JobMaster` to restart the producer, but it cannot cover all the cases. So I 
would like to list all the possible cases to confirm with you first:
   
   - a. TCP connection failure: it might be caused by a lost producer TM or a 
network hardware issue. We might introduce `DataConnectionException` for this.
   
   - b. PartitionNotFound: the `ResultPartition` has been released from 
`ResultPartitionManager`, which requires restarting the producer immediately.
   
   - c. `ResultSubpartition#createReaderView` throws `IOException`: it might be 
caused by a corrupted/deleted disk file for `BlockingResultSubpartition`. It 
could also be wrapped into the existing `PartitionNotFound`.
   
   - d. `BlockingResultSubpartitionView#getNextBuffer` throws `IOException`: the 
reason is the same as in c above. `PartitionNotFound` might also be used here.
   
   - e. Network server exception while transferring data: this seems more 
complicated. It might be caused by a lost producer TM, a temporary network 
problem, a server hardware/environment issue, etc. The consumer, as the client, 
might become aware of it via an inactive network channel or an `ErrorResponse` 
from the server. We could introduce `DataTransferException` to cover all of these.
   
   For the above {b, c, d} it can be decided to restart the producer 
immediately, and the current `PartitionNotFound` could be used to cover them.
   
   For cases a and e, we might introduce new exceptions to cover them, and the 
failover strategy might have different rules for handling them.
   
   So I think there are two options to consider: 
   
   - If we want to cover all of {a, b, c, d, e} ATM, it might be necessary to 
define an abstract `DataConsumptionException` as the parent of the above 
`PartitionNotFoundException`, `DataConnectionException` and 
`DataTransferException`. 
   
   - Or we only deal with {b, c, d} in the first step (a and e might be done in 
the future or in other ways); then the current `PartitionNotFound` is enough and 
no new exceptions are needed ATM. 
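
For the first option, the parent type and its three children might look roughly like this. All constructors and the `FailoverHint` helper are assumptions made for illustration; in particular, how cases a and e would be treated by the failover strategy is deliberately left open, since that rule is not decided here.

```java
import java.io.IOException;

// Hypothetical abstract parent covering all of {a, b, c, d, e}.
abstract class DataConsumptionException extends IOException {
    DataConsumptionException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Case a: the TCP connection to the producer could not be established.
class DataConnectionException extends DataConsumptionException {
    DataConnectionException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Cases b, c, d: the partition was released or its data cannot be read.
class PartitionNotFoundException extends DataConsumptionException {
    PartitionNotFoundException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Case e: the transfer failed after the partition request succeeded.
class DataTransferException extends DataConsumptionException {
    DataTransferException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class FailoverHint {
    // Any failure while consuming upstream data, as opposed to a task-local error.
    static boolean isDataConsumptionFailure(Throwable cause) {
        return cause instanceof DataConsumptionException;
    }

    // Only {b, c, d} are known to require an immediate producer restart;
    // a and e would be handled by separate rules that are not defined here.
    static boolean restartProducerImmediately(Throwable cause) {
        return cause instanceof PartitionNotFoundException;
    }
}
```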
   
   Both options make sense to me, so I would like to hear your final opinion, 
or whether you have other options. :)




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-04 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489389513
 
 
   Thanks for your professional reviews and valuable suggestions @tillrohrmann .
   
I think there are mainly two issues to be confirmed:
   1. Do we need a new special `DataConsumptionException`, or why not use the 
existing `PartitionNotFoundException`?
   Semantically/conceptually, `PartitionNotFoundException` is actually a 
subtype of `DataConsumptionException`. Functionally they currently provide the 
same information, so I agree to maintain only one exception type for 
simplicity. That means using `PartitionNotFoundException` directly or renaming 
it to cover more semantics.
   
   2. In which cases should we report this `PartitionNotFoundException` so that 
the JobMaster restarts the producer?
   
   The cases you mentioned above are very helpful guidance, and I will try to 
specify each of them further:
   
   -> TaskExecutor no longer reachable
   - TE unavailable while requesting the subpartition: an executor that is shut 
down/killed would cause an exception when establishing the network connection 
or requesting the subpartition, which is covered by 
`RemoteInputChannel#requestSubpartition` in the PR.
   
   - TE unavailable while transferring partition data: an executor that is shut 
down/killed would make the network channel inactive, which is also covered by 
`RemoteInputChannel#onError` in the PR.
   
   -> TaskExecutor reachable but result partition not available
   It is covered by `RemoteInputChannel#failPartitionRequest` in the PR.
   
   -> TaskExecutor reachable, result partition available but sub partition view 
cannot be created (spilled file has been removed)
   The producer would respond with an `ErrorResponse` to the subpartition 
request, and the consumer would then call `RemoteInputChannel#onError` to cover 
this case. Because this `ErrorResponse` is a fatal error, it would make all the 
remote input channels report `DataConsumptionException`. So maybe it is better 
to wrap the `DataConsumptionException` on the producer side in this case, to 
avoid affecting all the remote input channels.
   
   Besides the above three cases, there is another cause: partition data 
corrupted/deleted during transfer, which is covered by 
`PartitionRequestQueue#exceptionCaught` in the PR ATM. The consumer would then 
receive an `ErrorResponse` and call `RemoteInputChannel#onError`. 
   
   In this case I once considered wrapping the `DataConsumptionException` 
directly on the producer side in the specific 
`SpilledSubpartitionView#getNextBuffer`, to avoid reusing the general 
`ErrorResponse`, because you might think that some internal network exceptions 
sent via `ErrorResponse` do not need to restart the producer side. My previous 
thought was that for some network exceptions, including environment/hardware 
issues, it might also be suitable to restart the producer on other executors; 
otherwise the consumer might still encounter the same problem after restarting 
and consuming data from the previous executor. So the `ErrorResponse` with the 
fatal property is easy to reuse to represent the partition problem here. Or we 
could regard network environment/hardware problems as a separate issue to be 
solved, and wrap the special `DataConsumptionException` in the specific views 
to distinguish it from the existing `ErrorResponse`.
   
   




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-04-23 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-485687952
 
 
   @flinkbot attention @tillrohrmann 




[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-04-23 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-485687507
 
 
   Some thoughts on the implementation:
   
   1. The `DataConsumptionException` should be general for both streaming and 
batch jobs, although ATM we always restart the whole job in pipelined mode 
regardless of this `DataConsumptionException`. 
   
   2. We decided to wrap this exception on the receiver side, so the sender is 
not aware of it. The advantage is that it can cover all the cases and reflects 
the semantics from the consumer's point of view. This exception is involved not 
only in `ResultSubpartitionView#getNextBuffer` on the sender side, but also in 
network connection, partition request and data transport.
   
   3. This exception is just a hint for the `JobMaster` to decide whether to 
restart the upstream executions. In the future the `JobMaster` could also query 
the partition state from the `ShuffleMaster`, and then consider both factors to 
make the final decision.
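
Point 2 can be illustrated with a small receiver-side wrapper. This is only a sketch under assumptions: the `ReceiverSideWrapping` helper, the `String` partition id and this `DataConsumptionException` constructor are hypothetical, and the real input channels are structured differently.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical exception carrying the id of the partition that could not be consumed.
class DataConsumptionException extends IOException {
    private final String partitionId;

    DataConsumptionException(String partitionId, Throwable cause) {
        super("Failed to consume partition " + partitionId + ".", cause);
        this.partitionId = partitionId;
    }

    String getPartitionId() {
        return partitionId;
    }
}

final class ReceiverSideWrapping {
    // Runs one consumer-side step (connect, request partition, or read data)
    // and reports any failure as a DataConsumptionException for that partition.
    static <T> T consume(String partitionId, Callable<T> step) throws DataConsumptionException {
        try {
            return step.call();
        } catch (Exception e) {
            // The sender never sees this type; it is created on the receiver only.
            throw new DataConsumptionException(partitionId, e);
        }
    }
}
```

Because every consumer-side step goes through the same wrapper, the `JobMaster` receives one exception type with the partition id attached, whichever phase failed.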

