Any example for the above would be appreciated. Thanks On Wed, Mar 22, 2017 at 2:50 PM, Shanthi Nellaiappan <shan2n...@gmail.com> wrote:
> Thanks for the info. > With "page2",{"user":"2", "page":"22", "timestamp":1435278177777} as input > for streams-pageview-input an "2",{"region":"CA","timestamp":1435278177777} > as input for streams-userprofile-input, the following error is shown, > Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: > JsonTimestampExtractor cannot recognize the record value > org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$ > PageViewByRegion@4764b2e > at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor. > extract(JsonTimestampExtractor.java:43) > at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords( > RecordQueue.java:105) > at org.apache.kafka.streams.processor.internals. > PartitionGroup.addRawRecords(PartitionGroup.java:117) > at org.apache.kafka.streams.processor.internals.StreamTask.addRecords( > StreamTask.java:144) > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:415) > at org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:242) > > Any example on the correct input value is really appreciated. > > Thanks > > On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll <mich...@confluent.io> > wrote: > >> IIRC the PageViewTypedDemo example requires input data where the >> username/userId is captured in the keys of messages/records, and further >> information in the values of those messages. >> >> The problem you are running into is that, when you are writing your input >> data via the console consumer, the records you are generating only have >> values -- the keys are null because you don't specify any explicitly. >> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic >> streams-userprofile-input >> > >> > {"region":"CA", "timestamp":1435278171139} >> >> And you have the same issue for the other topic, "streams-pageview-input". >> >> To enter keys, you need to add some CLI options to the console producer. >> >> Example: >> >> $ bin/kafka-console-producer --broker-list localhost:9092 \ >> --topic streams-userprofile-input \ >> --property parse.key=true \ >> --property key.separator=, >> >> firstUser,firstValue >> secondUser,secondValue >> >> Hope this helps, >> Michael >> >> >> >> >> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com >> > >> wrote: >> >> > I have started exploring kafka streaming API. I'm trying to run >> > PageViewTypedDemo program as it is without any changes locally on a >> > desktop. Current kafka version is 0.10.1.0. >> > >> > With the following inputs from 2 different console, >> > >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic >> > streams-pageview-input >> > >> > {"user":"1", "page":"22", "timestamp":1435278171111} >> > >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic >> > streams-userprofile-input >> > >> > {"region":"CA", "timestamp":1435278171139} >> > >> > The error is >> > >> > Exception in thread "StreamThread-1" >> > org.apache.kafka.streams.errors.StreamsException: Exception caught in >> > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, >> > topic=streams-userprofile-input, partition=0, offset=0 >> > >> > at >> > org.apache.kafka.streams.processor.internals. >> > StreamTask.process(StreamTask.java:200) >> > >> > at >> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> > StreamThread.java:436) >> > >> > at >> > org.apache.kafka.streams.processor.internals. >> > StreamThread.run(StreamThread.java:242) >> > >> > Caused by: org.apache.kafka.streams.errors.StreamsException: Record key >> > for >> > the source KTable from store name streams-userprofile-store-name should >> not >> > be null. >> > >> > at >> > org.apache.kafka.streams.kstream.internals.KTableSource$ >> > MaterializedKTableSourceProcessor.process(KTableSource.java:83) >> > >> > at >> > org.apache.kafka.streams.processor.internals.ProcessorNode.process( >> > ProcessorNode.java:82) >> > >> > at >> > org.apache.kafka.streams.processor.internals.ProcessorContex >> tImpl.forward( >> > ProcessorContextImpl.java:204) >> > >> > at >> > org.apache.kafka.streams.processor.internals. >> > SourceNode.process(SourceNode.java:66) >> > >> > at >> > org.apache.kafka.streams.processor.internals. >> > StreamTask.process(StreamTask.java:181) >> > >> > ... 2 more >> > >> > >> > Can someone help .Is there anything else to be done apart from creating >> the >> > 2 topics streams-pageview-input & streams-userprofile-input >> > >> > >