The quickest answer I can give you is to try a similar example [1], which includes a driver that generates the required input data for the page view example.
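As a rough sketch of what keyed input could look like for the two topics in this thread (this is not the driver from [1]): the script below writes one `key|value` line per record. The `|` separator, the file names, and the choice of the user ID as the key on both topics are assumptions made for illustration; the thread itself only says that the user ID belongs in the record key.

```shell
#!/bin/sh
# Sketch: write keyed input lines ("key|value") for the two demo topics.
# Assumptions (not from the thread): the '|' separator, the file names,
# and using the user ID as the record key on both topics.

SEP='|'

# One line per record: <userId><SEP><JSON value>.
# The JSON values are the ones posted in the original question.
printf '%s%s%s\n' '1' "$SEP" '{"region":"CA", "timestamp":1435278171139}' \
    > userprofile-input.txt
printf '%s%s%s\n' '1' "$SEP" '{"user":"1", "page":"22", "timestamp":1435278171111}' \
    > pageview-input.txt

# Show what would be sent to each topic.
cat userprofile-input.txt pageview-input.txt
```

Each file could then be redirected into the console producer with the key options quoted further down, for example `bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input --property parse.key=true --property "key.separator=|" < userprofile-input.txt`. Picking a separator that never occurs in the JSON values avoids any ambiguity with the commas inside them.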
-Michael

[1] https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
(this is for Confluent 3.2 and Apache Kafka 0.10.2.0)

On Thu, Mar 23, 2017 at 11:56 AM, Shanthi Nellaiappan <shan2n...@gmail.com> wrote:

> Any example for the above would be appreciated. Thanks
>
> On Wed, Mar 22, 2017 at 2:50 PM, Shanthi Nellaiappan <shan2n...@gmail.com> wrote:
>
> > Thanks for the info.
> > With "page2",{"user":"2", "page":"22", "timestamp":1435278177777} as input
> > for streams-pageview-input and "2",{"region":"CA","timestamp":1435278177777}
> > as input for streams-userprofile-input, the following error is shown:
> >
> > Exception in thread "StreamThread-1" java.lang.IllegalArgumentException:
> > JsonTimestampExtractor cannot recognize the record value
> > org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$PageViewByRegion@4764b2e
> >   at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.extract(JsonTimestampExtractor.java:43)
> >   at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:105)
> >   at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
> >   at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > Any example on the correct input value is really appreciated.
> >
> > Thanks
> >
> > On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll <mich...@confluent.io> wrote:
> >
> >> IIRC the PageViewTypedDemo example requires input data where the
> >> username/userId is captured in the keys of messages/records, and further
> >> information in the values of those messages.
> >>
> >> The problem you are running into is that, when you are writing your input
> >> data via the console producer, the records you are generating only have
> >> values -- the keys are null because you don't specify any explicitly.
> >>
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
> >> >
> >> > {"region":"CA", "timestamp":1435278171139}
> >>
> >> And you have the same issue for the other topic, "streams-pageview-input".
> >>
> >> To enter keys, you need to add some CLI options to the console producer.
> >>
> >> Example:
> >>
> >> $ bin/kafka-console-producer --broker-list localhost:9092 \
> >>     --topic streams-userprofile-input \
> >>     --property parse.key=true \
> >>     --property key.separator=,
> >>
> >> firstUser,firstValue
> >> secondUser,secondValue
> >>
> >> Hope this helps,
> >> Michael
> >>
> >> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com> wrote:
> >>
> >> > I have started exploring the Kafka Streams API. I'm trying to run the
> >> > PageViewTypedDemo program as it is, without any changes, locally on a
> >> > desktop. Current Kafka version is 0.10.1.0.
> >> >
> >> > With the following inputs from 2 different consoles,
> >> >
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-pageview-input
> >> >
> >> > {"user":"1", "page":"22", "timestamp":1435278171111}
> >> >
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
> >> >
> >> > {"region":"CA", "timestamp":1435278171139}
> >> >
> >> > The error is
> >> >
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >> > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001,
> >> > topic=streams-userprofile-input, partition=0, offset=0
> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:200)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> >> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >> > Caused by: org.apache.kafka.streams.errors.StreamsException: Record key
> >> > for the source KTable from store name streams-userprofile-store-name
> >> > should not be null.
> >> >   at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:83)
> >> >   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> >> >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> >> >   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> >> >   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> >> >   ... 2 more
> >> >
> >> > Can someone help? Is there anything else to be done apart from creating
> >> > the 2 topics streams-pageview-input & streams-userprofile-input?