GitHub user srdo opened a pull request:
https://github.com/apache/storm/pull/2271
STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta
objects to Zookeeper
See https://issues.apache.org/jira/browse/STORM-2675
This builds on https://github.com/apache/storm/pull/2268, so please ignore
the first commit.
Trident uses json-simple under the hood to persist some objects to
Zookeeper. This isn't mentioned in the API docs (or I missed it), so the
current storm-kafka-client implementation returns metadata objects that
json-simple doesn't know how to serialize. The result is that json-simple
writes the objects' toString to Zookeeper, which can't be parsed back out.
This causes the Trident Kafka spout to start over from scratch every time
the topology is restarted.
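
To make the failure mode concrete, here is a minimal standalone sketch using
json-simple directly (the Meta class is a made-up stand-in for the spout's
batch metadata, not actual Storm code):

```java
import org.json.simple.JSONValue;

public class ToStringSerializationDemo {

    // Hypothetical stand-in for the spout's batch metadata object.
    static class Meta {
        long firstOffset = 0;
        long lastOffset = 99;
    }

    public static void main(String[] args) {
        // For types it doesn't recognize, json-simple falls back to the object's
        // toString(), producing something like "ToStringSerializationDemo$Meta@1b6d3586".
        String written = JSONValue.toJSONString(new Meta());
        System.out.println(written);

        // That string isn't valid JSON, so reading it back yields null.
        System.out.println(JSONValue.parse(written));
    }
}
```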
TransactionalState, which Trident uses to read from and write to Zookeeper,
reads with JSONValue.parse. That function fails quietly by returning null
when there's a parsing error. There's a note in the code saying we
deliberately don't use the variant of parse that throws an exception on
error, but we should at least log a warning when parsing fails, since a
failure is most likely due to a bug in the spout or coordinator.
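
The TransactionalState change boils down to the pattern below (a sketch with
made-up class and method names, not the actual Storm code):

```java
import java.nio.charset.StandardCharsets;

import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: JSONValue.parse swallows parse errors and returns null, so check for
// that explicitly and leave a trace instead of failing silently.
public class ZkJsonReadSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ZkJsonReadSketch.class);

    public static Object parse(String path, byte[] data) {
        if (data == null) {
            return null;
        }
        String json = new String(data, StandardCharsets.UTF_8);
        Object parsed = JSONValue.parse(json);
        if (parsed == null && !json.trim().isEmpty()) {
            LOG.warn("Failed to deserialize Zookeeper data at {}: '{}'", path, json);
        }
        return parsed;
    }
}
```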
This PR makes the following changes:
* Manually serialize meta objects to a List or Map so json-simple can handle
them (see the first sketch after this list).
* Log a warning when TransactionalState fails to deserialize something from
Zookeeper (the pattern sketched above).
* Fix the Trident Kafka spout's first-poll logic. It was checking the
committed offset in the KafkaConsumer, which is never updated because this
spout doesn't commit to Kafka; the offsets are instead stored in Zookeeper
via the meta object returned by Emitter.emitPartitionBatch (see the second
sketch after this list).
* Fix the generic type of Coordinator on IOpaquePartitionedTridentSpout. It
should use the same type as the first type parameter of Emitter, since
OpaquePartitionedTridentSpoutExecutor calls getOrderedPartitions and
getPartitionsForTask on the emitter with the object returned by
Coordinator.getPartitionsForBatch. I don't think this is a breaking change,
since this was already a de facto constraint.
* Clarify in a comment on Subscription that Trident expects partitions to
stay assigned to the same tasks. As far as I can tell there's a potential
issue where the batch metadata cache kept by OpaquePartitionedTridentSpoutExecutor
becomes outdated if partitions are reassigned without any workers dying.
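
For the first bullet above, the conversion looks roughly like this (a sketch;
the class and field names are illustrative, not necessarily the exact ones
used in this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Trident Kafka spout's batch metadata: instead
// of handing the object itself to json-simple, convert it to a Map of types
// json-simple understands, and rebuild it on the way back.
public class BatchMetadataSketch {
    private final long firstOffset;
    private final long lastOffset;

    public BatchMetadataSketch(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    public long getLastOffset() {
        return lastOffset;
    }

    public Map<String, Object> toMap() {
        Map<String, Object> map = new HashMap<>();
        map.put("firstOffset", firstOffset);
        map.put("lastOffset", lastOffset);
        return map;
    }

    public static BatchMetadataSketch fromMap(Map<String, Object> map) {
        // json-simple parses all integers back as Long, so go through Number.
        return new BatchMetadataSketch(
            ((Number) map.get("firstOffset")).longValue(),
            ((Number) map.get("lastOffset")).longValue());
    }
}
```

For the first-poll fix, the decision is based on whether Trident handed back
previous batch metadata from Zookeeper, not on KafkaConsumer.committed(...).
Again just a sketch with made-up names, reusing the class above:

```java
public final class FirstPollSketch {
    private FirstPollSketch() {
    }

    public static long startingOffset(BatchMetadataSketch lastBatchMeta, long earliestOffset) {
        if (lastBatchMeta == null) {
            // No metadata in Zookeeper yet, so this is the first poll for the
            // partition: fall back to the configured FirstPollOffsetStrategy
            // (EARLIEST is assumed here for illustration).
            return earliestOffset;
        }
        // Otherwise continue right after the last emitted batch.
        return lastBatchMeta.getLastOffset() + 1;
    }
}
```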
If anyone has suggestions for how to test this, I'm happy to add tests. I'm
also wondering why we don't use Kryo for serialization to Zookeeper, since
json-simple is so inflexible (it only handles a few collection types and
primitive wrappers).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/srdo/storm STORM-2675
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2271.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2271
----
commit 4db257ff00ce4bb54bbdd7850fa9e4787752c7ad
Author: Stig Rohde Døssing <[email protected]>
Date: 2017-08-08T11:04:04Z
STORM-2689: Simplify dependency configuration for storm-kafka-examples and
storm-kafka-client-examples
commit 9288b88ffa088878dafc84bd5780754f1212988c
Author: Stig Rohde Døssing <[email protected]>
Date: 2017-08-04T00:53:42Z
STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta
objects to Zookeeper
----