Any example for the above would be appreciated. Thanks

On Wed, Mar 22, 2017 at 2:50 PM, Shanthi Nellaiappan <shan2n...@gmail.com>
wrote:

> Thanks for the info.
> With "page2",{"user":"2", "page":"22", "timestamp":1435278177777} as input
> for streams-pageview-input and "2",{"region":"CA","timestamp":1435278177777}
> as input for streams-userprofile-input, the following error is shown:
> Exception in thread "StreamThread-1" java.lang.IllegalArgumentException:
> JsonTimestampExtractor cannot recognize the record value
> org.apache.kafka.streams.examples.pageview.PageViewTypedDemo$
> PageViewByRegion@4764b2e
> at org.apache.kafka.streams.examples.pageview.JsonTimestampExtractor.
> extract(JsonTimestampExtractor.java:43)
> at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(
> RecordQueue.java:105)
> at org.apache.kafka.streams.processor.internals.
> PartitionGroup.addRawRecords(PartitionGroup.java:117)
> at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(
> StreamTask.java:144)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:415)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:242)
>
> Any example on the correct input value is really appreciated.
>
> Thanks
>
> On Wed, Mar 22, 2017 at 12:27 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
>> IIRC the PageViewTypedDemo example requires input data where the
>> username/userId is captured in the keys of messages/records, and further
>> information in the values of those messages.
>>
>> The problem you are running into is that, when you are writing your input
>> data via the console producer, the records you are generating only have
>> values -- the keys are null because you don't specify any explicitly.
>>
>> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
>> >
>> > {"region":"CA", "timestamp":1435278171139}
>>
>> And you have the same issue for the other topic, "streams-pageview-input".
>>
>> To enter keys, you need to add some CLI options to the console producer.
>>
>> Example:
>>
>>     $ bin/kafka-console-producer --broker-list localhost:9092 \
>>                              --topic streams-userprofile-input \
>>                              --property parse.key=true \
>>                              --property key.separator=,
>>
>>     firstUser,firstValue
>>     secondUser,secondValue
>>
>> Hope this helps,
>> Michael
>>
>>
>>
>>
>> On Wed, Mar 22, 2017 at 8:10 PM, Shanthi Nellaiappan <shan2n...@gmail.com>
>> wrote:
>>
>> > I have started exploring the Kafka Streams API. I'm trying to run the
>> > PageViewTypedDemo program as is, without any changes, locally on a
>> > desktop. The current Kafka version is 0.10.1.0.
>> >
>> > With the following inputs from two different consoles,
>> >
>> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-pageview-input
>> >
>> > {"user":"1", "page":"22", "timestamp":1435278171111}
>> >
>> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-userprofile-input
>> >
>> > {"region":"CA", "timestamp":1435278171139}
>> >
>> > The error is
>> >
>> > Exception in thread "StreamThread-1"
>> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
>> > process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001,
>> > topic=streams-userprofile-input, partition=0, offset=0
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.
>> > StreamTask.process(StreamTask.java:200)
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> > StreamThread.java:436)
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.
>> > StreamThread.run(StreamThread.java:242)
>> >
>> > Caused by: org.apache.kafka.streams.errors.StreamsException: Record key for
>> > the source KTable from store name streams-userprofile-store-name should not
>> > be null.
>> >
>> > at
>> > org.apache.kafka.streams.kstream.internals.KTableSource$
>> > MaterializedKTableSourceProcessor.process(KTableSource.java:83)
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
>> > ProcessorNode.java:82)
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
>> > ProcessorContextImpl.java:204)
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.
>> > SourceNode.process(SourceNode.java:66)
>> >
>> > at
>> > org.apache.kafka.streams.processor.internals.
>> > StreamTask.process(StreamTask.java:181)
>> >
>> > ... 2 more
>> >
>> >
>> > Can someone help? Is there anything else to be done apart from creating
>> > the two topics, streams-pageview-input and streams-userprofile-input?
>> >
>>
>
>

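For reference, the keyed-input suggestion from the thread can be sketched as a small shell helper. This is only an illustration under stated assumptions: the `keyed_record` function name and the `|` separator are hypothetical choices (any separator that cannot occur in the key should work), and the key is written without quotes on the assumption that the demo matches it against the `user` field of the page view. The sketch only builds the keyed line; the commented pipe shows how it might be fed to the console producer with the `parse.key` and `key.separator` properties mentioned above.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): print one "key|value" line
# in the form the console producer reads when parse.key=true.
# The "|" separator is an assumption chosen because it does not occur in the key.
keyed_record() {
    printf '%s|%s\n' "$1" "$2"
}

# A user profile record keyed by the user id. The key is unquoted here,
# assuming it has to equal the "user" value of the page view records.
keyed_record 2 '{"region":"CA","timestamp":1435278177777}'

# To send it, the output could be piped into the console producer, e.g.:
#   keyed_record 2 '{"region":"CA","timestamp":1435278177777}' |
#     bin/kafka-console-producer.sh --broker-list localhost:9092 \
#       --topic streams-userprofile-input \
#       --property parse.key=true --property key.separator='|'
```

The same helper could be used for streams-pageview-input by changing the topic and the JSON value.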