Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157362347
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -140,10 +140,16 @@ public OffsetAndMetadata findNextCommitOffset() {
     
             OffsetAndMetadata nextCommitOffsetAndMetadata = null;
             if (found) {
    -            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
    -                nextCommitMsg.getMetadata(Thread.currentThread()));
    +            try {
    +                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset,
    +                    JSON_MAPPER.writeValueAsString(new KafkaSpout.Info(Thread.currentThread(), context)));
    --- End diff --
    
    Consider moving the JSON serialization into KafkaSpout's commitOffsets
    method instead. You can then pass the serialized string to this method as a
    parameter, and we can get away with having only one ObjectMapper. As things
    stand, we can just create the info once (e.g. in open) and reuse it, rather
    than creating and serializing a new one for each commit and partition.
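
To illustrate the suggestion, here is a minimal, dependency-free sketch. Every class name in it (CommitMetadataSketch, SpoutSketch, OffsetManagerSketch, and the stand-in OffsetAndMetadata) is hypothetical and not the actual Storm or Kafka code. The metadata string is built with String.format purely to keep the sketch self-contained; in the PR it would come from a single ObjectMapper held by the spout. The point is only the shape: serialize once in open, then hand the string to findNextCommitOffset as a parameter.

```java
import java.util.TreeSet;

class CommitMetadataSketch {

    /** Hypothetical stand-in for Kafka's OffsetAndMetadata (offset plus a metadata string). */
    static final class OffsetAndMetadata {
        final long offset;
        final String metadata;

        OffsetAndMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    /** Hypothetical, simplified offset manager: it no longer serializes anything itself. */
    static final class OffsetManagerSketch {
        private final TreeSet<Long> acked = new TreeSet<>();
        private final long committedOffset; // first offset that has not been committed yet

        OffsetManagerSketch(long initialFetchOffset) {
            this.committedOffset = initialFetchOffset;
        }

        void ack(long offset) {
            acked.add(offset);
        }

        /** Returns the next commit position, or null if no contiguous acked run starts at committedOffset. */
        OffsetAndMetadata findNextCommitOffset(String commitMetadata) {
            long next = committedOffset;
            while (acked.contains(next)) {
                next++;
            }
            return next == committedOffset ? null : new OffsetAndMetadata(next, commitMetadata);
        }
    }

    /** Hypothetical spout: builds the metadata once in open and reuses it for every commit. */
    static final class SpoutSketch {
        private String commitMetadata;

        void open(String topologyId, int taskId) {
            // Assumption: in the real spout this would be one ObjectMapper call; no JSON escaping is done here.
            commitMetadata = String.format(
                "{\"topologyId\":\"%s\",\"taskId\":%d,\"threadName\":\"%s\"}",
                topologyId, taskId, Thread.currentThread().getName());
        }

        String commitMetadata() {
            return commitMetadata;
        }
    }

    public static void main(String[] args) {
        SpoutSketch spout = new SpoutSketch();
        spout.open("example-topology", 1);

        OffsetManagerSketch manager = new OffsetManagerSketch(0L);
        manager.ack(0L);
        manager.ack(1L);

        // The same pre-serialized string is passed for every partition and every commit.
        OffsetAndMetadata next = manager.findNextCommitOffset(spout.commitMetadata());
        System.out.println(next.offset + " " + next.metadata);
    }
}
```

With this shape the offset manager needs neither the topology context nor a mapper of its own, and the try/catch around serialization moves to the one place where the string is built.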

