(Am attaching a debug log; note that the app terminated with no further
messages.)

topology: kStream -> groupByKey.aggregate(minute) -> foreach
                             \-> groupByKey.aggregate(hour) -> foreach
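
A minimal Java sketch of that topology (assumptions: a 0.10.1 `KStreamBuilder` named `builder`, an `AggKey` serde named `keySerde`, a placeholder string-concatenation aggregator, and made-up topic/store names; the real app's logic will differ):

```java
KStream<AggKey, String> stream =
        builder.stream(keySerde, Serdes.String(), "input-topic");

// minute-level windowed aggregate -> foreach
stream.groupByKey(keySerde, Serdes.String())
      .aggregate(() -> "", (key, value, agg) -> agg + value,
                 TimeWindows.of(60 * 1000L), Serdes.String(), "minute-agg-store")
      .foreach((windowedKey, agg) -> System.out.println(windowedKey + " = " + agg));

// hour-level windowed aggregate on the same grouped stream -> foreach
stream.groupByKey(keySerde, Serdes.String())
      .aggregate(() -> "", (key, value, agg) -> agg + value,
                 TimeWindows.of(60 * 60 * 1000L), Serdes.String(), "hour-agg-store")
      .foreach((windowedKey, agg) -> System.out.println(windowedKey + " = " + agg));
```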


config:

        Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
        config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
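
Note that this config never sets cache.max.bytes.buffering (the caching knob Guozhang asks about below), so it runs at the 0.10.1 default of 10485760 bytes. A hypothetical line to disable the record cache entirely, not part of the original config:

```java
        // Hypothetical addition: setting cache.max.bytes.buffering to 0
        // disables the Streams record cache (default is 10485760 bytes).
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
```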


On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jon,
>
> To help investigate this issue, could you let me know 1) your topology
> sketch and 2) your app configs? For example, did you enable caching in your
> app with the cache.max.bytes.buffering config?
>
>
> Guozhang
>
>
> On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > I get this one quite a bit. It kills my app after a short time of
> > running. Driving me nuts.
> >
> > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Not sure about this one.
> > >
> > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > definitely want to investigate this.
> > >
> > > -Matthias
> > >
> > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > (Am reporting these as I have moved to 0.10.1.0-cp2.)
> > > >
> > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > > MinuteAgg failed on partition assignment
> > > >
> > > > java.lang.IllegalStateException: task [1_9] Log end offset should not change while restoring
> > > >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > > >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > > >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > >         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > > >         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > > >         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > > >         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > > >         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > > >         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > > >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > > >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > > >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > > >         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > > >         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Attachment: errors.log.gz
Description: GNU Zip compressed data
