[
https://issues.apache.org/jira/browse/FLINK-2386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14660438#comment-14660438
]
ASF GitHub Bot commented on FLINK-2386:
---------------------------------------
GitHub user rmetzger opened a pull request:
https://github.com/apache/flink/pull/996
[WIP][FLINK-2386] Add new Kafka Consumers
I'm opening a WIP pull request (against our rules) to get some feedback on
my ongoing work.
Please note that I'm on vacation next week (until August 17).
**Why this rework?**
The current `PersistentKafkaSource` does not always provide exactly-once
processing guarantees because it uses Kafka's high-level Consumer API.
We chose that API because it handles all the corner cases
such as leader election, leader failover, and other low-level details.
The problem is that the API does not allow us to
- commit offsets manually
- consistently (across restarts) assign partitions to Flink instances
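To illustrate what "consistent assignment across restarts" means, here is a minimal sketch of a deterministic modulo scheme (a hypothetical illustration, not the actual Flink code): partition `p` always goes to subtask `p % parallelism`, so the mapping is identical after every restart, independent of any broker-side group rebalancing.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
    // Deterministically assign Kafka partitions to a consumer subtask:
    // partition p is read by subtask (p % parallelism), so the mapping
    // is the same on every restart. Hypothetical helper for illustration.
    public static List<Integer> partitionsForSubtask(
            int numPartitions, int parallelism, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % parallelism == subtaskIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }
}
```

The high-level Consumer API hides partition assignment behind its group rebalancing, which is exactly why a scheme like this cannot be enforced through it.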
The Kafka community is aware of these issues and actively working on a new
Consumer API. See
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
and https://issues.apache.org/jira/browse/KAFKA-1326
The release of Kafka 0.8.3 is scheduled for October 2015 (see plan:
https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan)
Therefore, I decided on the following approach:
copy the code of the unreleased, new Kafka Consumer into the Flink consumer
and use it.
The new API has all the bells and whistles we need (manual committing,
per-partition subscriptions, nice APIs), but it is not completely backwards
compatible:
- We can retrieve topic metadata with the new API from Kafka 0.8.1 and 0.8.2
(and of course 0.8.3)
- We can retrieve data from Kafka 0.8.2 (and 0.8.3)
- We can only commit offsets to Kafka 0.8.3
Therefore, this pull request contains three different user-facing classes
`FlinkKafkaConsumer081`, `FlinkKafkaConsumer082` and `FlinkKafkaConsumer083`
for the different possible combinations.
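All three classes take a `Properties` object; as a rough sketch, the configuration would look like the standard Kafka 0.8 consumer settings below (which keys each of the new classes actually reads is an assumption here, and the host names are placeholders):

```properties
# Standard Kafka 0.8-era consumer settings (illustrative values).
# Broker list for fetching data (new-consumer style key):
bootstrap.servers=broker1:9092,broker2:9092
# ZooKeeper quorum, needed where offsets are committed to ZooKeeper:
zookeeper.connect=zk1:2181
# Consumer group under which offsets are tracked:
group.id=flink-kafka-consumer
# Where to start when no committed offset exists (0.8 values):
auto.offset.reset=smallest
```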
For 0.8.1 we are using a hand-crafted implementation against the SimpleConsumer
API
(https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example),
so we had to do what we originally wanted to avoid.
I tried to make that implementation as robust and efficient as possible.
I'm intentionally not handling any broker failures in the code. For these
cases, I'm relying on Flink's fault tolerance mechanisms (which effectively
means redeploying the Kafka sources against other online brokers).
For reviewing the pull request, there are only a few important classes to
look at:
- `FlinkKafkaConsumerBase`
- `IncludedFetcher`
- `LegacyFetcher` (the one implementing the SimpleConsumer API)
I fixed a little bug in the stream graph generator: it was ignoring the
"number of execution retries" setting when checkpointing was not enabled.
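The gist of that fix can be sketched as follows (hypothetical helper methods, not the actual stream graph generator code): the configured retry count should take effect regardless of whether checkpointing is on, instead of being silently dropped.

```java
public class RestartConfig {
    // Buggy behavior: retries were only honored when checkpointing was enabled,
    // so a job without checkpointing silently lost its restart setting.
    static int retriesBefore(boolean checkpointingEnabled, int configuredRetries) {
        return checkpointingEnabled ? configuredRetries : 0;
    }

    // Fixed behavior: the configured retry count is always honored.
    static int retriesAfter(boolean checkpointingEnabled, int configuredRetries) {
        return configuredRetries;
    }
}
```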
Known issues:
- this pull request contains at least one failing test
- the KafkaConsumer contains at least one known, yet untested bug
- missing documentation
I will also open a pull request for using the new Producer API. It provides
much better performance and usability.
Open questions:
- Do we really want to copy 20k+ lines of code into our code base (for
now)?
If there are concerns about this, I could also manually implement the
missing pieces. It's probably 100 lines of code to get the partition infos
for a topic, and we would then use the SimpleConsumer also for reading from 0.8.2.
- Do we want to use the packaging I'm suggesting here (an additional Maven
module, `flink-connector-kafka-083`)? We would need to introduce it anyway
when Kafka releases 0.8.3, because the dependencies are not compatible.
But it adds confusion for our users.
I will write more documentation for guidance.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rmetzger/flink flink2386
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/996.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #996
----
commit 177e0bbc6bc613b67111ba038e0ded4fae8474f1
Author: Robert Metzger <[email protected]>
Date: 2015-07-20T19:39:46Z
wip
commit 70cb8f1ecb7df7a98313796609d2fa0dbade86bf
Author: Robert Metzger <[email protected]>
Date: 2015-07-21T15:21:45Z
[FLINK-2386] Add initial code for the new kafka connector, with everything
unreleased copied from the kafka sources
commit a4a2847908a8c2f118b8667d7cb66693c065c38d
Author: Robert Metzger <[email protected]>
Date: 2015-07-21T17:58:13Z
wip
commit b02cde37c2120ff6f0fcf1c233391a1d8804e594
Author: Robert Metzger <[email protected]>
Date: 2015-07-22T15:29:58Z
wip
commit 54a05c39d150b016e0a089daedb3492d986b93bd
Author: Robert Metzger <[email protected]>
Date: 2015-07-22T19:56:41Z
wip
commit 393fd6766a5df4bf14ef0c13864b8a4abdb62bb4
Author: Robert Metzger <[email protected]>
Date: 2015-07-22T20:20:20Z
we are good for a test drive
commit 3d66332e61665df9bafa05d2644b4fe1032da694
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T09:55:02Z
wip
commit b3e0c82c098d1aa27e418adb552f1b218c0f9550
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T11:49:05Z
fixed deserialization
commit 409eb8091fce6caa3d3c9cc1ffb0dc42c3f5e130
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T12:29:24Z
this one test seems to pass
commit d1bcac9886521a0b8b05b9c4b7c37a4667c7dcce
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T12:35:22Z
rat check
commit abd0f5b57610d0d8dcc3d530c816afc616cbf30b
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T12:45:13Z
successful build
commit ec28a40145f987450ab8938d7f1b6443be53d3b6
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T14:02:40Z
added a lot of debuggign stuff
commit 4df31ba2f0c53228f1a26307aa221ad4b9d5db68
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T14:17:21Z
another fix
commit 7d5f283a73b76ec2e81339dbdd619fe01988aaf8
Author: Robert Metzger <[email protected]>
Date: 2015-07-23T16:26:55Z
support for partitions < instances
commit 1c9ee9d09c3584ff168ce12a8438542032895953
Author: Robert Metzger <[email protected]>
Date: 2015-07-24T15:21:01Z
Improve error handling, fix offset handling
commit 9dad95e40cdc9acd6bc64ea5c7d54296320381ea
Author: Robert Metzger <[email protected]>
Date: 2015-07-24T16:24:29Z
wip
commit 12d24cb2c3d6a5b16bbde7619f0789e05118d5ac
Author: Robert Metzger <[email protected]>
Date: 2015-07-30T14:09:08Z
wip
commit 9fcd8b5af8c5e6e018d8c020ba0b9f9295c6da2b
Author: Robert Metzger <[email protected]>
Date: 2015-08-04T10:17:19Z
wip
commit bc18a0242319954f82158c059b94898bbea6a33c
Author: Robert Metzger <[email protected]>
Date: 2015-08-05T16:13:30Z
TODO: it seems that sinks are not participating in snapshots
commit ba7546aebae9d6067f915e8b55290148ccd4a4f5
Author: Robert Metzger <[email protected]>
Date: 2015-08-06T09:39:34Z
wip
commit 70cf8e4f0e6c4355040ee50472c252abca275799
Author: Robert Metzger <[email protected]>
Date: 2015-08-06T12:30:08Z
Tests should run
commit 19adb8668c10eb7db6c99273ab8bc4bfcdc11da8
Author: Robert Metzger <[email protected]>
Date: 2015-08-06T13:10:22Z
restore old behaivor
commit bd5595b213515dad30cfc5b2e9f3e1c6cfbd64b2
Author: Robert Metzger <[email protected]>
Date: 2015-08-06T15:01:20Z
Looks like the tests are working
commit 8dde02b17bd32fbc53cfdb67c5a43fa88b2e882c
Author: Robert Metzger <[email protected]>
Date: 2015-08-06T15:32:16Z
lets see whether the tests are working
----
> Implement Kafka connector using the new Kafka Consumer API
> ----------------------------------------------------------
>
> Key: FLINK-2386
> URL: https://issues.apache.org/jira/browse/FLINK-2386
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Robert Metzger
> Assignee: Robert Metzger
>
> Once Kafka has released its new consumer API, we should provide a connector
> for that version.
> The release will probably be called 0.9 or 0.8.3.
> The connector will be mostly compatible with Kafka 0.8.2.x, except for
> committing offsets to the broker (the new consumer expects a coordinator to
> be available on Kafka). To work around that, we can provide a configuration
> option to commit offsets to ZooKeeper (managed by Flink code).
> For 0.9/0.8.3 it will be fully compatible.
> It will not be compatible with 0.8.1 because of mismatching Kafka message formats.
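The ZooKeeper workaround described in the issue would rely on Kafka's conventional offset znode layout, so that existing 0.8.x tooling can still read the committed offsets. A minimal sketch of the path construction (an illustrative helper, not actual Flink code):

```java
public class ZkOffsetPath {
    // Kafka 0.8's conventional ZooKeeper layout for consumer offsets.
    // A Flink-managed committer would write the offset value to this znode,
    // keeping it readable by standard 0.8.x consumer-group tooling.
    public static String offsetPath(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }
}
```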
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)