Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157375639
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
 ---
    @@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
     
             OffsetAndMetadata nextCommitOffsetAndMetadata = null;
             if (found) {
    -            nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset,
    -                nextCommitMsg.getMetadata(Thread.currentThread()));
    +            try {
    +                nextCommitOffsetAndMetadata = new 
OffsetAndMetadata(nextCommitOffset,
    +                    JSON_MAPPER.writeValueAsString(new 
KafkaSpout.Info(Thread.currentThread(), context)));
    --- End diff --
    
    I think maybe I was being a little unclear. I'm saying that right now the 
information contained in the Info object is constant for the lifetime of the 
spout. We should be able to just create one Info object in KafkaSpout.open and 
reuse it. This means a single serialization for the lifetime of the spout, 
which is more efficient than doing it on every commit. 


---

Reply via email to