Re: NPE on startup with a low-level API based application
Hey Frank, I think I spotted the issue and submitted a patch. Here's a link to the JIRA: https://issues.apache.org/jira/browse/KAFKA-5456. I expect we'll get the fix into 0.11.0.0. Thanks for finding this!

-Jason
Re: NPE on startup with a low-level API based application
Yes, compression was on (lz4). Key and value sizes fluctuate: keys are small, under 10 bytes; values vary too, but nothing crazy, up to about 100 KB.

I did some stepping through the code, and at some point I saw a branch that took a different path depending on the protocol version (something with LegacyRecord), so I figured updating the broker was worth a shot.

I can do some more testing, but I need to set up another 0.10.2.1 cluster first.

Frank
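For reference, compression on a Streams application's internal producer (the one that writes the changelog topics) is normally set with producer-prefixed config keys. Below is only a sketch, assuming the StreamsConfig.producerPrefix helper; the application id, broker address, and batch size are illustrative placeholders, not Frank's actual settings.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsProducerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lowlevel-app");      // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

            // Producer-prefixed keys are forwarded to the embedded producer that
            // writes changelog and sink topics; lz4 mirrors the setting described above.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "lz4");
            props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), "16384");

            // These properties would then be handed to the KafkaStreams constructor.
            System.out.println(props);
        }
    }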
Re: NPE on startup with a low-level API based application
Finally, was compression enabled when you hit this exception? If so, which compression algorithm was enabled?
Re: NPE on startup with a low-level API based application
Frank: it would be even better if you could share the key and value which were causing this problem. Maybe share them on the JIRA: https://issues.apache.org/jira/browse/KAFKA-5456 ?

Thanks,
Apurva
Re: NPE on startup with a low-level API based application
Hi Frank,

What is the value of `batch.size` in your producer? What is the size of the key and value you are trying to write?

Thanks,
Apurva
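For anyone following along: batch.size is the producer's per-partition batch buffer size in bytes, and its default is 16384, well below the roughly 100 KB values Frank reports in his reply above. A minimal sketch of where the setting lives; the broker address is a placeholder and the value shown is just the default.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BatchSizeSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            // batch.size bounds how many bytes the producer batches per partition;
            // 16384 is the documented default. Records larger than this are still
            // accepted (up to max.request.size); the producer then allocates a
            // buffer big enough for that single record.
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            System.out.println("batch.size = " + props.get(ProducerConfig.BATCH_SIZE_CONFIG));
        }
    }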
Re: NPE on startup with a low-level API based application
It seems to happen when using a Streams 0.11.1 snapshot against a 0.10.2 (release) broker; the problem disappeared after I upgraded the broker.

On Thu, Jun 15, 2017 at 11:28 AM, Frank Lyaruu wrote:

> Hey people, I see an error I haven't seen before. It is in a low-level-API
> based Streams application. I started it once, it ran fine, then I did a
> graceful shutdown, and since then I always see this error on startup.
>
> I'm using yesterday's trunk.
>
> It seems that the MemoryRecordsBuilder overflows somehow; is there
> something I need to configure?
>
> java.lang.NullPointerException
>
> at org.apache.kafka.common.utils.Utils.notNull(Utils.java:243)
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:219)
> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:650)
> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:604)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:97)
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58)
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:73)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:66)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:149)
> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:47)
> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
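Everything above RecordCollectorImpl in that trace is plain producer code, so one way to narrow the problem down, independent of Streams, would be a bare producer mimicking the workload described in this thread (lz4, keys under 10 bytes, values around 100 KB) against the same broker version. The following is only a sketch under those assumptions; the broker address and topic name are made up, and it is not known whether this actually reproduces the NPE.

    import java.util.Properties;
    import java.util.Random;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class NpeReproSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");             // as in the report
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // batch.size is left at its default (16384), smaller than the values below.

            Random random = new Random();
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 100; i++) {
                    byte[] key = new byte[8];            // keys under 10 bytes, as described
                    byte[] value = new byte[100 * 1024]; // values around 100 KB
                    random.nextBytes(key);
                    random.nextBytes(value);
                    // "repro-topic" is a made-up topic name.
                    producer.send(new ProducerRecord<>("repro-topic", key, value), (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
                }
                producer.flush();
            }
        }
    }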