I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved
adding headers. Updating the compatibility matrix must have just slipped
when that bugfix was merged -- thanks for bringing this up!
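
For anyone who lands here with the same exception, the broker-side rule behind
it can be sketched as a small lookup. This is only an illustration:
`MagicVersions` and its methods are hypothetical names, not part of Kafka. It
also assumes the broker writes its default message format; a broker pinned to
an older `log.message.format.version` is not covered.

```java
/**
 * Hypothetical helper (not a Kafka API): maps a broker version string to the
 * newest record format ("magic" value) that broker release introduced.
 */
public final class MagicVersions {

    private MagicVersions() {}

    /** Parses the first two numeric components, e.g. "0.10.2.1" -> {0, 10}. */
    public static int[] majorMinor(String brokerVersion) {
        String[] parts = brokerVersion.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected at least major.minor: " + brokerVersion);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    /** Returns 0, 1 or 2 for the newest message format the broker release supports. */
    public static int maxMagic(String brokerVersion) {
        int[] v = majorMinor(brokerVersion);
        int major = v[0];
        int minor = v[1];
        if (major > 0 || minor >= 11) {
            return 2; // 0.11.0 and later: format v2, which adds record headers
        }
        if (minor >= 10) {
            return 1; // 0.10.x: format v1, timestamps but no headers
        }
        return 0; // older brokers: format v0
    }

    /** Record headers need message format v2. */
    public static boolean supportsHeaders(String brokerVersion) {
        return maxMagic(brokerVersion) >= 2;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"0.10.2.1", "0.11.0.0", "2.2.1"}) {
            System.out.println(v + " -> magic v" + maxMagic(v)
                    + ", headers supported: " + supportsHeaders(v));
        }
    }
}
```

For the 0.10.2.1 broker in this thread the sketch reports magic v1 without
header support, which matches the "Magic v1 does not support record headers"
message quoted below.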

On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales <alisson.sa...@gmail.com>
wrote:

> Hi Guozhang, thanks for your reply.
>
> I suspect the "problem" has to do with the fixes released in 2.2.1. I'm
> upgrading to this version mostly because we were facing problems with
> KTable suppress.
>
> I was experiencing this exact same problem:
>
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
> This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.
>
> When trying to confirm the fix worked for my topology/app I encountered the
> issue: java.lang.IllegalArgumentException: Magic v1 does not support
> record headers.
>
> In summary, the topology works fine on broker 0.10.2.1 with kafka-streams
> 2.2.0, but fails with the error above if I use 2.2.1.
>
> I haven't changed any part of the code; I only updated the dependency
> version in my Gradle file.
>
> Thanks again
>
> On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Alisson,
> >
> > The root cause you've seen is the message header support, which was added
> > to brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0 (KIP-244).
> > If your code does not add any headers of its own, it only inherits the
> > headers from source topics when writing to intermediate / sink topics.
> > So I think that even if you were using 2.2.0 you'd still hit this issue
> > if you happen to have headers in some of your source topic messages.
> >
> > I've updated
> > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > per the updates.
> >
> >
> > Guozhang
> >
> > On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales <alisson.sa...@gmail.com>
> > wrote:
> >
> > > Hi all, I've just upgraded a project from kafka-streams 2.2.0 to 2.2.1
> > > and found the error shown at the end of this message.
> > >
> > > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to 0.11
> > > the error doesn't happen anymore.
> > >
> > > My question here is: where is the best place we can find the required
> > > minimum broker version for the kafka-streams version one is using?
> > >
> > > This is not clear to me and the
> > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > wiki page seems outdated.
> > >
> > > Thanks in advance
> > >
> > > Exception in thread "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KTABLE-SUPPRESS-STATE-STORE-0000000009
> > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> > >     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> > >     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> > >     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2] Abort sending since an error caught with a previous record (key A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp null) to topic streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog due to java.lang.IllegalArgumentException: Magic v1 does not support record headers
> > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
> > >     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
> > >     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> > >     ... 10 more
> > > Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
> > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
> > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
> > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
> > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531)
> > >     at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
> > >     at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224)
> > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
> > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
> > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
