Awesome, thanks for clarifying :) On Tue, Aug 27, 2019 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Right, the fix itself actually add more headers even if there were none > from the source topics, and hence cause old versioned brokers to fail. But > theoretically speaking, as long as the streams clients are version 0.11.0+ > the broker version should be 0.11.0+ for various features that may require > higher message format (eos, suppression, etc). > > On Mon, Aug 26, 2019 at 5:42 PM Sophie Blee-Goldman <sop...@confluent.io> > wrote: > > > I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 > involved > > adding headers. Updating the compatibility matrix must have just slipped > > when that bugfix was merged -- thanks for bringing this up! > > > > On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales <alisson.sa...@gmail.com> > > wrote: > > > > > Hi Guozhang, thanks for your reply. > > > > > > I suspect the "problem" has to do with the fixes released on 2.2.1. I'm > > > upgrading to this version mostly because we were facing problems with > > > KTable suppress. > > > > > > I was experiencing this exact same problem: > > > > > > > > > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156 > > > This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895. > > > > > > When trying to confirm the fix worked for my topology/app I encountered > > the > > > issue: java.lang.IllegalArgumentException: Magic v1 does not support > > > record. > > > > > > In summary the topology works fine on 0.10.2.1 with kafka-streams > 2.2.0, > > > but fails with the error above if I use 2.2.1. > > > > > > I haven't changed any part of the code, simply updated my gradle file > > > updating the dependency. > > > > > > Thanks again > > > > > > On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > Hello Alisson, > > > > > > > > The root cause you've seen is the message header support, which is > > added > > > in > > > > brokers as in 0.11.0 (KIP-82) and in streams client as in 2.0 > > (KIP-244). > > > If > > > > your code does not add any more headers then it would only inherit > the > > > > headers from source topics when trying to write to intermediate / > sink > > > > topics. So I think that even if you were using 2.2.0 you'd still hit > > this > > > > issue if you happen to have headers in some of your source topic > > > messages. > > > > > > > > I've updated > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix > > > per > > > > the updates. > > > > > > > > > > > > Guozhang > > > > > > > > On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales < > alisson.sa...@gmail.com > > > > > > > wrote: > > > > > > > > > Hi all, I've just upgraded a project that was using kafka-streams > > 2.2.0 > > > > to > > > > > 2.2.1 and found the following error at the end. > > > > > > > > > > I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to > > > 0.11 > > > > > the error doesn't happen anymore. > > > > > > > > > > My question here is: where is the best place we can find the > required > > > > > minimum broker version for the kafka-streams version one is using? > > > > > > > > > > This is not clear to me and the > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix > > > > > wiki > > > > > page seems outdated. > > > > > > > > > > Thanks in advance > > > > > > > > > > Exception in thread > > > > > > > > > "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1" > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] > > > > Failed > > > > > to flush state store KTABLE-SUPPRESS-STATE-STORE-0000000009 > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773) > > > > > Caused by: org.apache.kafka.streams.errors.StreamsException: task > > [1_2] > > > > > Abort sending since an error caught with a previous record (key > > > > > A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db > > > > > timestamp > > > > > null) to topic > > > > > > streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog > > > due > > > > to > > > > > java.lang.IllegalArgumentException: Magic v1 does not support > record > > > > > headers > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248) > > > > > ... 10 more > > > > > Caused by: java.lang.IllegalArgumentException: Magic v1 does not > > > support > > > > > record headers > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167) > > > > > > > > > > > > > > > > > -- > > > > -- Guozhang > > > > > > > > > > > > -- > -- Guozhang >