Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157537266
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
OffsetAndMetadata nextCommitOffsetAndMetadata = null;
if (found) {
- nextCommitOffsetAndMetadata = new
OffsetAndMetadata(nextCommitOffset,
- nextCommitMsg.getMetadata(Thread.currentThread()));
+ try {
+ nextCommitOffsetAndMetadata = new
OffsetAndMetadata(nextCommitOffset,
+ JSON_MAPPER.writeValueAsString(new
KafkaSpout.Info(Thread.currentThread(), context)));
--- End diff --
Will do.
---