The quickest answer I can give you is to try a similar example [1], which
includes a driver that generates the required input data for the page
view example.
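
If you would rather keep using the console producer, here is a minimal
sketch of what keyed input lines could look like. It assumes that the
record key is the user id (as described further down in this thread) and
that "|" is used as the key separator because it does not occur in the
JSON values. The helper function names are made up for illustration, and
I have not verified this against PageViewTypedDemo itself.

```shell
#!/bin/sh
# Hypothetical helpers: each prints one "key|value" line per record.
# Assumption: the key is the user id; the values are the sample JSON
# records already posted in this thread.
SEP='|'

userprofile_records() {
    printf '%s%s%s\n' '1' "$SEP" '{"region":"CA", "timestamp":1435278171139}'
}

pageview_records() {
    printf '%s%s%s\n' '1' "$SEP" '{"user":"1", "page":"22", "timestamp":1435278171111}'
}

# To send the lines to Kafka (needs a running broker, so it is left as a
# comment here):
#   userprofile_records | bin/kafka-console-producer.sh \
#       --broker-list localhost:9092 \
#       --topic streams-userprofile-input \
#       --property parse.key=true \
#       --property "key.separator=$SEP"
```

The same pipe works for pageview_records with --topic
streams-pageview-input. With parse.key=true, the part of each line before
the separator becomes the record key and the rest becomes the value.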

-Michael



[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java
(this is for Confluent 3.2 and Apache Kafka 0.10.2.0)



On Thu, Mar 23, 2017 at 11:56 AM, Shanthi Nellaiappan <shan2n...@gmail.com>
wrote:

> Any example for the above would be appreciated. Thanks
>
> On Wed, Mar 22, 2017 at 2:50 PM, Shanthi Nellaiappan <shan2n...@gmail.com>
> wrote:
>
> > Thanks for the info.
> > With "page2",{"user":"2", "page":"22", "timestamp":1435278177777} as
> > input for streams-pageview-input and
> > "2",{"region":"CA","timestamp":1435278177777} as input for
> > streams-userprofile-input, the following error is shown:
> > Exception in thread "StreamThread-1" java.lang.IllegalArgumentException:
> > JsonTimestampExtractor cannot recognize the record value
> > org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$PageViewByRegion@4764b2e
> > at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.extract(JsonTimestampExtractor.java:43)
> > at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:105)
> > at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
> > at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> >
> > Any example on the correct input value is really appreciated.
> >
> > Thanks
> >
> > On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> >> IIRC the PageViewTypedDemo example requires input data where the
> >> username/userId is captured in the keys of messages/records, and further
> >> information in the values of those messages.
> >>
> >> The problem you are running into is that, when you write your input
> >> data via the console producer, the records you generate only have
> >> values -- the keys are null because you don't specify any explicitly.
> >>
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> >> streams-userprofile-input
> >> >
> >> > {"region":"CA", "timestamp":1435278171139}
> >>
> >> And you have the same issue for the other topic,
> >> "streams-pageview-input".
> >>
> >> To enter keys, you need to add some CLI options to the console producer.
> >>
> >> Example:
> >>
> >>     $ bin/kafka-console-producer --broker-list localhost:9092 \
> >>                              --topic streams-userprofile-input \
> >>                              --property parse.key=true \
> >>                              --property key.separator=,
> >>
> >>     firstUser,firstValue
> >>     secondUser,secondValue
> >>
> >> Hope this helps,
> >> Michael
> >>
> >>
> >>
> >>
> >> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan
> >> <shan2n...@gmail.com> wrote:
> >>
> >> > I have started exploring the Kafka Streams API. I'm trying to run
> >> > the PageViewTypedDemo program as is, without any changes, locally on
> >> > a desktop. The current Kafka version is 0.10.1.0.
> >> >
> >> > With the following inputs from two different consoles,
> >> >
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> >> > streams-pageview-input
> >> >
> >> > {"user":"1", "page":"22", "timestamp":1435278171111}
> >> >
> >> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
> >> > streams-userprofile-input
> >> >
> >> > {"region":"CA", "timestamp":1435278171139}
> >> >
> >> > The error is
> >> >
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >> > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001,
> >> > topic=streams-userprofile-input, partition=0, offset=0
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.
> >> > StreamTask.process(StreamTask.java:200)
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >> > StreamThread.java:436)
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.
> >> > StreamThread.run(StreamThread.java:242)
> >> >
> >> > Caused by: org.apache.kafka.streams.errors.StreamsException: Record
> >> > key for the source KTable from store name
> >> > streams-userprofile-store-name should not be null.
> >> >
> >> > at
> >> > org.apache.kafka.streams.kstream.internals.KTableSource$
> >> > MaterializedKTableSourceProcessor.process(KTableSource.java:83)
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> >> > ProcessorNode.java:82)
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> >> > ProcessorContextImpl.java:204)
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.
> >> > SourceNode.process(SourceNode.java:66)
> >> >
> >> > at
> >> > org.apache.kafka.streams.processor.internals.
> >> > StreamTask.process(StreamTask.java:181)
> >> >
> >> > ... 2 more
> >> >
> >> >
> >> > Can someone help? Is there anything else to be done apart from
> >> > creating the two topics, streams-pageview-input and
> >> > streams-userprofile-input?
> >> >
> >>
> >
> >
>
