Hi Tommy,

I checked out your GitHub project and can confirm the "issue". Because you
are using Kafka 0.10.0.1, the automatic repartitioning step is not available.

If you use the "trunk" version, your program will run as expected. If you
want to stay with 0.10.0.1, you need to repartition the data explicitly
after map(), via a call to through():

> val wordCounts: KStream[String, JLong] = textLines
>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>       .map((key: String, word: String) => new KeyValue(word, word))
>       .through("my-repartitioning-topic")
>       .countByKey("counts")
>       .toStream

Keep in mind that it is recommended to create all user topics manually.
Thus, you should create the repartitioning topic you specify in
through() before you start your Kafka Streams application.
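
To illustrate why the explicit repartitioning step matters, here is a minimal
sketch in plain Scala with no Kafka dependency. It is only a simulation under
my own assumptions: the names RepartitionSketch, countPerPartition,
partitionFor and repartition are made up for illustration, and the
hash-modulo partitioner is a stand-in rather than the partitioner Kafka
actually uses.

```scala
// Plain-Scala simulation (no Kafka dependency) of per-partition counting.
object RepartitionSketch {
  type Record = (String, String) // (key, value)

  // State after flatMapValues + map for the two-partition input from this
  // thread: the key is now the word, but no record has changed partition.
  val afterMap: Map[Int, Seq[Record]] = Map(
    0 -> Seq("a" -> "a", "b" -> "b"),
    1 -> Seq("a" -> "a", "b" -> "b", "c" -> "c")
  )

  // Count keys independently inside each partition, the way one task per
  // partition would.
  def countPerPartition(parts: Map[Int, Seq[Record]]): Map[Int, Map[String, Long]] =
    parts.map { case (p, records) =>
      p -> records.groupBy(_._1).map { case (k, vs) => k -> vs.size.toLong }
    }

  // Stand-in partitioner (assumption): key hash modulo partition count.
  def partitionFor(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode % numPartitions)

  // Simulates writing every record to a topic and reading it back: each
  // record lands in the partition chosen from its (new) key.
  def repartition(parts: Map[Int, Seq[Record]], numPartitions: Int): Map[Int, Seq[Record]] =
    parts.values.flatten.toSeq.groupBy { case (k, _) => partitionFor(k, numPartitions) }

  def main(args: Array[String]): Unit = {
    // Without repartitioning, "a" and "b" are each counted once per partition.
    println(countPerPartition(afterMap))
    // With repartitioning, all records for a key are counted together.
    println(countPerPartition(repartition(afterMap, 2)))
  }
}
```

Without the repartition step, each partition reports a count of 1 for every
word it holds, which matches the wc-out contents you observed. With it, the
totals across partitions come out as a 2, b 2, c 1, whichever partition each
key happens to land in.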


-Matthias


On 08/31/2016 09:07 PM, Guozhang Wang wrote:
> Hello Tommy,
> 
> Which version of Kafka are you using?
> 
> Guozhang
> 
> On Wed, Aug 31, 2016 at 4:41 AM, Tommy Q <deeplam...@gmail.com> wrote:
> 
>> I cleaned up all the ZooKeeper & Kafka state and ran the WordCountDemo
>> again; the results in wc-out are still wrong:
>>
>> a 1
>> b 1
>> a 1
>> b 1
>> c 1
>>
>>
>>
>> On Wed, Aug 31, 2016 at 5:32 PM, Michael Noll <mich...@confluent.io>
>> wrote:
>>
>>> Can you double-check whether the results in wc-out are not rather:
>>>
>>> a 1
>>> b 1
>>> a 2
>>> b 2
>>> c 1
>>>
>>> ?
>>>
>>> On Wed, Aug 31, 2016 at 5:47 AM, Tommy Q <deeplam...@gmail.com> wrote:
>>>
>>>> Tried the word count example as discussed; the result in wc-out is wrong:
>>>>
>>>> a 1
>>>> b 1
>>>> a 1
>>>> b 1
>>>> c 1
>>>>
>>>>
>>>> The expected result should be:
>>>>
>>>> a 2
>>>> b 2
>>>> c 1
>>>>
>>>>
>>>> Kafka version is 0.10.0.1
>>>>
>>>>
>>>> On Tue, Aug 30, 2016 at 10:29 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> No. It does not support hidden topics.
>>>>>
>>>>> The only explanation might be that there is no repartitioning step. But
>>>>> then the question would be whether there is a bug in Kafka Streams, because
>>>>> between map() and countByKey() repartitioning is required.
>>>>>
>>>>> Can you verify that the result is correct?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 08/30/2016 03:24 PM, Tommy Q wrote:
>>>>>> Does Kafka support hidden topics? (Since all the topic info is stored
>>>>>> in ZK, this is probably not the case.)
>>>>>>
>>>>>> On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Tommy,
>>>>>>>
>>>>>>> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
>>>>>>> an internal topic will be created under the hood. It should be named
>>>>>>> "<application-id>-something-repartition". I am not sure why it is not
>>>>>>> listed via bin/kafka-topics.sh.
>>>>>>>
>>>>>>> The internal topic "<application-id>-counts-changelog" you see is
>>>>>>> created to back the state of the countByKey() operator.
>>>>>>>
>>>>>>> See
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>> On 08/30/2016 06:55 AM, Tommy Q wrote:
>>>>>>>> Michael, Thanks for your help.
>>>>>>>>
>>>>>>>> Taking the word count example, I am trying to walk through the code based
>>>>>>>> on your explanation:
>>>>>>>>
>>>>>>>>     val textLines: KStream[String, String] = builder.stream("input-topic")
>>>>>>>>     val wordCounts: KStream[String, JLong] = textLines
>>>>>>>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>>>>>>>>       .map((key: String, word: String) => new KeyValue(word, word))
>>>>>>>>       .countByKey("counts")
>>>>>>>>       .toStream
>>>>>>>>
>>>>>>>>     wordCounts.to(stringSerde, longSerde, "wc-out")
>>>>>>>>
>>>>>>>> Suppose the input-topic has two partitions and each partition has one
>>>>>>>> string record produced into it:
>>>>>>>>
>>>>>>>> input-topic_0 : "a b"
>>>>>>>> input-topic_1 : "a b c"
>>>>>>>>
>>>>>>>>
>>>>>>>> Suppose we started two instances of the stream topology (task_0 and
>>>>>>>> task_1). After flatMapValues & map are executed, they should have the
>>>>>>>> following task state:
>>>>>>>>
>>>>>>>> task_0 :  [ (a, "a"), (b, "b") ]
>>>>>>>> task_1 :  [ (a, "a"), (b, "b"), (c, "c") ]
>>>>>>>>
>>>>>>>>
>>>>>>>> Before the execution of countByKey, the Kafka Streams framework should
>>>>>>>> insert an invisible shuffle phase internally:
>>>>>>>>
>>>>>>>> shuffled across the network :
>>>>>>>>
>>>>>>>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
>>>>>>>> _internal_topic_shuffle_1 :  [ (b, "b"), (b, "b"), (c, "c") ]
>>>>>>>>
>>>>>>>>
>>>>>>>> countByKey (reduce) :
>>>>>>>>
>>>>>>>> task_0 (counts-changelog_0) :  [ (a, 2) ]
>>>>>>>>
>>>>>>>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
>>>>>>>>
>>>>>>>>
>>>>>>>> And after the execution of `wordCounts.to(stringSerde, longSerde,
>>>>>>>> "wc-out")`, we get the word count output in wc-out topic:
>>>>>>>>
>>>>>>>> task_0 (wc-out_0) :  [ (a, 2) ]
>>>>>>>>
>>>>>>>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> According to the steps listed above, do I understand the internals of
>>>>>>>> the KStream word count correctly?
>>>>>>>> Another question: does the shuffle across the network work by creating
>>>>>>>> intermediate topics? If so, why can't I find the intermediate topics
>>>>>>>> using `bin/kafka-topics.sh --list --zookeeper localhost:2181`? I can
>>>>>>>> only see the counts-changelog that was created by the KStream framework.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In Kafka Streams, data is partitioned according to the keys of the
>>>>>>>>> key-value records, and operations such as countByKey operate on these
>>>>>>>>> stream partitions.  When reading data from Kafka, these stream partitions
>>>>>>>>> map to the partitions of the Kafka input topic(s), but these may change
>>>>>>>>> once you add processing operations.
>>>>>>>>>
>>>>>>>>> To your question:  The first step, if the data isn't already keyed as
>>>>>>>>> needed, is to select the key you want to count by, which results in 1+
>>>>>>>>> output stream partitions.  Here, data may get shuffled across the network
>>>>>>>>> (but it won't if there's no need to, e.g. when the data is already keyed
>>>>>>>>> as needed).  Then the count operation is performed for each stream
>>>>>>>>> partition, which is similar to the sort-and-reduce phase in Hadoop.
>>>>>>>>>
>>>>>>>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> For the "word count" example in Hadoop, there are shuffle-sort-and-reduce
>>>>>>>>>> phases that handle outputs from different mappers. How does it work in
>>>>>>>>>> KStream?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
> 
> 
> 

