Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2465#discussion_r157374831 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java --- @@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() { OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (found) { - nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, - nextCommitMsg.getMetadata(Thread.currentThread())); + try { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, + JSON_MAPPER.writeValueAsString(new KafkaSpout.Info(Thread.currentThread(), context))); --- End diff -- I think that approach is less efficient. Currently we only create and serialize the commit metadata when there are offsets ready to commit. Your proposed solution will always serialize, regardless if there are offsets ready to commit or not. Furthermore, ObjectMapper is a public static field, which means that per JVM we will be creating only two of them, on for the spout, and another for OffsetManager. We can even use the same instance on both classes if we want to.
---