Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157374831
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
---
@@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
OffsetAndMetadata nextCommitOffsetAndMetadata = null;
if (found) {
- nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
- nextCommitMsg.getMetadata(Thread.currentThread()));
+ try {
+ nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
+ JSON_MAPPER.writeValueAsString(new KafkaSpout.Info(Thread.currentThread(), context)));
--- End diff --
I think that approach is less efficient. Currently we create and serialize
the commit metadata only when there are offsets ready to commit. Your
proposed solution will always serialize, regardless of whether there are
offsets ready to commit. Furthermore, ObjectMapper is a public static field,
which means that per JVM we will be creating only two of them, one for the
spout and another for OffsetManager. We can even use the same instance in both
classes if we want to.
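
To make the efficiency argument concrete, here is a minimal JDK-only sketch, not the actual OffsetManager code. `CountingSerializer` is a hypothetical stand-in for a shared static mapper such as `JSON_MAPPER`, and `lazyMetadata` / `eagerMetadata` are illustrative names I made up. It only counts how often serialization runs under each approach:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyCommitMetadataSketch {

    // Hypothetical stand-in for a shared serializer (e.g. a static ObjectMapper).
    // It counts calls so the cost of each approach can be compared.
    static final class CountingSerializer {
        final AtomicInteger calls = new AtomicInteger();

        String serialize(String threadName, String taskInfo) {
            calls.incrementAndGet();
            return "{\"thread\":\"" + threadName + "\",\"task\":\"" + taskInfo + "\"}";
        }
    }

    // A single static instance, created once per JVM and shared by all callers.
    static final CountingSerializer SHARED = new CountingSerializer();

    // Lazy: serialize only when an offset is actually ready to commit.
    static String lazyMetadata(boolean found, String taskInfo) {
        if (!found) {
            return null;
        }
        return SHARED.serialize(Thread.currentThread().getName(), taskInfo);
    }

    // Eager: serialize on every call, even when nothing will be committed.
    static String eagerMetadata(boolean found, String taskInfo) {
        String metadata = SHARED.serialize(Thread.currentThread().getName(), taskInfo);
        return found ? metadata : null;
    }

    public static void main(String[] args) {
        boolean[] polls = {false, false, true}; // only the last poll has offsets to commit

        int start = SHARED.calls.get();
        for (boolean found : polls) {
            lazyMetadata(found, "task-1");
        }
        System.out.println("lazy serializations: " + (SHARED.calls.get() - start));

        start = SHARED.calls.get();
        for (boolean found : polls) {
            eagerMetadata(found, "task-1");
        }
        System.out.println("eager serializations: " + (SHARED.calls.get() - start));
    }
}
```

The lazy variant pays for serialization once per committable offset, while the eager one pays on every call. Both use the same static instance, so neither creates a serializer per call.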
---