[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Thanks :) ---
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 Merged! Could you please close this PR? ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 @aljoscha rebased on latest master and integrated your changes ---
Github user ariskk commented on the issue: https://github.com/apache/flink/pull/4239 We are really looking forward to this ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Bugs in tests (those that you can see in fixup commits) ---
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 What were the bugs that you fixed? ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 @aljoscha I have addressed your "high level" comments and fixed some bugs. Please check the latest 5 commits (one of them is a new dependency on another PR: https://github.com/apache/flink/pull/4631 ) ---
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4239 I did a first high-level review of the code. I think it's good so far! Before we can merge this, however, we need a few more things around it:
- A section in the Kafka doc about the new exactly-once mode, how it can be configured, etc.
- A big disclaimer (possibly in an "alert" box) about the interplay with the transaction timeout and what the caveats there are
- A section in the Javadocs about the aforementioned caveats
- A check that ensures that the transaction timeout is set to a reasonably high setting (say 1 hour) when exactly-once semantics are enabled. (With an override setting that allows the user to set a lower transaction timeout if they want to.)
Also, this has interplay with #4616 but we can resolve that by merging them in any order and fixing up the later changes when merging. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
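The last bullet point above can be sketched roughly as follows. This is only an illustration, not the code from the PR; the class and method names and the override flag are hypothetical, and only the `transaction.timeout.ms` property name is real Kafka configuration:

```java
import java.util.Properties;

// Sketch of the requested sanity check: when exactly-once semantics are
// enabled, require a reasonably high transaction timeout unless the user
// explicitly opts out via an override flag. All names here except
// "transaction.timeout.ms" are hypothetical.
class TransactionTimeoutCheck {

    static final long RECOMMENDED_TRANSACTION_TIMEOUT_MS = 60L * 60L * 1000L; // 1 hour

    static void validate(Properties producerConfig, boolean exactlyOnce, boolean userOverride) {
        if (!exactlyOnce || userOverride) {
            return; // nothing to check, or the user explicitly accepted a lower timeout
        }
        String value = producerConfig.getProperty("transaction.timeout.ms");
        long timeout = (value == null) ? -1L : Long.parseLong(value);
        if (timeout < RECOMMENDED_TRANSACTION_TIMEOUT_MS) {
            throw new IllegalArgumentException(
                "transaction.timeout.ms should be at least 1 hour in exactly-once mode; "
                    + "use the override setting to deliberately pick a lower value");
        }
    }
}
```

A low timeout would otherwise silently abort in-flight transactions on the broker side during long recoveries, which is exactly the caveat the disclaimer is meant to document.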
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Implemented a fixed-size pool of producers, please check the last commit. If we run out of producers in the pool, an exception is thrown, aborting the ongoing snapshot. ---
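The pool-exhaustion behavior described above can be sketched in a few lines of plain Java. Strings stand in for real Kafka producers, and the `ProducerPool` class name is an illustration, not the name used in the PR:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a fixed-size producer pool: each new transaction takes a
// producer from the pool; if none is available (too many commits still
// pending), an exception is thrown, aborting the ongoing snapshot.
class ProducerPool {

    private final Deque<String> available = new ArrayDeque<>();

    ProducerPool(int size) {
        for (int i = 0; i < size; i++) {
            available.add("producer-" + i);
        }
    }

    /** Take a producer for a new transaction, failing if the pool is exhausted. */
    String acquire() {
        String producer = available.poll();
        if (producer == null) {
            throw new IllegalStateException(
                "Producer pool exhausted: too many pending commits, aborting snapshot");
        }
        return producer;
    }

    /** Return a producer once its transaction was committed or aborted. */
    void release(String producer) {
        available.add(producer);
    }
}
```

Failing fast here is deliberate: blocking for a free producer inside a snapshot would stall checkpointing instead of surfacing the backlog of uncommitted transactions.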
Github user rangadi commented on the issue: https://github.com/apache/flink/pull/4239 Yep, that makes sense. ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 This solution (basically a pool with a fixed size of 2) would work only if there were at most one pending commit transaction, which is not always true in Flink: there can be multiple triggered checkpoints pending completion. ---
Github user rangadi commented on the issue: https://github.com/apache/flink/pull/4239 I guess you could store the transactional.id for the _next_ transaction in committed state. That way the new task starts the new transaction with the name stored in state, which automatically aborts the open transaction. ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Indeed it seems like you are right. `read_committed` doesn't play along with a long `max.transaction.timeout.ms`. I'm not sure about Beam, but in Flink we cannot use one single `transactional.id`, because our checkpoints are asynchronous: `notifyCheckpointComplete` (which triggers `KafkaProducer#commit`) can come long after `preCommit`, and in that time we cannot use the same `transactional.id` for new transactions. We can work around this issue by implementing a pool of `transactional.id`s, which we can save in the state. On restoring state, this allows us not only to `recoverAndCommit` all pending transactions, but also to abort all other unknown "lingering" transactions. ---
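The `transactional.id` pool described above can be sketched like this. It is a simplification: the real sink keeps this in Flink operator state, and the class and method names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a pool of transactional.ids kept in checkpointed state.
// New transactions cycle through a fixed set of known ids. On restore,
// every id in the pool can be re-initialized, which in Kafka has the
// side effect of aborting any lingering transaction previously opened
// under that id.
class TransactionalIdPool {

    private final List<String> ids = new ArrayList<>();
    private int next = 0;

    TransactionalIdPool(String prefix, int size) {
        for (int i = 0; i < size; i++) {
            ids.add(prefix + "-" + i);
        }
    }

    /** Hand out ids round-robin so each new transaction reuses a known id. */
    String nextId() {
        String id = ids.get(next);
        next = (next + 1) % ids.size();
        return id;
    }

    /** On restore: the full set of ids to recover-and-commit or abort. */
    List<String> allKnownIds() {
        return new ArrayList<>(ids);
    }
}
```

Because the whole set of ids is checkpointed, a restored task knows every id a crashed predecessor might have left a transaction open under, not just the ids of the transactions it must commit.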
Github user rangadi commented on the issue: https://github.com/apache/flink/pull/4239 > Hmmm, are you sure about this thing? That would mean that Kafka doesn't support transactional parallel writes from two different process, which would be very strange. Could you point to a source of this information? It does not prohibit parallel transactions. It just restricts what an EOS consumer, which reads only the committed messages, can see. See the 'Reading Transactional Messages' section in the JavaDoc for KafkaConsumer: https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L421 : > In read_committed mode, the consumer will read only those transactional messages which have been successfully committed. It will continue to read non-transactional messages as before. There is no client-side buffering in read_committed mode. Instead, the end offset of a partition for a read_committed consumer would be the offset of the first message in the partition belonging to an open transaction. This offset is known as the 'Last Stable Offset' (LSO). If there is an open transaction, the EOS consumers don't read past it. ---
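For reference, enabling the behavior quoted above on the consumer side is a one-line configuration change. The property keys below (`isolation.level` etc.) are standard Kafka 0.11+ consumer settings; the wrapper class is just for illustration:

```java
import java.util.Properties;

// Standard Kafka 0.11+ consumer settings for reading only committed
// transactional messages. With isolation.level=read_committed the
// consumer will not read past the Last Stable Offset (LSO) of an
// open transaction.
class ReadCommittedConfig {
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level", "read_committed"); // default: read_uncommitted
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```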
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Writing records into state would be very costly. It is only a "last resort" solution. > That would imply exactly-once consumers can not read past that transaction as long as it is open Hmmm, are you sure about this? That would mean that Kafka doesn't support transactional parallel writes from two different processes, which would be very strange. Could you point to a source for this information? Resuming transactions is not a part of the `KafkaProducer` API, however Kafka's REST API allows doing that. However, I'm aware that it wasn't an intention of the authors to do so. Kafka Streams does not need to do that, because it achieves exactly-once semantics by using persistent communication channels (Kafka topics), so it can easily restart each operator on its own by replaying/rewinding every input channel (Kafka topic). This comes with a cost, because it makes communication between operators extremely expensive, since every message must go to HDDs at some point. ---
Github user rangadi commented on the issue: https://github.com/apache/flink/pull/4239 Maybe an extra shuffle to make small batches could help. Another option is to buffer all the records in state and write them all inside commit(). But I am not sure how costly it is to save all the records in checkpointed state. Another issue I see with using a random txn id: if a worker looks unresponsive and work is moved to another worker, it is possible that the old worker still lingers around with an open transaction. That would imply that the exactly-once consumers can not read past that transaction as long as it is open. I didn't know it was possible to resume a transaction, since it is not part of the producer API. This PR uses an undocumented way to do it.. do you know if Kafka Streams also does something like that? Maybe the producer will support `resumeTransaction()` properly in the future. ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 I think there is no way we can handle it other than increasing the timeout to some very large value. Or is there? ---
Github user rangadi commented on the issue: https://github.com/apache/flink/pull/4239 How does the exactly-once sink handle a large gap between `preCommit()` and `recoverAndCommit()` in case of a recovery? The server seems to abort a transaction after a timeout `max.transaction.timeout.ms`. ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4239 Now that the prerequisite PRs are merged, we can rebase this now :) ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4239 please add an entry to the `MODULES_CONNECTORS` variable in the `tools/travis_mvn_watchdog.sh` script. ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4239 Ok :) I can agree that we keep 321a142 a separate commit. For df6d5e0 to 5ff8106, I actually found it easier to ignore all that (because a lot of it is irrelevant in the end) and went straight to 41ad973. ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 `df6d5e0` to `5ff8106` should definitely be squashed; I left them only to make it easier for reviewers to follow the changes made in the 0.11 vs 0.10 connectors (those changes would be invisible in one blob commit). For `321a142` to `2cf5f3b` I'm not sure about the first one: `FlinkKafkaProducer` is hacky enough that it could deserve a separate commit. That would make it stand out more to anyone looking at the commit history in the future (it could otherwise hide in a larger change). ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4239 One other comment regarding the commits: I would argue that df6d5e0 to 5ff8106 should not appear in the commit log history, since in the end we actually have a completely new producer for 011 anyways. Also, 321a142 to 2cf5f3b should be squashed to a single commit for the addition of an "exactly-once producer for 011" (the new `FlinkKafkaProducer` implementation and exactly-once tests shouldn't stand alone as independent commits, IMO; `FlinkKafkaProducer` isn't used by other producer versions, and the exactly-once producer addition wouldn't be valid without the tests). ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4239 Regarding how I would proceed with this big contribution: Let's first try to clean up the commits that are bundled all together here. 1. I would first try to merge #4321 (the first 4 commits) and #4310 (af7ed19) and get those out of the way. 2. For a06cb94 (`TwoPhaseCommitSinkFunction`), could you open a separate PR with unit tests covered? 3. After the above is all sorted out, we rebase this again. ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4239 Thanks a lot for opening a pull request for this very important feature @pnowojski. I did a rough first pass and had some comments I would like to clear out first (this is a big chunk of code; we will probably need to go through it quite a few times before it can be mergeable). Most notably, some comments so far: 1. I think we need UTs for the `TwoPhaseCommitSinkFunction`. It alone is a very important addition (I would even prefer a separate PR for it and try to merge that first.) 2. Serialization of the transaction state in `TwoPhaseCommitSinkFunction` needs to be changed. 3. Is the `FlinkKafkaProducer011` actually supporting hybrid usage (normal sink function and `writeToKafkaWithTimestamps` as a custom sink operator)? From the looks of it, it doesn't seem like it. ---