Re: Errors producing / consuming with kafka 0.10.0.0
I was able to make this error disappear by upgrading my client library from 0.10.0.0 to 0.10.0.1.

On Wed, Jan 18, 2017 at 10:40 AM, Ryan Thompson wrote:
> Hello,
>
> I'm attempting to upgrade an application from 0.8 to 0.10 broker / client
> libs, and integrate kafka streams. [...]
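A quick way to confirm which kafka-clients jar is actually on the runtime classpath (an older client can easily sneak in transitively) is to log its version at startup. A small sketch, assuming the AppInfoParser utility bundled with kafka-clients; treat the exact API as something to verify against your client version:

    import org.apache.kafka.common.utils.AppInfoParser;

    public class ClientVersionCheck {
        public static void main(String[] args) {
            // Prints the kafka-clients version and commit id found on the classpath;
            // after the dependency bump this should report 0.10.0.1.
            System.out.println("kafka-clients version: " + AppInfoParser.getVersion()
                    + " (commit " + AppInfoParser.getCommitId() + ")");
        }
    }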
Errors producing / consuming with kafka 0.10.0.0
Hello,

I'm attempting to upgrade an application from 0.8 to 0.10 broker / client libs, and integrate kafka streams. I am currently using the following producer / consumer configs:

Producer:

Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props = ProducerConfig.addSerializerToConfig(props,
        Serdes.String().serializer(), Serdes.String().serializer());
return props;

Consumer (kafka streams Processor):

Properties streamsSettings = new Properties();
streamsSettings.put("bootstrap.servers", brokerList);
streamsSettings.put("application.id", consumerGroupId);
streamsSettings.put("key.serde", Serdes.StringSerde.class.getName());
streamsSettings.put("value.serde", Serdes.StringSerde.class.getName());
StreamsConfig config = new StreamsConfig(streamsSettings);

I'm running a 0.10 broker. However, when I publish a message, I see the following error on the consumer side:

org.apache.kafka.common.KafkaException: Error deserializing key/value for partition -0 at offset 0
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:665)
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:593)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:71)
    at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:142)
    at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.lang.IllegalArgumentException: null
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.apache.kafka.common.record.Record.sliceDelimited(Record.java:392)
    at org.apache.kafka.common.record.Record.key(Record.java:376)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:650)
    ... 15 common frames omitted

What could be causing this error? I see the same error if I attempt to use the kafka bin tools to consume from the topic.

Thanks,
Ryan
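For reference, one way to narrow a failure like this down is to read the topic with a plain KafkaConsumer and String deserializers, outside of Kafka Streams, to see whether the records can be deserialized at all. A minimal diagnostic sketch (not the original poster's code); the topic name, group id and broker address are placeholders to adjust:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PlainConsumerCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker list
            props.put("group.id", "plain-consumer-check");      // placeholder group id
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(5000L);
                for (ConsumerRecord<String, String> record : records) {
                    // If this prints records cleanly, the topic data itself is readable
                    // and the problem is more likely in the Streams app or client version.
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }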
Re: Kafka offset out of range using SimpleConsumer Example
#1 turned out to be invalid; my logging was simply bad (I was logging the number of messages I'd read so far for that partition, not the requested read offset).

#2 is still valid, though. I'm thinking that a possible explanation might be that the part of the log I was processing was deleted, but I'm not sure yet.

Thanks,
Ryan

On Thu, Aug 11, 2016 at 9:23 PM, Ryan Thompson wrote:
> Hello,
>
> I've implemented something quite similar to the SimpleConsumer example on
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> [...]
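One way to test the retention hypothesis is to ask the broker for the earliest offset it still has for the partition and compare it against the offset being requested; if the requested offset is below that bound, the segments holding it have been deleted. A minimal sketch against the old kafka.javaapi SimpleConsumer API used in the wiki example (class name illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class EarliestOffsetCheck {
        // Returns the earliest offset still available for the partition, i.e. the
        // lower bound below which log segments have been removed by retention.
        public static long earliestOffset(SimpleConsumer consumer, String topic,
                                          int partition, String clientName) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
            requestInfo.put(tp, new PartitionOffsetRequestInfo(
                    kafka.api.OffsetRequest.EarliestTime(), 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                throw new RuntimeException("Offset request failed with error code "
                        + response.errorCode(topic, partition));
            }
            return response.offsets(topic, partition)[0];
        }
    }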
Kafka offset out of range using SimpleConsumer Example
Hello,

I've implemented something quite similar to the SimpleConsumer example on https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

I'm using it to traverse a specific range of offsets.

I find that sometimes, in the middle of this traversal, I end up hitting an "Offset out of range" error (error code 1). The offset that's being requested when this happens is determined by a previous message's nextOffset(), as in the example.

When this happens (and it doesn't always happen), I find that this offset is not even close to being in the range I am attempting to traverse. For instance, in one case, I was traversing the range (1665615779, 1861334452), but ended up requesting offset 1193311 (again, based on a message's nextOffset() value, in the middle of traversal).

Two questions:
1- What could explain this behavior (message.nextOffset() being out of the range I am traversing)
2- Is there any more robust way to recover from this other than simply bailing, as in the example?

Thanks,
Ryan
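On question 2, the wiki example itself suggests a recovery other than bailing: when a fetch comes back with OffsetOutOfRange (error code 1), re-query the broker for an offset that is still valid and resume from there. A hedged sketch along those lines, self-contained against the old kafka.javaapi API; it clamps to the earliest retained offset (the wiki example resets to the latest offset instead, and which bound makes sense depends on whether skipping deleted data is acceptable). Class name and the traversal loop are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.ErrorMapping;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class RangeTraversal {

        // Looks up a boundary offset (EarliestTime or LatestTime) for the partition.
        static long boundaryOffset(SimpleConsumer consumer, String topic, int partition,
                                   long whichTime, String clientName) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
            requestInfo.put(tp, new PartitionOffsetRequestInfo(whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                throw new RuntimeException("Offset lookup failed: "
                        + response.errorCode(topic, partition));
            }
            return response.offsets(topic, partition)[0];
        }

        // Traverses [startOffset, endOffset); on OffsetOutOfRange it clamps the read
        // position to the earliest retained offset instead of bailing.
        static void traverse(SimpleConsumer consumer, String topic, int partition,
                             long startOffset, long endOffset, String clientName) {
            long readOffset = startOffset;
            while (readOffset < endOffset) {
                FetchRequest req = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partition, readOffset, 100000)
                        .build();
                FetchResponse response = consumer.fetch(req);
                if (response.hasError()) {
                    short code = response.errorCode(topic, partition);
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // The requested data has been deleted by retention;
                        // resume from the oldest offset the broker still has.
                        readOffset = boundaryOffset(consumer, topic, partition,
                                kafka.api.OffsetRequest.EarliestTime(), clientName);
                        continue;
                    }
                    throw new RuntimeException("Fetch failed with error code " + code);
                }
                int numRead = 0;
                for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
                    if (messageAndOffset.offset() < readOffset) {
                        continue;  // old message from a compressed batch, as in the wiki example
                    }
                    if (messageAndOffset.offset() >= endOffset) {
                        return;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    numRead++;
                    // process messageAndOffset.message() here
                }
                if (numRead == 0) {
                    return;  // caught up with the log end; stop rather than spin
                }
            }
        }
    }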
Kafka streams for out of order density aggregation
Hello,

Say I have a stream, and I want to determine whether or not a given "density" of records matches a given condition. For example, let's say I want to know how many of the last 10 records have a numerical value greater than 100.

Does the kafka streams DSL (or processor API) provide a way to do this type of aggregation in a way that supports out of order messages? I can't use a time window based aggregation here because my window is based on a quantity of records (in this case, the last 10) rather than time. However, I still want to know about the last 10 records regardless of what order they arrive.

Thanks,
Ryan
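For what it's worth, the record-count (rather than time-window) part of this can be sketched with the Processor API by keeping a bounded buffer of the most recent values per task. A minimal in-memory illustration of the "how many of the last 10 exceed 100" condition described above; it counts records in arrival order only, so it does not address the out-of-order question, and it is not backed by a fault-tolerant state store:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class DensityProcessor implements Processor<String, Long> {

        private static final int WINDOW_SIZE = 10;   // last 10 records
        private static final long THRESHOLD = 100L;  // condition: value > 100

        private ProcessorContext context;
        private final Deque<Long> lastValues = new ArrayDeque<>();
        private int matching = 0;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, Long value) {
            lastValues.addLast(value);
            if (value > THRESHOLD) {
                matching++;
            }
            if (lastValues.size() > WINDOW_SIZE) {
                long evicted = lastValues.removeFirst();
                if (evicted > THRESHOLD) {
                    matching--;
                }
            }
            // Emit how many of the last 10 records (in arrival order) exceeded the threshold.
            context.forward(key, (long) matching);
        }

        @Override
        public void punctuate(long timestamp) {
            // no periodic work in this sketch
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }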
Scaling up partitions with kafka streams and other stateful stream processing frameworks
Hello,

I'm wondering if fault tolerant state management with kafka streams works seamlessly if partitions are scaled up. My understanding is that this is indeed a problem that stateful stream processing frameworks need to solve, and that:

- with samza, this is not a solved problem (though I also understand it's being worked on, based on a conversation I had yesterday at the kafka summit with someone who works on samza)

- with flink, there's a plan to solve this: "The way we plan to implement this in Flink is by shutting the dataflow down with a checkpoint, and bringing the dataflow back up with a different parallelism." http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/

- with kafka streams, I haven't been able to find a solid answer on whether or not this problem is solved for users, or if we need to handle it ourselves.

Thanks,
Ryan