Hi Pradeep,

When you move the message to an error queue, are you doing that directly
from inside the Avro bolt, or are you emitting a tuple to another bolt? Can
you also verify that the tuple is acked in the Avro bolt exactly once?
Double acking will cause the tuple to fail.
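
For reference, here's the shape I'd expect the Avro bolt's execute() to
have (a minimal sketch; sendToErrorQueue() and deserialize() are
hypothetical stand-ins for your own code):

    @Override
    public void execute(Tuple input) {
        try {
            Object record = deserialize(input.getBinary(0)); // your Avro decode
            collector.emit(input, new Values(record));       // anchored emit downstream
        } catch (Exception e) {
            sendToErrorQueue(input); // hypothetical error-queue call
            // nothing is emitted to the database bolt on this path
        }
        collector.ack(input);        // exactly one ack, on both paths
    }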

Storm only acks a message on the spout once every edge in the tuple tree
has been acked, and only if the topology message timeout hasn't expired
before that happens.

For example, if the Kafka spout emits t0 and your AvroDeserializerBolt is
the only bolt consuming from the spout, the bolt will receive t0 and must
ack it exactly once. If the AvroDeserializerBolt emits any tuples anchored
to t0 (using any of the
https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
methods that take a Tuple anchor), the downstream bolts must ack those
exactly once too. Say the Avro bolt emits t0_0 and t0_1 anchored to t0: the
root tuple on the spout is only acked if t0, t0_0 and t0_1 are each acked
once, and all of them are acked before the message timeout elapses.
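
In code, that anchoring pattern looks roughly like this (a sketch with
illustrative names; it assumes the OutputCollector saved in prepare()):

    // In the AvroDeserializerBolt: both emits pass the input tuple as the
    // anchor, so t0_0 and t0_1 become edges in t0's tuple tree.
    collector.emit(input, new Values(recordA)); // t0_0
    collector.emit(input, new Values(recordB)); // t0_1
    collector.ack(input);                       // acks t0 itself, exactly once

    // In the DataBaseInsertBolt: each anchored tuple must also be acked
    // exactly once before the root tuple completes on the spout.
    @Override
    public void execute(Tuple tuple) {
        insertRow(tuple);      // hypothetical insert step
        collector.ack(tuple);
    }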

Depending on your throughput this may be infeasible, but you might try
enabling debug logging
https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
which will let you see whether the tuple is being acked on the spout.
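
If you do enable it, it's a one-liner when you build the topology:

    Config conf = new Config();
    conf.setDebug(true); // per-tuple emit/ack/fail logging, including acks at the spout
    // then pass conf to StormSubmitter.submitTopology(...) as usual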

If the tuple is being acked on the spout, you might want to look at the
logs from this method
https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
They should show you what the spout is doing internally. Keep in mind that
the spout can only commit e.g. offset 10 if offsets 0-9 have all been
acked/committed, so if an earlier tuple failed and was still waiting for
retry when you restarted, that could also explain the offset going
backwards.
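
To illustrate that rule with made-up offsets (a toy model, not the real
OffsetManager logic):

    import java.util.Arrays;
    import java.util.TreeSet;

    // Toy model of the contiguous-commit rule: the spout may only commit
    // through the highest acked offset with no unacked gaps below it.
    public class CommitRule {
        static long committable(long lastCommitted, TreeSet<Long> acked) {
            long next = lastCommitted;
            while (acked.contains(next + 1)) next++;
            return next;
        }

        public static void main(String[] args) {
            TreeSet<Long> acked = new TreeSet<>(Arrays.asList(0L, 1L, 2L, 5L, 6L));
            // Offsets 3 and 4 are unacked, so only 0-2 can be committed; after
            // a restart the consumer resumes from the committed offset, which
            // looks like the offset going back.
            System.out.println(committable(-1L, acked)); // prints 2
        }
    }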

2017-09-01 7:04 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:

> Hi,
> I am using Storm 1.1.0, storm-kafka-client version 1.1.1, and the Kafka
> server is 0.10.1.1.
>
> Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
>
> Message flow is like below, and it's a normal topology:
>
> KafkaSpout --> AvroDeserializerBolt --> DataBaseInsertBolt
>
> If a message fails Avro deserialization, I move it to an error queue and
> ack it from the Avro bolt. This message is not emitted to the database
> bolt.
>
> But it's observed that after I restart the topology, the offset for the
> topic goes back to an old offset.
>
> Will Kafka commit the offset only if the message is acked from all bolts?
>
> Is the offset going back to the previous value because of this?
>
> Thanks
> Pradeep
>
