[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-08 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723704072


   @tillrohrmann Sorry for breaking master, and thanks for the quick fix. I should have run the CI tests again after rebasing on master, even though there were no conflicts.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-08 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723590220


   Merged to master.
   80c040fcf3118a52053ffcd00ca4e063e6f52c97







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-08 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723551306


   @flinkbot run azure







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723516469


   @flinkbot run azure







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-07 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723460867


   @StephanEwen I think the complexity mostly comes from the consecutive 
`resetToCheckpoint()` calls.
   
   Imagine the following sequence.
   
   1. The RecreateOnResetOperatorCoordinator has an initial internal coordinator `coordinator_1`.
   2. `resetToCheckpoint()` is invoked, and a new thread `ClosingThread_1` is created to:
      a. Close `coordinator_1`.
      b. Create a new internal coordinator `coordinator_2` and reset its state.
      c. Process the pending calls on `coordinator_2` to catch up.
   3. Before `ClosingThread_1` finishes its work, `resetToCheckpoint()` is invoked again, so `ClosingThread_2` is created to:
      a. Close `coordinator_2`.
      b. Create a new internal coordinator `coordinator_3` and reset its state.
      c. Process the pending calls on `coordinator_3` to catch up.
   
   In this case, `coordinator_2` is touched by both `ClosingThread_1` and `ClosingThread_2`, and the two threads may race on the instance variable `coordinator`. So the closing threads need to synchronize with each other. The epoch was introduced for this purpose, but I agree it is a little confusing.
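The race, and how an epoch counter can guard against it, can be illustrated with a deliberately simplified sketch. `EpochGuardedHolder`, `beginReset()` and `tryInstall()` are hypothetical names, and coordinators are modeled as plain strings; this is not the actual `RecreateOnResetOperatorCoordinator` code.

```java
// Hypothetical, simplified model of the epoch idea: every resetToCheckpoint() call starts a
// new epoch, and a closing thread may only install its replacement coordinator if no newer
// reset has started since that thread was spawned.
final class EpochGuardedHolder {
    private long epoch;                           // guarded by this
    private String coordinator = "coordinator_1"; // guarded by this

    // Called when resetToCheckpoint() spawns a closing thread; returns that thread's epoch.
    synchronized long beginReset() {
        return ++epoch;
    }

    // Called by a closing thread once its replacement coordinator is ready.
    // Returns false (and installs nothing) if a newer reset has superseded this thread.
    synchronized boolean tryInstall(long threadEpoch, String newCoordinator) {
        if (threadEpoch != epoch) {
            return false;
        }
        coordinator = newCoordinator;
        return true;
    }

    synchronized String current() {
        return coordinator;
    }
}
```

Replaying the sequence above: `ClosingThread_1` gets epoch 1 and `ClosingThread_2` gets epoch 2 before thread 1 finishes, so `tryInstall(1, "coordinator_2")` is rejected and only `coordinator_3` becomes current. In this sketch, a rejected thread remains responsible for closing the coordinator it created.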
   
   I think option (b) is a good suggestion. I extended it a bit further:
   1. Created a `DeferrableCoordinator` that contains both the quiesceable context and the internal coordinator.
   2. Unified the coordinator handling so that the instance variable `coordinator` is also a `DeferrableCoordinator`.
   
   With this change, the threading model becomes:
   1. The scheduler thread is the only thread that writes to the instance variable `coordinator`, and it only interacts with that instance variable.
   2. The old internal coordinator instances are completely isolated from the current instance, so the closing thread just needs to close them quickly.
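A minimal sketch of that threading model, assuming a single-threaded executor stands in for the closing thread. `InternalCoordinator`, `RecreateOnResetSketch`, and the `DeferrableCoordinator` below are hypothetical stand-ins, not the classes from the patch; restoring checkpoint state and replaying pending calls on the new coordinator are omitted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical internal coordinator; only tracks whether it has been closed.
final class InternalCoordinator {
    final int id;
    volatile boolean closed;

    InternalCoordinator(int id) {
        this.id = id;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical wrapper: one instance per reset. It owns its internal coordinator and is
// never shared with later instances, so closing it needs no synchronization with them.
final class DeferrableCoordinator {
    final InternalCoordinator internal;

    DeferrableCoordinator(InternalCoordinator internal) {
        this.internal = internal;
    }

    void closeAsync(ExecutorService closingExecutor) {
        closingExecutor.execute(internal::close);
    }
}

final class RecreateOnResetSketch {
    private final ExecutorService closingExecutor = Executors.newSingleThreadExecutor();
    private int nextId = 1;
    // Written only by the scheduler thread, i.e. the thread calling resetToCheckpoint().
    private DeferrableCoordinator coordinator =
            new DeferrableCoordinator(new InternalCoordinator(nextId++));

    void resetToCheckpoint() {
        DeferrableCoordinator old = coordinator;
        coordinator = new DeferrableCoordinator(new InternalCoordinator(nextId++));
        // The old instance is isolated, so the closing thread only has to close it.
        old.closeAsync(closingExecutor);
    }

    DeferrableCoordinator current() {
        return coordinator;
    }

    // Closes the current coordinator and waits for all pending closes; true if they finished.
    boolean shutdown() {
        coordinator.closeAsync(closingExecutor);
        closingExecutor.shutdown();
        try {
            return closingExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the closing thread never reads or writes `coordinator`, consecutive resets only hand it independent instances, which avoids the cross-thread race described earlier.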
   
   I have updated the patch with the above changes. The updated patch also adds the implementation of `notifyCheckpointComplete()`. Please let me know if you have any concerns. Thanks.







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-11-06 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-723347149


   @StephanEwen Thanks for the review and suggestions. I agree the current logic is a little complicated. I slightly favor approach (b) because it is more self-contained. I'll update the patch today.







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-10-28 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-717837286


   @StephanEwen I am trying to address the blocking close() method issue. However, it is a little more complicated than I thought. I'd like to align with you on some design principles and see what the solution might be.
   
   From what I understand, the current design principle for the `SplitEnumerator` is that all exception handling is synchronous, i.e. the implementation should throw an exception when a method invocation fails. If no exception is thrown, the method invocation is considered successful.
   
   Given that, if we want to allow asynchronous handling and exception propagation in the `SplitEnumerator`, such as an asynchronous close(), we will need a `failJob()` method in the `SplitEnumeratorContext`, so users can call `failJob()` asynchronously instead of throwing exceptions when closing in a non-blocking manner. The only caveat of this solution is that users can then either throw an exception or call `failJob()` when a method is invoked, and people may wonder what the difference is.
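A minimal sketch of how an enumerator might use such a hook for a non-blocking close, assuming a context that exposes only `failJob(Throwable)`. `EnumeratorContextSketch` and `AsyncClosingEnumerator` are hypothetical and much smaller than the real `SplitEnumeratorContext` and `SplitEnumerator` interfaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, trimmed-down context exposing only the proposed failJob() hook.
interface EnumeratorContextSketch {
    void failJob(Throwable cause);
}

// Hypothetical enumerator whose close() does not block the caller: the actual cleanup runs
// asynchronously, and a failure during cleanup is reported via failJob() instead of thrown.
final class AsyncClosingEnumerator {
    private final EnumeratorContextSketch context;
    private final Runnable releaseResources;

    AsyncClosingEnumerator(EnumeratorContextSketch context, Runnable releaseResources) {
        this.context = context;
        this.releaseResources = releaseResources;
    }

    CompletableFuture<Void> close() {
        return CompletableFuture.runAsync(releaseResources)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        // runAsync wraps the cleanup's exception; report the original cause.
                        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                                ? error.getCause()
                                : error;
                        context.failJob(cause);
                    }
                });
    }
}
```

This also shows the caveat: the same failure could equally have been thrown synchronously from `close()`, so implementers have two ways to signal it.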
   
   To avoid the above caveat, I was thinking that we could add the async close to the `SourceCoordinator` so we don't have to add a `failJob()` method to the `SplitEnumerator`. However, in practice, the previous instance of `SplitEnumerator` sometimes must be successfully closed before the next instance can be created; otherwise there might be conflicts. Therefore, naively making closing non-blocking in the `SourceCoordinator` won't work in all cases.
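The conflict can be made concrete with a small sketch, assuming a hypothetical `ExclusiveResource` that only one enumerator instance may hold at a time (for example an exclusive external lock). Creating the next instance immediately fails while the old one still holds the resource; a non-blocking close would have to sequence creation after the close completes, as the hypothetical `createAfterClose()` helper does here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class OrderedReplacement {
    // Hypothetical resource that only one enumerator instance may hold at a time.
    static final class ExclusiveResource {
        private final AtomicBoolean held = new AtomicBoolean();

        void acquire() {
            if (!held.compareAndSet(false, true)) {
                throw new IllegalStateException("still held by the previous instance");
            }
        }

        void release() {
            held.set(false);
        }
    }

    // Non-blocking for the caller, but the factory only runs after the previous instance's
    // close future has completed, so the two instances never hold the resource together.
    static <T> CompletableFuture<T> createAfterClose(
            CompletableFuture<Void> previousClosed, Supplier<T> factory) {
        return previousClosed.thenApply(ignored -> factory.get());
    }
}
```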
   
   Given all the above, I am falling back to the solution of adding a `failJob()` method to the `SplitEnumeratorContext`, so that `SplitEnumerator` implementations can decide by themselves what to do in each method. Any exception thrown from a method invocation will simply result in a job failure.
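On the coordinator side, the rule that any exception thrown from a method invocation fails the job could look roughly like this sketch. `EnumeratorCallGuard` is hypothetical, and `failJob` is modeled as a plain `Consumer<Throwable>` rather than the real `SplitEnumeratorContext` method.

```java
import java.util.function.Consumer;

// Hypothetical coordinator-side guard: each call into the enumerator is wrapped, and any
// exception it throws is routed to the same job-failure path that failJob() would use.
final class EnumeratorCallGuard {
    private final Consumer<Throwable> failJob;

    EnumeratorCallGuard(Consumer<Throwable> failJob) {
        this.failJob = failJob;
    }

    // Returns true if the enumerator call completed without throwing.
    boolean invoke(Runnable enumeratorCall) {
        try {
            enumeratorCall.run();
            return true;
        } catch (RuntimeException e) {
            failJob.accept(e);
            return false;
        }
    }
}
```

With both paths ending in the same job failure, an implementation can throw for synchronous errors and call `failJob()` for asynchronous ones.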
   
   Do you have any suggestion?







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-10-27 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-717266246


   Hi @StephanEwen, thanks for the comments.
   
   Re: the Scala versioning in the test-jar of `flink-connector-base`.
   Good catch! Having a separate module sounds like a good approach.
   
   Regarding the close() method in `SplitEnumerator`, I personally prefer the generic `close()` helper, as it provides a stronger guarantee on component closure. I'll update the patch to add a `closeTimeoutHandler` to the `ComponentClosingUtils`. It can be used to fail the job after a timeout.
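A timeout-aware close could be sketched with `CompletableFuture.orTimeout()` (Java 9+). The class name, method name and parameters below are hypothetical and only illustrate where a `closeTimeoutHandler` would plug in; they are not the actual `ComponentClosingUtils` signatures.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical helper illustrating the closeTimeoutHandler idea, not Flink's actual API.
final class ClosingWithTimeout {
    static CompletableFuture<Void> closeAsyncWithTimeout(
            String componentName,
            Runnable closingSequence,
            Duration timeout,
            Consumer<Throwable> closeTimeoutHandler) {
        return CompletableFuture.runAsync(closingSequence)
                // Completes the future exceptionally with a TimeoutException if the closing
                // sequence has not finished in time (the sequence itself keeps running).
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .whenComplete((ignored, error) -> {
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    if (cause instanceof TimeoutException) {
                        // e.g. fail the job when the component cannot be closed in time
                        closeTimeoutHandler.accept(new TimeoutException(
                                componentName + " did not close within " + timeout));
                    }
                });
    }
}
```

Note that `orTimeout()` does not interrupt the closing sequence; it only stops the caller from waiting on it indefinitely.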
   







[GitHub] [flink] becketqin commented on pull request #13574: [FLINK-18323] Add a Kafka source implementation based on FLIP-27.

2020-10-09 Thread GitBox


becketqin commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-706297553


   @StephanEwen @pnowojski @aljoscha Would you have time to take a look at the 
Kafka Source patch? Thanks!


