GitHub user srdo opened a pull request:
https://github.com/apache/storm/pull/2300
STORM-2691: Make storm-kafka-client implement the Trident interface correctly
This PR is based on https://github.com/apache/storm/pull/2271, so please
ignore the first commit. The broad thrust of this PR is to fix the
storm-kafka-client Trident spout so it implements the Trident API as intended.
The current implementation takes some shortcuts that used to be necessary to
support the KafkaConsumer.subscribe API, but these shortcuts cause the issues
listed below.
* The changes in https://github.com/apache/storm/pull/2009, released in
1.1.0, modified the OpaquePartitionedTridentSpoutExecutor in a way that likely
broke IOpaquePartitionedTridentSpout implementations other than
storm-kafka-client. Before the change, the executor requested sorted partitions
from the spout via getOrderedPartitions, did a round-robin partitioning, and
assigned partitions via refreshPartitions
(https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100).
The new code just passes the output of getOrderedPartitions into
refreshPartitions
(https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120).
It looks to me like refreshPartitions is therefore passed the list of all
partitions assigned to any spout task, rather than just the partitions assigned
to the current task.
The proposed fix will use getOrderedPartitions to get the sorted partitions
list, pass the list into getPartitionsForTask, and pass the resulting list of
assigned partitions back into refreshPartitions.
* The current implementation of the Trident Kafka spout only has a single
consumer, which lives in the Emitter. The Coordinator needs to know which
partitions are involved in a batch, and this information is shared via a static
field the Emitter writes to. The Coordinator and Emitter both run as regular
bolts, so there's no guarantee that they're in the same JVM.
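For illustration, the per-task assignment that the 1.0.4 executor did, and that the proposed fix restores, boils down to a round-robin split of the sorted partition list. The sketch below is a minimal, self-contained approximation, not Storm's code: the nested `Emitter` interface is a simplified hypothetical stand-in for the Trident emitter, and `partitionsForTask` is a hypothetical helper playing the role of getPartitionsForTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaskAssignmentSketch {

    // Hypothetical, simplified stand-in for a Trident emitter; not Storm's actual interface.
    public interface Emitter<P> {
        // All partitions, sorted the same way on every task.
        List<P> getOrderedPartitions();

        // Should receive only the partitions owned by the calling task.
        void refreshPartitions(List<P> assigned);
    }

    // Round-robin split: task i owns sorted[i], sorted[i + numTasks], sorted[i + 2 * numTasks], ...
    public static <P> List<P> partitionsForTask(int taskIndex, int numTasks, List<P> sorted) {
        List<P> mine = new ArrayList<>();
        for (int i = taskIndex; i < sorted.size(); i += numTasks) {
            mine.add(sorted.get(i));
        }
        return mine;
    }

    // Sort, select this task's share, then refresh the emitter with that share only.
    public static <P> void refresh(Emitter<P> emitter, int taskIndex, int numTasks) {
        List<P> sorted = emitter.getOrderedPartitions();
        emitter.refreshPartitions(partitionsForTask(taskIndex, numTasks, sorted));
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");
        System.out.println(partitionsForTask(0, 2, all)); // [t-0, t-2, t-4]

        Emitter<String> emitter = new Emitter<String>() {
            public List<String> getOrderedPartitions() { return all; }
            public void refreshPartitions(List<String> assigned) {
                System.out.println("task 1 refreshed with " + assigned);
            }
        };
        refresh(emitter, 1, 2); // task 1 refreshed with [t-1, t-3]
    }
}
```

The split is only safe if every task sees the same ordering from getOrderedPartitions; otherwise two tasks could claim the same partition, or a partition could be left unclaimed.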
The Subscription interface doesn't fit Trident very well. Trident assumes
that a coordinator bolt determines which partitions are involved in a batch,
and that the emitter bolts split the list emitted by the coordinator between
them, with each emitter emitting only for the partitions assigned to it. The
Subscription interface only has a `subscribe()` method that does everything,
which means we can't delegate parts of the subscription process to the
coordinator. The current spout works around this by assuming that the
coordinator bolt will be in the same worker as one or more emitters: the
emitter handles everything, while the coordinator reads partition information
from a static field. This only works if the coordinator happens to share a
worker with the emitters, and it causes issues when the coordinator emits a set
of partitions and the emitters do a reassignment before the message arrives.
The suggested fix is to split the subscription process into 4 steps that
fit what Trident needs: get all partitions for the spout tasks, sort the
partitions, decide which partitions belong to this task, and do the assignment.
This allows us to get rid of the shared static field and split the subscription
process across the coordinator and emitters. We'll lose a bit of flexibility in
implementing Subscriptions, since implementations are now locked into following
these steps, but I don't think it's a great loss.
I don't think we can port this to 1.x without breaking the public API for
the spout. I'd be okay with breaking the API in 1.2.0 unless anyone has an idea
for a workaround.
Other small changes:
* The Trident spout was previously checking that all output streams had
identical fields. As far as I can tell, Trident only supports one output
stream, so check for that instead.
* Remove an empty transactional spout implementation.
* Remove TopicPartition from KafkaTridentSpoutBatchMetadata. It is
unnecessary because the OpaquePartitionedTridentSpoutExecutor keeps track of
which metadata belongs to which partition.
* Upgrade Mockito to the latest 1.x version for JUnit Rule support.
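A hedged sketch of the stricter output-stream check: the map from stream id to field names is a hypothetical stand-in for whatever the spout's record translator declares, and `outputFields` is an illustrative name rather than the spout's actual method.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SingleStreamCheckSketch {

    // Trident appears to support only one output stream, so reject declarations with any other
    // number of streams instead of merely checking that all declared streams share the same fields.
    public static List<String> outputFields(Map<String, List<String>> fieldsByStream) {
        if (fieldsByStream.size() != 1) {
            throw new IllegalStateException(
                "Trident spouts support exactly one output stream, but found " + fieldsByStream.keySet());
        }
        return fieldsByStream.values().iterator().next();
    }

    public static void main(String[] args) {
        Map<String, List<String>> one =
            Collections.singletonMap("default", Arrays.asList("topic", "key", "value"));
        System.out.println(outputFields(one)); // [topic, key, value]
    }
}
```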
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/srdo/storm STORM-2691
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2300.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2300
----
commit 54a829ceba6c1575d0665721509889e4b60dd066
Author: Stig Rohde Døssing
Date: 2017-08-04T00:53:42Z
STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta
objects to Zookeeper
commit 2d1a9cb473d766b52dafa67115a087535939f7b0
Author: Stig Rohde Døssing
Date: 2017-08-18T20:13:38Z
STORM-2691: storm-kafka-client Trident spout does not implement the Trident
spout interface properly
----