Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2465#discussion_r157346574
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
---
@@ -18,36 +18,88 @@
package org.apache.storm.kafka.spout;
+import java.io.IOException;
import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
public class KafkaSpoutMessageId implements Serializable {
+ public static final ObjectMapper JSON_MAPPER;
+
+ @JsonDeserialize(using = TopicPartitionJsonDeserializer.class)
private final TopicPartition topicPart;
private final long offset;
+ private final String thread;
+ private final String topologyId; //TODO: rename
+
private int numFails = 0;
/**
* true if the record was emitted using a form of collector.emit(...). false
* when skipping null tuples as configured by the user in KafkaSpoutConfig
*/
private boolean emitted;
- public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
- this(consumerRecord, true);
+ static {
+ JSON_MAPPER = new ObjectMapper();
+
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(TopicPartition.class, new
TopicPartitionJsonSerializer(TopicPartition.class));
+ module.addDeserializer(TopicPartition.class, new
TopicPartitionJsonDeserializer(TopicPartition.class));
+
+ JSON_MAPPER.registerModule(module);
}
- public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord,
boolean emitted) {
- this(new TopicPartition(consumerRecord.topic(),
consumerRecord.partition()), consumerRecord.offset(), emitted);
+ //TODO Revisit constructors
+ public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord,
TopologyContext context) {
+ this(consumerRecord, true, context);
}
- public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
- this(topicPart, offset, true);
+ public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord,
boolean emitted, TopologyContext context) {
+ this(new TopicPartition(consumerRecord.topic(),
consumerRecord.partition()), consumerRecord.offset(), context);
}
- public KafkaSpoutMessageId(TopicPartition topicPart, long offset,
boolean emitted) {
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset,
TopologyContext context) {
+ this(topicPart, offset, true, context);
+ }
+
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset,
boolean emitted, TopologyContext context) {
+ this(topicPart, offset, emitted, Thread.currentThread().getName(),
context.getStormId());
+ }
+
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset,
boolean emitted, String thread, String topologyId) {
+ this(topicPart, offset, emitted, thread, topologyId, 0);
+ }
+
+ // Used for JSON Deserialization
+ @JsonCreator
+ private KafkaSpoutMessageId(@JsonProperty("topicPartition")
TopicPartition topicPart,
+ @JsonProperty("offset")long offset,
+ @JsonProperty("emitted") boolean emitted,
+ @JsonProperty("thread") String thread,
+ @JsonProperty("topologyId") String
topologyId,
+ @JsonProperty("numFails") int numFails) {
+
this.topicPart = topicPart;
this.offset = offset;
this.emitted = emitted;
+ this.thread = thread;
+ this.topologyId = topologyId;
+ this.numFails = numFails;
--- End diff --
We don't need to store or restore `numFails` and `emitted` in the metadata.
---