[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-630059574 The PR was reverted because of conflicts with [FLINK-17659] (Rework WatermarkStrategy, add Suppliers for TimestampAssigner/WatermarkGenerator), which was committed last night. I have filed a new PR: https://github.com/apache/flink/pull/12219
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-630025655 It is because of the commit unifying the WatermarkStrategy that was merged last night...
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629949773 BTW, I have updated the Javadocs and pushed.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629925937 Thank you both so much! This is my very first feature and it means a lot to me. I am especially impressed by the level of code integrity @AHeise insists on. The code quality is significantly improved. I really appreciate it, and thank you again!
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629777555 Javadocs for the public API have been updated as well.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629650067

Hey @AHeise, thank you so much again! I believe I've addressed most of the minor issues mentioned in the previous reviews. Please remind me if I missed anything, except for the one about "extracting this inside of the for loop": I've left you an offline message explaining why I think it is better not to do that right now. FLINK-17307 changes the KafkaFetcher to emit output in batches (one collector per partition), but mine emits once per record (emitting records and watermarks in different ways), so the two have logical differences. It might be worth changing to batch emit as well, perhaps with two collectors, one for watermarks and one for records. I am not saying the change is difficult, but I am learning to be conservative right now, and I have already listed it as a follow-up: https://issues.apache.org/jira/browse/FLINK-15670. What do you think?

I am going to work on the Javadocs for the public API now. If you have any comments on the code, please let me know.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629595539 Resolved 3 (the last one): removed getTypeSerializer() from TypeInformationSerializationSchema.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629578740

Resolved 2 (avoid duplicated code):

1. `KafkaShuffleFetcher` now extends `KafkaFetcher` instead of `AbstractFetcher`.
2. I also did a little cleanup of the `KafkaFetcher` code. I know it is not quite relevant, but I guess you will consider it necessary as well.
3. The current "end of stream" logic in `KafkaFetcher` is a bit strange: if any partition has a record signaled as "END_OF_STREAM", the fetcher stops running. Notice that the signal comes from the deserializer, i.e. from the Kafka data itself, yet other topics and partitions may still have data to read; finishing reading Partition0 does not guarantee that Partition1 has also finished. It is possible that I misunderstand what an "END_OF_STREAM" signal means (see the sketch below).
4. All Kafka-related tests passed after the change.
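To illustrate the concern in point 3, here is a minimal sketch (the class and names are hypothetical, not Flink's actual code) of per-partition end-of-stream tracking, where the fetcher would stop only once every assigned partition has signaled the end:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: track END_OF_STREAM per partition instead of stopping
// the whole fetcher on the first END_OF_STREAM record from any partition.
class EndOfStreamTracker {
    private final Set<String> assignedPartitions;
    private final Set<String> finishedPartitions = new HashSet<>();

    EndOfStreamTracker(Set<String> assignedPartitions) {
        this.assignedPartitions = assignedPartitions;
    }

    /** Marks one topic-partition as finished; returns true only when all are finished. */
    boolean onEndOfStream(String topicPartition) {
        finishedPartitions.add(topicPartition);
        return finishedPartitions.containsAll(assignedPartitions);
    }
}
```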
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629364820 Resolved 1: Ser/De format change.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629305711

As discussed offline, three things remain to do:

1. Ser/De format change
2. Avoid duplicated code
3. Use TypeSerializer instead of TypeInformationSerializationSchema

Priority-wise, I think 1 > 2 > 3.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629178179

> Test code already looks good (expecting the watermark tests to be implemented).
>
> Production code needs polishing:
>
> * API Javadoc should really help non-experts.
> * Serializers should probably make more use of existing abstractions like `KafkaDeserializationSchema` and not change/exhibit `TypeInformationSerializationSchema`

This is difficult, because what `KafkaDeserializationSchema` provides is `T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;`, while what I need is a `TypeSerializer` that can deserialize type by type. If I ask for a `KafkaDeserializationSchema`, that is basically asking the user to write the deserializer themselves.
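For illustration, element-wise deserialization with a `TypeSerializer` could look roughly like the following sketch (assuming, for the sake of the example, that each Kafka record value holds exactly one serialized element; this is not the PR's actual code):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;

// Sketch: deserialize one element of type T from a raw Kafka record value
// using a TypeSerializer directly, without a KafkaDeserializationSchema.
class ElementDeserializer<T> {
    private final TypeSerializer<T> serializer;
    private final DataInputDeserializer dis = new DataInputDeserializer();

    ElementDeserializer(TypeSerializer<T> serializer) {
        this.serializer = serializer;
    }

    T deserialize(byte[] recordValue) throws java.io.IOException {
        dis.setBuffer(recordValue);
        return serializer.deserialize(dis);
    }
}
```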
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629172289 Added a watermark-randomness test and an incremental test on the consumer side. I think by now both the value and the watermark parts are checked.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-629074571 Addressed comments and did another pass on Javadocs and formatting.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-628723932

@AHeise I think I've addressed most of the comments, and all the tests are stable now. I also did one pass to address the Javadoc and indentation problems. Missing Javadocs produce a checkstyle error, so they are relatively easy to catch; for the indentation, I've tried my best and hope I did not miss any places.

I think I am still missing one "must-do": randomized watermarks. I will think of a test tonight and add it tomorrow. It should be fairly easy (well, compared to the failover IT tests).
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-628541322 Stable IT tests, finally...
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-628358245

Fixed the failover test problems:

1. With two-phase commit, if one of the subtasks is FINISHED, then the whole graph aborts because checkpoints cannot complete. The solution is to use unbounded sources instead (see the sketch below).
2. Added state (snapshot/restore) for validation checks when checkpointing is enabled.
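A minimal sketch of such an unbounded source (illustrative, not the PR's exact test code): it keeps emitting until cancelled, so no subtask reaches FINISHED and checkpoints can keep completing.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Sketch: an unbounded test source that runs until cancelled, so the
// two-phase-commit sink always has a running upstream to checkpoint with.
class UnboundedCountingSource implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running) {
            // Hold the checkpoint lock so emission and state snapshots don't interleave.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```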
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-625295248 Thanks so much, @AHeise! The watermark support (on the producer side) is almost done; it still needs some cleanup though. I will update the PR and address the comments once the remaining IT tests are done.
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-622771835

@AHeise and @pnowojski, I have updated the PR with respect to the API change suggested above. The API change is contained in the fourth commit, "[FLINK-15670][connector] Kafka Shuffle API Part". The updated PR also includes the amendments suggested by Arvid in previous reviews; I left replies for the comments not yet resolved. Most of the code change is now wrapped in the package `org.apache.flink.streaming.connectors.kafka.shuffle` to avoid affecting other parts.

1. I think the only remaining point of friction is `TwoPhaseCommitSinkFunction`. If I do not provide a watermark entry in that abstract function, I end up exposing `currentTransactionHolder` as well as `TransactionHolder#handle`. Your call.
2. I divided `persistentKeyBy` into two functions, `writeKeyBy` and `readKeyBy`, so people can call `readKeyBy` directly to reuse the written data (see the usage sketch below). Please take a look at the API to see whether the changes make sense.

Thanks in advance!
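To illustrate point 2, a rough usage sketch of the split (assuming the entry point is a `FlinkKafkaShuffle` class in the package above; the exact signatures here are illustrative, not necessarily the final API, so please check the PR for the real one):

```java
import java.util.Properties;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

public class KafkaShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker

        DataStream<Tuple2<Integer, String>> input =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

        // Producer side: persist the keyed stream into the shuffle topic.
        FlinkKafkaShuffle.writeKeyBy(input, "shuffle-topic", kafkaProperties, 0);

        // Consumer side (can even be a separate job): reuse the persisted data directly.
        KeyedStream<Tuple2<Integer, String>, Integer> keyed = FlinkKafkaShuffle.readKeyBy(
                "shuffle-topic", env, input.getType(), kafkaProperties, tuple -> tuple.f0);

        keyed.print();
        env.execute("kafka-shuffle-sketch");
    }
}
```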
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-622262988 Hey @AHeise and @pnowojski, thanks so much for your time and effort, I really appreciate it! I think this is a fair and practical workaround. My original intention was to put the operator in the API module and the function implementation in the Kafka connector module. But since the operator is now tailored specifically to the new KafkaShuffleFunction, there is no need to put the custom operator in the API module anymore. If I wrap everything in `flink-connector-kafka`, I agree there should not be any cyclic-dependency issue. I will make the updates. Thank you both again!
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-621721146

Updates to the commit "Kafka Shuffle and Test Case":

1. Added a log4j2 file for better testing.
2. I found the original tests occasionally failing when testing with a large amount of data. The reason is that the default property `auto.offset.reset` is set to `latest`: if the consumer starts after the producer has begun writing data, some data is occasionally lost. So I switched to `KafkaTestEnvironment.getStandardProperties`, which sets `auto.offset.reset = earliest` (see the snippet below).
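For reference, a minimal sketch of what the fix amounts to (`auto.offset.reset` is a standard Kafka consumer setting; the broker address and group id below are placeholders):

```java
import java.util.Properties;

final class TestProperties {
    // Mirrors the intent of KafkaTestEnvironment.getStandardProperties: a fresh
    // consumer group starts from the beginning of each partition ("earliest")
    // instead of only seeing records produced after it subscribes ("latest").
    static Properties standardConsumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "kafka-shuffle-test");      // placeholder group id
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```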
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591

# 1. Change to `SinkFunction`

By "coordination", do you mean that other changes are also being made to `SinkFunction` and may need coordination?

I expected reviewers to have strong reactions to the API change, and that's fine. But I am a bit confused about what is agreed/disagreed upon and what the suggested better way is, so let me try to clarify some of my thoughts and the reasons why the API is changed this way.

As suggested by Stephan, the PR does have a custom operator `StreamShuffleSink` and a custom transformation in `SinkTransformation` for that operator. As Arvid mentioned in previous reviews, there is a lot of code duplication between `StreamShuffleSink` and `StreamSink`.

- That's true, because they are very similar LOL. But we do want a separate operator to minimize the impact of changes on existing operators.
- Personally, I prefer not to have multiple levels of extension/subclassing, especially when the superclass is not abstract. Multi-level extension makes code very difficult to read: you cannot easily see which functions/members a class contains, especially without a good IDE.
- Coming back to the duplication: it is about 100 lines of code with very simple logic, so personally I would trade those 100 lines for readability.

`SinkFunction`, as its name suggests, is the function the sink operator invokes to handle each record. `FlinkKafkaProducer` itself is a `TwoPhaseCommitSinkFunction`, which implements `SinkFunction`. If we really want to avoid changing `SinkFunction`, I can introduce a new interface and have the current `TwoPhaseCommitSinkFunction` implement it. That should be safer than the current approach and also avoid conflicts, if that is the concern. Please let me know what you think of this proposal.

# 2. `StreamElementSerializer`

I cannot simply use `StreamElementSerializer` because watermarks are stored/handled differently here. In short, if multiple sink subtasks write to the same partition, we need a way to decide the watermark at the source (the downstream operator, from the shuffle perspective). In the current Netty shuffle service, we keep N channels and a watermark per channel; in this case, data and watermarks are merged when written to partitions (see the sketch at the end of this comment). Please refer to FLINK-15670 for the watermark discussion: https://issues.apache.org/jira/browse/FLINK-15670. You can start from [here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232); it includes my original thoughts and proposals, and Stephan's enhanced version.

# 3. Checkpoints and savepoints

As far as I know, savepoints are very similar to checkpoints, except that savepoints are more or less user-facing: a user can trigger a replay from a savepoint. I can see why you say "restoring from an old savepoint would completely screw up the data"; that is true from a global-snapshot, global-failover perspective. However, let's step back and think about why we want persistent shuffle in the first place: if data is persisted, you do not need to replay the computation again. Persistence exists to remove the constraints between upstream and downstream.
So regarding your concern, we do not need a global replay; we can simply do a regional replay. If there are implementation constraints, we can disable it for now; in the long term, I do not see a problem. But maybe I misunderstand you :-)

Sorry for writing this much again. These are all good questions that I think are clearer to answer systematically :-). Let's chat in detail this afternoon.
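To make point 2 above concrete, a minimal sketch (hypothetical names, not the PR's actual code) of consumer-side watermark merging: each producer subtask periodically writes its watermark into every partition, and the consumer advances only to the minimum across all producer subtasks:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: merge watermarks written into a partition by N producer subtasks.
// The consumer's watermark for the partition is the minimum of the latest
// watermark seen from each producer subtask.
class WatermarkMerger {
    private final Map<Integer, Long> watermarkPerProducerSubtask = new HashMap<>();
    private final int numberOfProducerSubtasks;

    WatermarkMerger(int numberOfProducerSubtasks) {
        this.numberOfProducerSubtasks = numberOfProducerSubtasks;
    }

    /** Returns the combined watermark, or Long.MIN_VALUE until every subtask has reported. */
    long onWatermarkRecord(int producerSubtaskIndex, long watermark) {
        watermarkPerProducerSubtask.merge(producerSubtaskIndex, watermark, Math::max);
        if (watermarkPerProducerSubtask.size() < numberOfProducerSubtasks) {
            return Long.MIN_VALUE; // cannot advance yet
        }
        return watermarkPerProducerSubtask.values().stream()
                .min(Long::compare)
                .orElse(Long.MIN_VALUE);
    }
}
```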
[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle
curcur commented on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-618879789

> So I had an offline discussion with Stephan to clarify the scope. Indeed, `KafkaShuffle` has been requested by users and serves as a bridge until we get fully persistent channels.
>
> We both agree that it would have been nice to also support reading from non-Flink shuffles (a.k.a. from any partitioned Kafka topic) by making the serializer pluggable at the composition level. Please have a look at `StreamElementSerializer` and see if we can use it. If that doesn't work for some reason, then I can live with a pure `KafkaShuffle` in the first iteration.
>
> Implementation-wise, we are both a bit skeptical that an API change (to `SinkFunction`) is the best course, as that requires more coordination and should probably have been triggered already if you want this feature in 1.11. Using custom operators would give you all the freedom without the need for coordination. It would also avoid the changes to `KafkaProducer`/`KafkaConsumer` at the cost of replicating some logic.
>
> Lastly, I have strong headaches about how checkpoints and savepoints work with `KafkaShuffle`. For storing checkpoints and recovery in terms of fault tolerance, the approach is good as-is. However, for savepoints, we should probably ensure that no unconsumed data is lingering in the shuffle topic, as that would translate to in-flight data. Hence, restoring from an old savepoint would completely screw up the data. At that point, we also need to ensure that the topic is purged (probably with some assertion). Not supporting going back in checkpoints should be safe under current guarantees. Alternatively, we would need to implement some recovery logic for older check/savepoints that somehow ignores "future" data (i.e., some higher-level Kafka offset management).

Hey Arvid, thanks so much for the quick response! I think you have several concerns:

1. Why `StreamElementSerializer` cannot be reused.
2. Why I have to have a different `KafkaProducer`/`KafkaConsumer`.
3. Whether there is a better way than changing `SinkFunction` (that is exactly my concern too, and why I wanted early feedback; I am hesitant as well).
4. Savepoints, which I do not completely get.

For the first two, I have reasons; for the third, I share the concern; for the fourth, I am not completely sure I understand it correctly. So, do you have time to chat a bit about these four points on Monday?

BTW, I am not insisting on getting this into 1.11. I really want to do it the right way instead.