[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-18 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-630059574


   The PR has been reverted because of conflicts with
   [FLINK-17659] Rework WatermarkStrategy, add Suppliers for
   TimestampAssigner/WatermarkGenerator,
   which was committed last night.
   
   Filed a new PR: https://github.com/apache/flink/pull/12219



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-18 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-630025655


   This is because of the commit unifying the WatermarkStrategy that was
   committed last night...







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-17 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629949773


   BTW, I have updated the Javadocs and pushed.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-17 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629925937


   Thank you both so much! This is my very first feature and it means a lot to
   me. I am especially impressed by the level of code integrity @AHeise insists
   on. The code quality is significantly improved! I really appreciate it, and
   thank you again!







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-17 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629777555


   Javadocs for the public API have been updated as well.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629650067


   Hey @AHeise, thank you so much again!
   
   I believe I've addressed most of the minor issues mentioned in the previous
   reviews. Please remind me if I missed anything, except for the one about
   "extracting this inside of the for loop"; I've left you an offline message
   explaining why I think it is better not to do that right now.
   
   FLINK-17307 changes the KafkaFetcher to emit the output in batches (one
   collector call per partition), but mine emits once per record (emitting
   records and watermarks in different ways).
   
   You can see they have logical differences. It might be worth changing to
   batch emit as well, but that might need two collectors, one for watermarks
   and the other for records; see the sketch below.
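   
   For concreteness, a minimal, hypothetical sketch of the per-record path
   (all type names are simplified stand-ins, not the actual Flink classes):
   
   ```java
   import java.util.List;
   
   final class PerRecordEmitSketch {
   
       // Minimal stand-ins for the deserialized element and the collector.
       interface Element { boolean isWatermark(); long timestamp(); }
       interface Collector { void emitRecord(Element e); void emitWatermark(long ts); }
   
       static void handlePartition(List<Element> partitionRecords, Collector out) {
           for (Element e : partitionRecords) {
               if (e.isWatermark()) {
                   // Watermarks were merged into the partition data on the
                   // producer side, so they surface here record by record.
                   out.emitWatermark(e.timestamp());
               } else {
                   out.emitRecord(e);
               }
           }
       }
   }
   ```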
   
   I am not saying the change is difficult, but I am learning to be
   conservative right now, and I've already listed it as a follow-up:
   https://issues.apache.org/jira/browse/FLINK-15670.
   
   What do you think?
   
   I am going to work on the Javadocs for the public API now. If you have any
   comments on the code, please let me know.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-16 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629595539


   Resolved 3, the last one:
   
   removed getTypeSerializer() from TypeInformationSerializationSchema.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629578740


   Resolved 2, avoid duplicated code:
   
   1. `KafkaShuffleFetcher` now extends `KafkaFetcher` instead of
   `AbstractFetcher`.
   2. I also did a little cleanup on the code of `KafkaFetcher`. I know it is
   not quite relevant, but I guess you will think it is necessary as well.
   3. The current "ending of stream" logic in `KafkaFetcher` is a bit strange
   (see the sketch after this list): if any partition has a record signaled as
   "END_OF_STREAM", the fetcher stops running. Notice that the signal comes
   from the deserializer, which means from the Kafka data itself. But it is
   possible that other topics and partitions still have data to read.
   Finishing reading Partition0 cannot guarantee that Partition1 also
   finishes. It is possible that I misunderstand what an "END_OF_STREAM"
   signal means.
   4. All Kafka-related tests passed after the change.
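   
   To illustrate point 3, a simplified sketch of the behavior I find strange
   (all types are stand-ins, not the actual Flink classes): the whole fetch
   loop stops as soon as any single record carries the end-of-stream signal.
   
   ```java
   import java.util.List;
   
   final class EndOfStreamSketch {
   
       // Stand-in for the deserialization schema's per-record signal.
       interface Deserializer<T> {
           T deserialize(byte[] record);
           boolean isEndOfStream(T element);
       }
   
       static <T> void runFetchLoop(List<byte[]> records, Deserializer<T> schema) {
           for (byte[] record : records) {
               T element = schema.deserialize(record);
               if (schema.isEndOfStream(element)) {
                   // One END_OF_STREAM record stops the whole fetcher, even
                   // though other topics/partitions may still have data.
                   return;
               }
               // ... emit element downstream ...
           }
       }
   }
   ```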
   







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629364820


   Resolved 1, the ser/de format change.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629305711


   As discussed offline, three things remain to do:
   
   1. Ser/de format change
   2. Avoid duplicated code
   3. Use TypeSerializer instead of TypeInformationSerializationSchema
   
   Priority-wise, I think 1 > 2 > 3.
   
   







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629178179


   > Test code already looks good (expecting the watermark tests to be
   > implemented).
   > 
   > Production code needs polishing:
   > 
   > * API Javadoc should really help non-experts.
   > * Serializers should probably make more use of the existing abstractions
   > like `KafkaDeserializationSchema` and not change/exhibit
   > `TypeInformationSerializationSchema`.
   
   This is difficult, as what KafkaDeserializationSchema provides is
   
   `T deserialize(ConsumerRecord record) throws Exception;`
   
   while what I need is a TypeSerializer that can deserialize type by type.
   
   If I ask for a KafkaDeserializationSchema, that's basically asking the
   user to write the deserializer themselves.
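   
   For illustration, a minimal sketch of the TypeSerializer-based path I have
   in mind, assuming Flink's DataInputDeserializer is used to wrap the raw
   record bytes (the helper name is hypothetical):
   
   ```java
   import java.io.IOException;
   
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.core.memory.DataInputDeserializer;
   
   final class TypeSerializerDecodeSketch {
   
       // Decode one value of type T straight from the raw Kafka record bytes,
       // without asking the user to hand-write a KafkaDeserializationSchema.
       static <T> T decode(byte[] kafkaValue, TypeSerializer<T> serializer) throws IOException {
           DataInputDeserializer in = new DataInputDeserializer(kafkaValue);
           return serializer.deserialize(in);
       }
   }
   ```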







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629172289


   Added a watermark randomness test and an incremental test on the consumer
   side.
   
   I think by now both the value and watermark parts are checked.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-15 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-629074571


   Addressed comments and did another pass on Javadocs and formatting.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-628723932


   @AHeise 
   I think I've addressed most of the comments, and all the tests are stable
   now.
   
   I also did one pass to address the Javadoc and indentation problems.
   
   Missing Javadocs generate a checkstyle error, so they are relatively easy
   to check. For the indentation problem, I've tried my best; I hope I did not
   miss any places.
   
   I think I am still missing one "must-do": randomized watermarks.
   
   I will think of one tonight and add it tomorrow. It should be fairly easy
   (well, compared to the failover IT tests).







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-14 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-628541322


   Stable IT tests, finally...







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-13 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-628358245


   Fixed the problems in the failover tests:
   1. For two-phase commit, if one of the subtasks is FINISHED, the whole
   graph aborts because the checkpoint cannot complete. The solution is to use
   unbounded sources instead (see the sketch below).
   
   2. Added state (snapshot/restore) for the validation checks when
   checkpoints are enabled.
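   
   For illustration, a minimal sketch of what such an unbounded test source
   could look like (illustrative, not the exact source used in these tests):
   
   ```java
   import org.apache.flink.streaming.api.functions.source.SourceFunction;
   
   final class UnboundedCountingSource implements SourceFunction<Long> {
   
       private volatile boolean running = true;
   
       @Override
       public void run(SourceContext<Long> ctx) throws Exception {
           long next = 0;
           // Never finishes on its own, so no subtask reaches FINISHED and
           // checkpoints can keep completing; the test cancels the job instead.
           while (running) {
               synchronized (ctx.getCheckpointLock()) {
                   ctx.collect(next++);
               }
           }
       }
   
       @Override
       public void cancel() {
           running = false;
       }
   }
   ```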







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-07 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-625295248


   Thanks so much @AHeise !
   
   The watermark (on the producer side) is almost done; it needs some
   clean-up, though.
   
   I will update the PR and also address the comments once the remaining IT
   tests are done.







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-02 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-622771835


   @AHeise and @pnowojski , I have updated the PR with respect to the API
   change suggested above. The API change is listed as the fourth commit,
   "[FLINK-15670][connector] Kafka Shuffle API Part". The updated PR also
   includes amendments suggested by Arvid in previous reviews. I left a reply
   for comments not yet resolved.
   
   Now, most of the code change is wrapped in the package
   "org.apache.flink.streaming.connectors.kafka.shuffle" to avoid affecting
   other parts.
   
   1. I think the only thing I am still a bit torn about is
   "TwoPhaseCommitSinkFunction". If I do not provide a watermark entry in that
   abstract function, I will end up exposing "currentTransactionHolder" as
   well as "TransactionHolder#handle". Your call.
   
   2. I divided "persistentKeyBy" into two functions, "writeKeyBy" and
   "readKeyBy", so people can call "readKeyBy" directly to reuse the written
   data (see the usage sketch below). You can take a look at the API to see
   whether the changes make sense.
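   
   A hypothetical usage sketch of the split (the method shapes are assumptions
   based on the names above, not the final API):
   
   ```java
   import java.util.Properties;
   
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.functions.KeySelector;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.datastream.KeyedStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;
   
   final class KafkaShuffleUsageSketch {
   
       static void example(StreamExecutionEnvironment env,
                           DataStream<Tuple2<Integer, Long>> input,
                           TypeInformation<Tuple2<Integer, Long>> typeInfo,
                           Properties kafkaProps) throws Exception {
           KeySelector<Tuple2<Integer, Long>, Integer> key = t -> t.f0;
   
           // Producing job: persist the keyed stream into a Kafka topic.
           FlinkKafkaShuffle.writeKeyBy(input, "shuffle-topic", kafkaProps, key);
   
           // Consuming job (possibly a different one, run later): read the
           // persisted data back, already partitioned by the same key.
           KeyedStream<Tuple2<Integer, Long>, Integer> reused =
                   FlinkKafkaShuffle.readKeyBy("shuffle-topic", env, typeInfo, kafkaProps, key);
       }
   }
   ```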
   
   Thanks in advance!!
   
   
   







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-05-01 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-622262988


   Hey @AHeise and @pnowojski, thanks so much for your time and effort; I
   really appreciate it!
   
   I think this is a fair and practical workaround. My first intention was to
   put the operator in the API module and the function implementation in the
   Kafka connector module. But since the operator is now tailored specifically
   for the new KafkaShuffleFunction, there is no need to put the custom
   operator in the API module anymore.
   
   If I wrap everything in `flink-connector-kafka`, I agree there should not
   be any cyclic dependency issue.
   
   I will do the updates, and thank you both again!







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-30 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-621721146


   Updates to the commit "Kafka Shuffle and Test Case":
   
   1. Added a log4j2 file for better testing.
   2. I found that the original tests occasionally fail when testing with a
   large amount of data. The reason is that the default property
   "auto.offset.reset" is set to "latest". That means if the consumer starts
   after the producer has started writing data, I am occasionally going to
   lose some data.
   So, I switched to using KafkaTestEnvironment.getStandardProperties, which
   sets auto.offset.reset = earliest; see the sketch below.
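   
   For reference, a minimal sketch of consumer properties with this fix
   applied (property names are standard Kafka consumer settings; the helper
   and values are illustrative):
   
   ```java
   import java.util.Properties;
   
   final class ConsumerPropsSketch {
   
       static Properties standardTestProperties(String bootstrapServers, String groupId) {
           Properties props = new Properties();
           props.setProperty("bootstrap.servers", bootstrapServers);
           props.setProperty("group.id", groupId);
           // The default "latest" skips records produced before the consumer
           // starts; "earliest" makes the tests deterministic.
           props.setProperty("auto.offset.reset", "earliest");
           return props;
       }
   }
   ```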
   







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   # 1. Change to `SinkFunction`
   By "coordination", do you mean that other changes are also being made to
   `SinkFunction` and may need coordination?
   
   I expected reviewers to have strong reactions to the API change; that's
   fine. But I am a bit confused about what is agreed/disagreed on and what
   the suggested better way is, so let me try to clarify some of my thoughts
   and the reasons why the API is changed in this way.
   
   As suggested by Stephan, the PR does have a custom operator
   `StreamShuffleSink` and a custom transformation in `SinkTransformation` for
   the custom operator. As Arvid mentioned in the previous reviews, there is a
   lot of code duplication between `StreamShuffleSink` and `StreamSink`.
   - That's true, because they are very similar LOL, but we do want to provide
   a different operator to minimize the impact of changes on existing
   operators.
   - Personally, I prefer not to have multiple levels of extends/subclasses,
   especially if the superclass is not abstract. Multi-level extension makes
   code very difficult to read: you cannot easily track what functions/members
   a class contains in a straightforward way, especially without a good IDE.
   - Coming back to the duplication: there are in total 100 lines of code,
   with very simple logic. So personally I would prefer to trade these `100`
   lines of code for `readability`.
   
   `SinkFunction`, as its name suggests, is the function invoked in the sink
   operator; it provides an invoke function to handle records.
   `FlinkKafkaProducer` itself is a TwoPhaseCommitSinkFunction, which
   implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can add a new
   interface and have the current TwoPhaseCommitSinkFunction implement it (see
   the sketch below). That should be safer than the current way, and it also
   avoids conflicts if that's the concern.
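   
   A hypothetical sketch of that alternative; the interface name and shape are
   illustrative only, not part of this PR:
   
   ```java
   import org.apache.flink.streaming.api.functions.sink.SinkFunction;
   import org.apache.flink.streaming.api.watermark.Watermark;
   
   // Illustrative only: a separate interface so SinkFunction stays untouched.
   interface WatermarkAwareSinkFunction<IN> extends SinkFunction<IN> {
   
       // Called by the custom sink operator when a watermark passes through,
       // so the function can persist it alongside records.
       void invoke(Watermark watermark) throws Exception;
   }
   ```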

   Please let me know what you think of this proposal.
   
   # 2. `StreamElementSerializer`
   I cannot simply use `StreamElementSerializer` because the way watermarks
   are stored/handled is different. In short, if multiple sink subtasks write
   to the same partition (sink), we need a way to decide the watermark in the
   source (the downstream operator, from the shuffle perspective).
   In the current Netty shuffle service, we keep N channels and watermarks in
   each channel, while in this case data and watermarks have been merged when
   writing to partitions; see the sketch below.
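   
   To make this concrete, a hedged sketch of the kind of tagged encoding this
   implies (the byte layout is illustrative, not the PR's actual format): the
   producer marks each Kafka record as data or watermark and includes the
   writing subtask's id, so the consumer can combine per-subtask watermarks
   even though everything arrives merged in one partition.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   
   final class TaggedShuffleEncodingSketch {
   
       static final byte TAG_RECORD = 0;
       static final byte TAG_WATERMARK = 1;
   
       static byte[] encodeWatermark(int producerSubtask, long watermark) throws IOException {
           ByteArrayOutputStream bytes = new ByteArrayOutputStream();
           DataOutputStream out = new DataOutputStream(bytes);
           out.writeByte(TAG_WATERMARK);
           // The subtask id lets the consumer track one watermark per writer
           // and take the minimum across writers.
           out.writeInt(producerSubtask);
           out.writeLong(watermark);
           return bytes.toByteArray();
       }
   }
   ```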
   
   Please refer to the Jira FLINK-15670 for the discussion about watermarks:
   https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from
   [here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts and proposals, and Stephan's enhanced
   version.
   
   # 3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   
   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in details this 
afternoon. 
   
   
   







[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-24 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-618879789


   > So I had an offline discussion with Stephan to clarify the scope. Indeed,
   > `KafkaShuffle` has been requested by users and serves as a bridge until
   > we get fully persistent channels.
   > 
   > We both agree that it would have been nice to also support reading from
   > non-Flink shuffles (a.k.a. from any partitioned Kafka topic) by making
   > the serializer pluggable at the composition level. Please have a look at
   > `StreamElementSerializer` and see if we can use it. If that doesn't work
   > for some reason, then I can live with a pure `KafkaShuffle` in the first
   > iteration.
   > 
   > Implementation-wise, we are both a bit skeptical that an API change (to
   > `SinkFunction`) is the best course, as that requires more coordination
   > and should probably have been triggered already if you want this feature
   > in 1.11. Using custom operators would give you all the freedom without
   > the need for coordination. It would also avoid the changes to
   > `KafkaProducer`/`KafkaConsumer` at the cost of replicating some logic.
   > 
   > Lastly, I have strong headaches about how checkpoints and savepoints work
   > with `KafkaShuffle`. I think for storing checkpoints and recovery in
   > terms of fault tolerance, the approach is good as-is. However, for
   > savepoints, we should probably ensure that no unconsumed data is still
   > lingering in the shuffle topic, as that would translate to in-flight
   > data. Hence, restoring from an old savepoint would completely screw up
   > the data. At this point, we also need to ensure that the topic is purged
   > (probably with some assertion). Not supporting going back in checkpoints
   > should be safe given current guarantees. Alternatively, we also need to
   > implement some recovery logic for older check/savepoints that ignores
   > "future" data somehow (so some higher-level Kafka offset management).
   
   Hey Arvid, thanks so much for the quick response!
   I think you have several concerns:
   
   1. Why `StreamElementSerializer` cannot be reused
   2. Why I have to have a different `KafkaProducer`/`KafkaConsumer`
   3. Whether there is a better way than changing `SinkFunction` (that's
   exactly my concern, and why I wanted early feedback; I am hesitant as well)
   4. `Savepoints`, which I do not completely get.
   
   For the first two, I have reasons; for the third one, I have concerns as
   well. For the fourth, I am not completely sure I understand it correctly.
   So, do you have time to chat a bit about these four points on Monday?
   
   BTW, I am not insisting on getting this into 1.11. Instead, I really want
   to do it the right way.
   
   
   


