GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2271
STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta objects to Zookeeper

See https://issues.apache.org/jira/browse/STORM-2675

This builds on https://github.com/apache/storm/pull/2268, so please ignore the first commit.

Trident uses json-simple under the hood to persist some objects to Zookeeper. This isn't mentioned in the API docs (or I missed it), so the current storm-kafka-client implementation returns a number of objects json-simple doesn't know how to serialize. As a result, json-simple writes the toString of those objects to Zookeeper, which can't be read back out. This causes the Trident Kafka spout to start over every time it's restarted.

TransactionalState, which Trident uses to read from and write to Zookeeper, reads with JSONValue.parse. That function fails quietly by returning null when there's a parsing error. There's a note in the code saying we deliberately don't use the version of the parse function that throws an exception on error, but we should at least log when parsing fails, since a failure is likely to be caused by a bug in the spout or coordinator.

This PR makes the following changes:

* Manually serialize meta objects to List or Map so json-simple can handle them.
* Log a warning when TransactionalState fails to deserialize something from Zookeeper.
* Fix the Trident Kafka spout's logic around first polls. It was referring to the committed offset of the KafkaConsumer, which is never updated because we don't commit to Kafka. The offsets are instead stored in Zookeeper via the meta object returned by Emitter.emitPartitionBatch.
* Fix the generic type of Coordinator on OpaquePartitionedTridentSpout. It should use the same type as the first type parameter of Emitter, since OpaquePartitionedSpoutExecutor calls getOrderedPartitions and getPartitionsForTask on the emitter with the object returned by Coordinator.getPartitionsForBatch. I don't think this is a breaking change, since this was a de facto constraint anyway.
* Clarify in a comment on Subscription that Trident expects partitions to remain assigned to specific tasks. As far as I can tell, there's a potential issue with the batch metadata cache kept by OpaqueTridentSpoutExecutor going stale if partitions are shuffled without workers dying. If anyone has suggestions for tests covering this, I'm happy to add some.

I'm wondering why we don't use Kryo for serialization to Zookeeper, since the json-simple library is so inflexible (it can only handle some collections and primitive wrappers).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2675

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2271.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2271

----

commit 4db257ff00ce4bb54bbdd7850fa9e4787752c7ad
Author: Stig Rohde Døssing <s...@apache.org>
Date: 2017-08-08T11:04:04Z

    STORM-2689: Simplify dependency configuration for storm-kafka-examples and storm-kafka-client-examples

commit 9288b88ffa088878dafc84bd5780754f1212988c
Author: Stig Rohde Døssing <s...@apache.org>
Date: 2017-08-04T00:53:42Z

    STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta objects to Zookeeper
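To illustrate the first fix in the list above, here is a rough sketch of manually serializing a meta object to a Map of types json-simple can handle (Maps, Lists, Strings, and primitive wrappers). The class and field names below are illustrative only, not the actual storm-kafka-client code:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: batch metadata for a Kafka partition,
// converted to/from a plain Map so json-simple can round-trip it
// through Zookeeper instead of writing an unreadable toString.
class BatchMetadataSketch {
    private final long firstOffset;
    private final long lastOffset;

    BatchMetadataSketch(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    // Expose state as exactly the types json-simple knows how to
    // serialize: a Map with String keys and boxed-primitive values.
    Map<String, Object> toMap() {
        Map<String, Object> map = new HashMap<>();
        map.put("firstOffset", firstOffset);
        map.put("lastOffset", lastOffset);
        return map;
    }

    // Rebuild the meta object from the Map read back out of Zookeeper.
    static BatchMetadataSketch fromMap(Map<String, Object> map) {
        return new BatchMetadataSketch(
            ((Number) map.get("firstOffset")).longValue(),
            ((Number) map.get("lastOffset")).longValue());
    }
}
```

The Number cast in fromMap is deliberate: a JSON parser may hand back Integer or Long depending on the value, so converting through Number avoids a ClassCastException.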