GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2300

    STORM-2691: Make storm-kafka-client implement the Trident interface 
correctly

    This PR is based on https://github.com/apache/storm/pull/2271, so please 
ignore the first commit. The broad thrust of this PR is to fix the 
storm-kafka-client Trident spout so it implements the Trident API as intended. 
The current implementation takes some shortcuts that used to be necessary 
to support the KafkaConsumer.subscribe API, but they cause the issues 
listed below.
    
    * https://github.com/apache/storm/pull/2009, released in 1.1.0, changed 
the OpaquePartitionedTridentSpoutExecutor in a way that likely broke 
IOpaquePartitionedTridentSpout implementations other than storm-kafka-client. 
The old code requested sorted partitions from the spout via 
getOrderedPartitions, did a round-robin partitioning, and assigned 
partitions via refreshPartitions 
https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100.
 The new code just passes the output of getOrderedPartitions into 
refreshPartitions 
https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120.
 It looks to me like refreshPartitions is passed the list of all partitions 
assigned to any spout task, rather than just the partitions assigned to the 
current task.
    
    The proposed fix will use getOrderedPartitions to get the sorted partitions 
list, pass the list into getPartitionsForTask, and pass the resulting list of 
assigned partitions back into refreshPartitions.
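
    As a minimal sketch of the round-robin step described above (not the 
actual executor code): the class and method names below are hypothetical, and 
plain strings stand in for partition objects. Each task takes every 
numTasks-th entry of the sorted list, starting at its own index, and only that 
slice would then be handed to refreshPartitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Storm.
class RoundRobinAssignmentSketch {

    // Returns the partitions owned by one task, given the list sorted
    // identically on every task (assumed to come from getOrderedPartitions).
    static <T> List<T> partitionsForTask(int taskIndex, int numTasks, List<T> sortedPartitions) {
        List<T> assigned = new ArrayList<>();
        for (int i = taskIndex; i < sortedPartitions.size(); i += numTasks) {
            assigned.add(sortedPartitions.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> sorted = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
        int numTasks = 2;
        for (int task = 0; task < numTasks; task++) {
            // Only this slice, not the full list, should reach refreshPartitions.
            System.out.println("task " + task + " -> " + partitionsForTask(task, numTasks, sorted));
        }
    }
}
```

    Because every task sorts the same input the same way, the slices are 
disjoint and together cover all partitions without any coordination between 
tasks.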
    
    * The current implementation of the Trident Kafka spout only has one 
consumer in the Emitter. The Coordinator needs to know which partitions are 
involved in a batch, and this information is shared via a static field the 
Emitter writes to. The Coordinator and Emitter are both running as regular 
bolts, so there's no guarantee that they're in the same JVM.
    
    The Subscription interface doesn't fit Trident very well. Trident assumes 
that there is a coordinator bolt that will determine which partitions should be 
involved in a batch, and the emitter bolts should partition the list emitted by 
the coordinator and each emit for only the partitions assigned to the relevant 
emitter. The Subscription interface only has a `subscribe()` method that does 
everything, which means we can't delegate parts of the subscription process to 
the coordinator. The current spout works around this by assuming that the 
coordinator bolt will be in the same worker as one or more emitters: the 
emitter handles everything, while the coordinator reads partition 
information from a static field. This only works if the coordinator happens to 
be sharing a worker with the emitters, and it causes issues when the 
coordinator emits a set of partitions and the emitters happen to do a 
reassignment before the message arrives.
    
    The suggested fix is to split the subscription process into 4 steps that 
fit what Trident needs: Get all partitions for the spout tasks, sort the 
partitions, decide which partitions belong to this task and do the assignment. 
This allows us to get rid of the shared static field and split the subscription 
process across the coordinator and emitters. We'll lose a bit of flexibility in 
implementing Subscriptions, since we're now locking implementations to follow 
these steps, but I don't think it's a great loss.
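
    The four steps could be sketched roughly as follows. This is an 
illustration under assumptions, not the code in this PR: the class, the `Tp` 
type (a stand-in for Kafka's TopicPartition) and all method names are 
hypothetical, and step 1 fabricates a partition list instead of querying 
Kafka.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; names do not come from storm-kafka-client.
class SubscriptionStepsSketch {

    // Dependency-free stand-in for a topic/partition pair.
    static final class Tp {
        final String topic;
        final int partition;
        Tp(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public String toString() { return topic + "-" + partition; }
    }

    // Step 1: get all partitions for the spout tasks (faked here, in reverse order).
    static List<Tp> allPartitions(String topic, int partitionCount) {
        List<Tp> result = new ArrayList<>();
        for (int p = partitionCount - 1; p >= 0; p--) {
            result.add(new Tp(topic, p));
        }
        return result;
    }

    // Step 2: sort, so that every task sees the same order.
    static List<Tp> sorted(List<Tp> partitions) {
        List<Tp> copy = new ArrayList<>(partitions);
        copy.sort(Comparator.comparing((Tp tp) -> tp.topic).thenComparingInt(tp -> tp.partition));
        return copy;
    }

    // Step 3: decide which partitions belong to this task (round-robin).
    static List<Tp> forTask(int taskIndex, int numTasks, List<Tp> sortedPartitions) {
        List<Tp> mine = new ArrayList<>();
        for (int i = taskIndex; i < sortedPartitions.size(); i += numTasks) {
            mine.add(sortedPartitions.get(i));
        }
        return mine;
    }

    // Step 4: do the assignment. A real emitter would pass the list to its
    // consumer; this sketch only prints it.
    static void assign(int taskIndex, List<Tp> mine) {
        System.out.println("task " + taskIndex + " assigned " + mine);
    }

    public static void main(String[] args) {
        List<Tp> ordered = sorted(allPartitions("events", 4));
        for (int task = 0; task < 2; task++) {
            assign(task, forTask(task, 2, ordered));
        }
    }
}
```

    Keeping the steps as separate methods is what lets different components 
call different steps, which a single `subscribe()` method does not allow.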
    
    I don't think we can port this to 1.x without breaking the public API for 
the spout. I'd be okay with breaking the API in 1.2.0 unless anyone has an idea 
for a workaround.
    
    Other small changes:
    * The Trident spout was previously checking that all output stream fields 
were identical. As far as I can tell Trident only supports one output stream, 
so it now checks for that instead.
    * Removed an empty transactional spout implementation.
    * Removed TopicPartition from KafkaTridentSpoutBatchMetadata. It is 
unnecessary because the OpaquePartitionedTridentSpoutExecutor keeps track of 
which metadata belongs to which partition.
    * Upgraded Mockito to the latest 1.x version for Rule support.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2691

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2300
    
----
commit 54a829ceba6c1575d0665721509889e4b60dd066
Author: Stig Rohde Døssing <[email protected]>
Date:   2017-08-04T00:53:42Z

    STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta 
objects to Zookeeper

commit 2d1a9cb473d766b52dafa67115a087535939f7b0
Author: Stig Rohde Døssing <[email protected]>
Date:   2017-08-18T20:13:38Z

    STORM-2691: storm-kafka-client Trident spout does not implement the Trident 
spout interface properly

----


