[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723704072 @tillrohrmann Sorry for breaking master, and thanks for the quick fix. I should have rerun the CI tests after rebasing on master, even though there were no conflicts... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723590220 Merged to master. 80c040fcf3118a52053ffcd00ca4e063e6f52c97
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723551306 @flinkbot run azure
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723516469 @flinkbot run azure
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723460867 @StephanEwen I think the complexity mostly comes from consecutive `resetToCheckpoint()` calls. Imagine the following sequence:

1. The `RecreateOnResetOperatorCoordinator` has an initial internal coordinator, `coordinator_1`.
2. `resetToCheckpoint()` is invoked, and a new thread, `ClosingThread_1`, is created to:
   a. close `coordinator_1`;
   b. create a new internal coordinator, `coordinator_2`, and reset its state;
   c. process the pending calls on `coordinator_2` to catch up.
3. Before `ClosingThread_1` finishes its work, `resetToCheckpoint()` is invoked again, so `ClosingThread_2` is created to:
   a. close `coordinator_2`;
   b. create a new internal coordinator, `coordinator_3`, and reset its state;
   c. process the pending calls on `coordinator_3` to catch up.

In this case, `coordinator_2` is touched by both `ClosingThread_1` and `ClosingThread_2`, and the two threads might race on the instance variable `coordinator`. So the closing threads need some synchronization among themselves. The epoch was introduced for this purpose, but I agree it is a little confusing.

I think option (b) is a good suggestion. I extended it a bit further:

1. Created a `DeferrableCoordinator`, which contains both the quiesceable context and the internal coordinator.
2. Unified the coordinator handling so that the instance variable `coordinator` is also a `DeferrableCoordinator`.

With this change, the threading model becomes:

1. The scheduler thread is the only thread that writes to the instance variable `coordinator`, and it interacts only with that instance variable.
2. The old internal coordinator instances are completely isolated from the current instance, so the closing thread just needs to close them quickly.

I have updated the patch with the above changes. The updated patch also adds the implementation of `notifyCheckpointComplete()`. Please let me know if you have any concerns. Thanks.
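The threading model described in this comment can be illustrated with a minimal Java sketch. Everything in it is a simplified assumption, not Flink's actual code: `InternalCoordinator` is a hypothetical three-method interface, the quiesceable context is omitted, and the pending-call queue and `ready` future are illustrative choices. The sketch shows the two properties claimed in the comment. First, only the scheduler thread reads or writes the `coordinator` field. Second, each asynchronous close touches only the old generation it captured and the new one it creates. Chaining on the old generation's `ready` future is one possible way (an assumption here) to keep back-to-back `resetToCheckpoint()` calls from racing on `coordinator_2`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the wrapped (internal) operator coordinator.
interface InternalCoordinator {
    void handleEvent(String event);
    void resetTo(long checkpointId);
    void close();
}

// One generation of the internal coordinator. Calls that arrive before the
// internal coordinator exists are deferred and replayed once it is created.
final class DeferrableCoordinator {
    final CompletableFuture<Void> ready = new CompletableFuture<>();
    private final List<Consumer<InternalCoordinator>> pending = new ArrayList<>();
    private InternalCoordinator internal; // guarded by 'this'

    synchronized void apply(Consumer<InternalCoordinator> call) {
        if (internal != null) {
            call.accept(internal);
        } else {
            pending.add(call); // not created yet: defer
        }
    }

    // Runs on a closing thread: install the new coordinator and catch up.
    synchronized void create(InternalCoordinator created) {
        internal = created;
        pending.forEach(call -> call.accept(created));
        pending.clear();
        ready.complete(null);
    }

    // Only invoked after 'ready' has completed, so 'internal' is set.
    synchronized void closeNow() {
        internal.close();
    }
}

final class RecreateOnResetCoordinator {
    private final Supplier<InternalCoordinator> factory;
    private final ExecutorService closingExecutor = Executors.newCachedThreadPool();
    // Read and written only by the scheduler thread.
    private DeferrableCoordinator coordinator = new DeferrableCoordinator();

    RecreateOnResetCoordinator(Supplier<InternalCoordinator> factory) {
        this.factory = factory;
        coordinator.create(factory.get());
    }

    void handleEvent(String event) {
        coordinator.apply(c -> c.handleEvent(event));
    }

    // Scheduler thread: swap in a new generation, then close the old one and
    // create the new one asynchronously. The async work captures 'old' and
    // 'next' only; it never reads or writes the 'coordinator' field.
    CompletableFuture<Void> resetToCheckpoint(long checkpointId) {
        DeferrableCoordinator old = coordinator;
        DeferrableCoordinator next = new DeferrableCoordinator();
        next.apply(c -> c.resetTo(checkpointId)); // first deferred call
        coordinator = next;
        // Waiting on 'old.ready' orders back-to-back resets: a generation is
        // closed only after it has been created.
        return old.ready
                .thenRunAsync(old::closeNow, closingExecutor)
                .thenRunAsync(() -> next.create(factory.get()), closingExecutor);
    }

    void shutdown() {
        closingExecutor.shutdown();
    }
}
```

Because every reset captures `old` and `next` as local variables, this sketch needs no epoch counter. The ordering between consecutive resets comes from the future chain instead.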
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-723347149 @StephanEwen Thanks for the review and suggestions. I agree the current logic is a little complicated. I slightly favor approach (b) because it is more self-contained. I'll update the patch today.
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-717837286 @StephanEwen I am trying to address the blocking `close()` method issue, but it is a little more complicated than I thought. I'd like to align with you on a design principle and see what the solution might be.

From what I understand, the current design principle for the `SplitEnumerator` is that all exception handling is synchronous. That is, the implementation should throw an exception when a method is invoked, and if no exception is thrown, the method invocation is considered successful. So if we want to allow asynchronous handling and exception propagation in the `SplitEnumerator`, such as an asynchronous `close()`, we will need a `failJob()` method in the `SplitEnumeratorContext`. Users could then call `failJob()` asynchronously instead of throwing exceptions when closing in a non-blocking manner. The only caveat of this solution is that users can then either throw an exception or call `failJob()` when a method is invoked, and people may wonder what the difference is.

To avoid this caveat, I considered adding the async close to the `SourceCoordinator`, so that we don't have to add a `failJob()` method to the `SplitEnumerator`. However, in practice the previous `SplitEnumerator` instance sometimes must be closed successfully before the next instance can be created; otherwise there might be conflicts. Therefore, naively making closing non-blocking in the `SourceCoordinator` won't work in all cases.

Given all of the above, I am falling back to the solution of adding a `failJob()` method to the `SplitEnumeratorContext`. The `SplitEnumerator` implementations can then decide for themselves what to do in each method, and any exception thrown from a method invocation simply results in a job failure. Do you have any suggestions?
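The following minimal Java sketch shows the proposed split between synchronous and asynchronous failure reporting. It assumes a trimmed-down context: `EnumeratorContext`, `SlowResource`, and `AsyncClosingEnumerator` are hypothetical names, not Flink's `SplitEnumeratorContext` or `SplitEnumerator` APIs. Here `close()` returns immediately. A failure that occurs later on the background thread can no longer be thrown to the caller, so it is reported through `failJob()` instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, trimmed-down context exposing only the proposed method.
interface EnumeratorContext {
    void failJob(Throwable cause);
}

// Hypothetical resource that may be slow to release, or fail while releasing.
interface SlowResource {
    void release() throws Exception;
}

final class AsyncClosingEnumerator implements AutoCloseable {
    private final EnumeratorContext context;
    private final SlowResource resource;
    private final ExecutorService closer = Executors.newSingleThreadExecutor();

    AsyncClosingEnumerator(EnumeratorContext context, SlowResource resource) {
        this.context = context;
        this.resource = resource;
    }

    // Returns without waiting for the release. An error that happens later can
    // no longer be thrown to the caller, so it is reported via failJob().
    @Override
    public void close() {
        closer.execute(() -> {
            try {
                resource.release();
            } catch (Exception e) {
                context.failJob(e);
            }
        });
        closer.shutdown(); // the worker thread exits once the release task is done
    }

    // Demo helper: wait for the background release to finish.
    boolean awaitClosed(long timeoutMillis) {
        try {
            return closer.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The caveat raised in the comment is visible here. A synchronous method of the same class could still simply throw, so an implementer has two ways to signal failure and must pick the one that matches when the error occurs.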
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-717266246 Hi @StephanEwen, thanks for the comments. Re: the Scala versioning in the test-jar of `flink-connector-base`: good catch! Having a separate module sounds like a good approach. As for the `close()` method in `SplitEnumerator`, I personally prefer the generic `close()` helper, as it provides a stronger guarantee that the component is actually closed. I'll update the patch to add a `closeTimeoutHandler` to the `ComponentClosingUtils`, which can be used to fail the job after a timeout.
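The comment does not show the helper's signature. The following is therefore a hypothetical Java 9+ sketch of a generic asynchronous close with a timeout handler. `ClosingUtilsSketch.closeAsyncWithTimeout` and its parameters are illustrative and not the actual `ComponentClosingUtils` API. The closing sequence runs on its own daemon thread. If it has not finished within the timeout, `closeTimeoutHandler` receives a `TimeoutException`, which a caller could use to fail the job.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

final class ClosingUtilsSketch {
    private ClosingUtilsSketch() {}

    // Runs 'closingSequence' on a dedicated daemon thread. The returned future
    // completes when closing finishes, or exceptionally on failure or timeout.
    // On timeout, 'closeTimeoutHandler' is also invoked (e.g. to fail the job).
    static CompletableFuture<Void> closeAsyncWithTimeout(
            String componentName,
            Runnable closingSequence,
            Duration timeout,
            Consumer<Throwable> closeTimeoutHandler) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        Thread closingThread = new Thread(() -> {
            try {
                closingSequence.run();
                done.complete(null); // no-op if the timeout already fired
            } catch (Throwable t) {
                done.completeExceptionally(t);
            }
        }, componentName + "-closing-thread");
        closingThread.setDaemon(true); // a stuck close() must not keep the JVM alive
        closingThread.start();
        return done.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .whenComplete((ignored, error) -> {
                    if (error instanceof TimeoutException) {
                        closeTimeoutHandler.accept(new TimeoutException(
                                "Failed to close " + componentName + " within " + timeout));
                    }
                });
    }
}
```

A closing sequence that finishes after the timeout has no further effect on the returned future, because the future has already been completed exceptionally by `orTimeout`.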
[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.
becketqin commented on pull request #13574: URL: https://github.com/apache/flink/pull/13574#issuecomment-706297553 @StephanEwen @pnowojski @aljoscha Would you have time to take a look at the Kafka Source patch? Thanks!