Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157374831
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 ---
    @@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
     
             OffsetAndMetadata nextCommitOffsetAndMetadata = null;
             if (found) {
    -            nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset,
    -                nextCommitMsg.getMetadata(Thread.currentThread()));
    +            try {
    +                nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset,
    +                    JSON_MAPPER.writeValueAsString(new 
KafkaSpout.Info(Thread.currentThread(), context)));
    --- End diff --
    
    I think that approach is less efficient. Currently we only create and 
serialize the commit metadata when there are offsets ready to commit. Your 
proposed solution will always serialize, regardless if there are offsets ready 
to commit or not. Furthermore, ObjectMapper is a public static field, which 
means that per JVM we will be creating only two of them, on for the spout, and 
another for OffsetManager. We can even use the same instance on both classes if 
we want to.


---

Reply via email to