Thanks Unmesh for the detailed explanation.

When you change your topology's stateful operators, the underlying state
stores and their corresponding changelogs are no longer valid. This holds
even if you only change the aggregate / reduce logic, not just when you
switch from a non-windowed to a windowed aggregation. In that case you
should use different state store names in your topology builder.
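
To illustrate why an old changelog cannot be replayed into a store of a
different shape, here is a minimal standalone sketch in plain Java (no Kafka
dependency). It assumes a windowed store key is laid out as the serialized
key followed by an 8-byte timestamp and a 4-byte sequence number. The class
and method names are hypothetical and only loosely mirror the
WindowStoreUtils.timestampFromBinaryKey frame in the stack trace quoted
below; this is not the library's actual code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class WindowKeyLayoutDemo {
    // Assumed layout for this sketch:
    // [serialized key][8-byte timestamp][4-byte sequence number]
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Builds a key the way a windowed store would write it (under the assumption above).
    static byte[] toWindowedKey(String key, long timestamp, int seqnum) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(raw.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(raw).putLong(timestamp).putInt(seqnum);
        return buf.array();
    }

    // Reads the timestamp back from the fixed offset counted from the end of the key.
    static long timestampFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        // A key written by the windowed store round-trips fine.
        byte[] windowed = toWindowedKey("eu-london", 1466580000000L, 0);
        System.out.println("timestamp = " + timestampFromBinaryKey(windowed));

        // A raw key from a non-windowed changelog has no timestamp suffix,
        // so the computed offset is negative and the read fails.
        byte[] rawKey = "eu".getBytes(StandardCharsets.UTF_8);
        try {
            timestampFromBinaryKey(rawKey);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("restore failed: " + e);
        }
    }
}
```

In this sketch a raw key of 12 bytes or more would not throw; the read
would succeed and return a meaningless timestamp, which is arguably worse.
Either way, the old changelog data does not match what the new store expects.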

Currently the state store names are abstracted in the high-level DSL as
KTable names and window names, but we are working on exposing the names in
the DSL as well, as we did in the lower-level Processor API. Some context
on this issue and a proposed API fix can be found here:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65143671


Guozhang


On Tue, Jun 21, 2016 at 11:58 PM, Unmesh Joshi <[email protected]>
wrote:

> A bit more investigation shows that because logging is always enabled in
> both RocksDBKeyValueStoreSupplier and RocksDBWindowStoreSupplier, the
> aggregated key/values get written to a topic in Kafka. RocksDBWindowStore
> always stores keys with timestamp attached. RocksDBStore stores raw keys.
>
> If the aggregate name remains the same and code is changed to use Windowed
> aggregation, it always expects timestamp attached to the key at startup
> recovery operation.
>
> So if aggregate operation is changed from non window to window in the code,
> the aggregation logs need to be cleared from Kafka. I deleted the logs from
> Kafka nodes and all is working just fine.
>
> Thanks,
> Unmesh
>
> On Wed, Jun 22, 2016 at 9:29 AM, Unmesh Joshi <[email protected]>
> wrote:
>
> > I could reproduce it with the following steps. The stack trace is at the end.
> >
> > 1. Create a stream and consume it without Windowing.
> >
> > KTable<String, Integer> aggregation = locationViews
> >         .map((key, value) -> {
> >             GenericRecord parsedRecord = parse(value);
> >             String parsedKey = parsedRecord.get("region").toString()
> >                     + parsedRecord.get("location").toString();
> >             return new KeyValue<>(parsedKey, 1);
> >         })
> >         .reduceByKey((v1, v2) -> v1 + v2,
> >                 Serdes.String(), Serdes.Integer(), "aggregated");
> >
> > aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
> >
> >
> > 2. Stop the consumer and then change the code to add windowing.
> >
> > KTable<Windowed<String>, Integer> aggregation = locationViews
> >         .map((key, value) -> {
> >             GenericRecord parsedRecord = parse(value);
> >             String parsedKey = parsedRecord.get("region").toString()
> >                     + parsedRecord.get("location").toString();
> >             return new KeyValue<>(parsedKey, 1);
> >         })
> >         .reduceByKey((v1, v2) -> v1 + v2,
> >                 TimeWindows.of("aggregated", 100000),
> >                 Serdes.String(), Serdes.Integer());
> >
> >
> > aggregation.foreach((k, v) -> System.out.println(k + ", " + v));
> >
> >
> > If I had already run the consumer without windowing, it throws the
> > following exception when the consumer is run as in step 2.
> >
> >
> > java.lang.IndexOutOfBoundsException
> > at java.nio.Buffer.checkIndex(Buffer.java:546)
> > at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
> > at org.apache.kafka.streams.state.internals.WindowStoreUtils.timestampFromBinaryKey(WindowStoreUtils.java:63)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:295)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$100(RocksDBWindowStore.java:44)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore$2.restore(RocksDBWindowStore.java:190)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
> > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:184)
> > at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
> > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
> > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
> > at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
> > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >
> > On Wed, Jun 22, 2016 at 1:12 AM, Guozhang Wang <[email protected]>
> > wrote:
> >
> >> Could you share your stack trace upon failure?
> >>
> >>
> >>
> >> On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <[email protected]>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I am using confluent-3.0.0-2.11, with kafka and streams versions
> >> > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> >> > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem
> >> > seems to be with null keys, because the original messages are not
> >> > produced with keys and I am creating a key-value pair in the map
> >> > function before aggregating to a KTable. RocksDBWindowStore.putInternal
> >> > expects the timestamp to be appended to the key, which was not the case.
> >> > It somehow corrected itself once I started producing messages with
> >> > non-null keys.
> >> >
> >> > The code is here
> >> >
> >> > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
> >> >
> >> >
> >> > Thanks,
> >> > Unmesh
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <[email protected]>
> >> > wrote:
> >> >
> >> > > Hello Unmesh,
> >> > >
> >> > > The timestamp extractor is always applied at the beginning of the
> >> > > topology for each incoming record, and the extracted timestamp is
> >> > > carried throughout the topology.
> >> > >
> >> > > Could you share your stack trace upon failure with your source code?
> >> > > And what version of Kafka Streams are you using? Some old versions of
> >> > > the library require the key (in your case, "String parsedKey =
> >> > > parsedRecord.get("region").toString() +
> >> > > parsedRecord.get("location").toString();") to be non-null, but this
> >> > > has been resolved in recent fixes.
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <[email protected]>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I was trying to experiment with Kafka Streams, and had the following
> >> > > > code:
> >> > > >
> >> > > > KTable<Windowed<String>, Integer> aggregated = locationViews
> >> > > >         .map((key, value) -> {
> >> > > >             GenericRecord parsedRecord = parse(value);
> >> > > >             String parsedKey = parsedRecord.get("region").toString()
> >> > > >                     + parsedRecord.get("location").toString();
> >> > > >             return new KeyValue<>(parsedKey, 1);
> >> > > >         })
> >> > > >         .reduceByKey((v1, v2) -> v1 + v2,
> >> > > >                 TimeWindows.of("aggregated", 5000),
> >> > > >                 Serdes.String(), Serdes.Integer());
> >> > > >
> >> > > > This code fails in RocksDBWindowStore.putInternal, where it is trying
> >> > > > to get the timestamp from the message key.
> >> > > >
> >> > > > I am not producing messages with keys, so I tried setting
> >> > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >> > > >
> >> > > >
> >> > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >> > > >         WallclockTimestampExtractor.class.getName());
> >> > > >
> >> > > > But RocksDBWindowStore still expects a timestamp in the message key
> >> > > > and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> >> > > >
> >> > > > Is it the case that a Kafka message should always have a key if we
> >> > > > have to use windowing? Or is this an issue with RocksDBWindowStore?
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Unmesh
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
