Hi Mathieu,

In Streams the consumer config "enable.auto.commit" is always forced to
false, and Streams' own "commit.interval.ms" config is used instead. With
that, even if no data has been processed, the commit operation is still
triggered after the configured period of time.
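
For example, a minimal sketch of setting it via StreamsConfig, using the
app id and broker address from your config dump below (the topology
itself is elided):

  import java.util.Properties;
  import org.apache.kafka.streams.KafkaStreams;
  import org.apache.kafka.streams.StreamsConfig;
  import org.apache.kafka.streams.kstream.KStreamBuilder;

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timesheet-list");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.59.184:9092");
  // commit every 10 seconds, even when no new records were processed
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);

  KStreamBuilder builder = new KStreamBuilder();
  // ... define the topology here ...
  KafkaStreams streams = new KafkaStreams(builder, props);
  streams.start();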


Guozhang


On Wed, Feb 22, 2017 at 8:41 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Hi Eno,
>
> Thanks for the quick reply.  I think that probably does match the data I'm
> seeing.  This surprises me a bit because my streams app was only offline
> for a few minutes, but ended up losing its offset.
>
> My interpretation is that the source partition had been idle for 24 hours,
> streams doesn't commit offsets for idle partitions, and so the
> default/unconfigured offset retention of 24 hours had expired.
>
> I'll work around this by bumping up my offset retention.  Thanks!
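>
> For reference, the broker-side setting is "offsets.retention.minutes". A
> sketch for server.properties, assuming the default 24-hour (1440-minute)
> retention should become 7 days (a broker restart is needed to apply it):
>
>   # keep committed offsets for 7 days instead of the default 1440 minutes
>   offsets.retention.minutes=10080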
>
> Mathieu
>
>
> On Wed, Feb 22, 2017 at 9:22 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
> > Hi Mathieu,
> >
> > It could be that the offset retention period has expired. See this:
> > http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group
> >
> > Thanks
> > Eno
> >
> > > On 22 Feb 2017, at 16:08, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> > >
> > > Hey users,
> > >
> > > What causes delete tombstones (value=null) to be sent to the
> > > __consumer_offsets topic?
> > >
> > > I'm observing that a Kafka Streams application that is restarted after a
> > > crash appears to be reprocessing messages from the beginning of a topic.
> > > I've dumped the __consumer_offsets topic and found that after the
> > > restart, messages with a null value are being sent to __consumer_offsets.
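> > >
> > > A sketch of one way to dump it, assuming a 0.10.x broker (the formatter
> > > class moved packages in later versions, and the consumer config file
> > > must set exclude.internal.topics=false to see the internal topic):
> > >
> > >   kafka-console-consumer.sh --bootstrap-server 10.10.59.184:9092 \
> > >     --topic __consumer_offsets --from-beginning \
> > >     --consumer.config consumer.properties \
> > >     --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"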
> > >
> > > I do see that the ConsumerConfig for my StreamThread consumer has
> > > auto.offset.reset=earliest.  But my understanding of this configuration
> > > is that it only applies when the offset isn't available, but there are
> > > definitely offsets for this consumer group stored in __consumer_offsets.
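> > >
> > > For illustration, a minimal sketch of the settings in play (broker
> > > address and group id taken from the config dump below):
> > >
> > >   import java.util.Properties;
> > >   import org.apache.kafka.clients.consumer.ConsumerConfig;
> > >
> > >   Properties props = new Properties();
> > >   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.59.184:9092");
> > >   props.put(ConsumerConfig.GROUP_ID_CONFIG, "timesheet-list");
> > >   // only consulted when no committed offset is available for a partition
> > >   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");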
> > >
> > > Here's the consumer config for the streams app:
> > >
> > > ConsumerConfig values:
> > >  auto.commit.interval.ms = 5000
> > >  auto.offset.reset = earliest
> > >  bootstrap.servers = [10.10.59.184:9092]
> > >  check.crcs = true
> > >  client.id = timesheet-list-2d7a7f37-f41a-46b0-a1bb-d47f773012f6-StreamThread-1-consumer
> > >  connections.max.idle.ms = 540000
> > >  enable.auto.commit = false
> > >  exclude.internal.topics = true
> > >  fetch.max.bytes = 52428800
> > >  fetch.max.wait.ms = 500
> > >  fetch.min.bytes = 1
> > >  group.id = timesheet-list
> > >  heartbeat.interval.ms = 3000
> > >  interceptor.classes = null
> > >  key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> > >  max.partition.fetch.bytes = 1048576
> > >  max.poll.interval.ms = 1800000
> > >  max.poll.records = 1000
> > >  metadata.max.age.ms = 300000
> > >  metric.reporters = []
> > >  metrics.num.samples = 2
> > >  metrics.recording.level = INFO
> > >  metrics.sample.window.ms = 30000
> > >  partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
> > >  receive.buffer.bytes = 65536
> > >  reconnect.backoff.ms = 50
> > >  request.timeout.ms = 1801000
> > >  retry.backoff.ms = 100
> > >  sasl.jaas.config = null
> > >  sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > >  sasl.kerberos.min.time.before.relogin = 60000
> > >  sasl.kerberos.service.name = null
> > >  sasl.kerberos.ticket.renew.jitter = 0.05
> > >  sasl.kerberos.ticket.renew.window.factor = 0.8
> > >  sasl.mechanism = GSSAPI
> > >  security.protocol = PLAINTEXT
> > >  send.buffer.bytes = 131072
> > >  session.timeout.ms = 10000
> > >  ssl.cipher.suites = null
> > >  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > >  ssl.endpoint.identification.algorithm = null
> > >  ssl.key.password = null
> > >  ssl.keymanager.algorithm = SunX509
> > >  ssl.keystore.location = null
> > >  ssl.keystore.password = null
> > >  ssl.keystore.type = JKS
> > >  ssl.protocol = TLS
> > >  ssl.provider = null
> > >  ssl.secure.random.implementation = null
> > >  ssl.trustmanager.algorithm = PKIX
> > >  ssl.truststore.location = null
> > >  ssl.truststore.password = null
> > >  ssl.truststore.type = JKS
> > >  value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> >
> >
>



-- 
-- Guozhang
