No. It does not support hidden topics.

The only explanation might be, that there is no repartitioning step. But
than the question would be, if there is a bug in Kafka Streams, because
between map() and countByKey() repartitioning is required.

Can you verify that the result is correct?

-Matthias

On 08/30/2016 03:24 PM, Tommy Q wrote:
> Does Kafka support hidden topics ? (Since all the topics infos are stored
> in ZK, this probably not the case )
> 
> On Tue, Aug 30, 2016 at 5:58 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi Tommy,
>>
>> yes, you do understand Kafka Streams correctly. And yes, for shuffling,
>> na internal topic will be created under the hood. It should be named
>> "<application-id>-something-repartition". I am not sure, why it is not
>> listed via bin/kafka-topics.sh
>>
>> The internal topic "<application-id>-counts-changelog" you see is
>> created to back the state of countByKey() operator.
>>
>> See
>> https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams%3A+Internal+Data+Management
>>
>> and
>>
>> http://www.confluent.io/blog/data-reprocessing-with-kafka-
>> streams-resetting-a-streams-application
>>
>>
>> -Matthias
>>
>>
>> On 08/30/2016 06:55 AM, Tommy Q wrote:
>>> Michael, Thanks for your help.
>>>
>>> Take the word count example, I am trying to walk through the code based
>> on
>>> your explanation:
>>>
>>>     val textLines: KStream[String, String] =
>> builder.stream("input-topic")
>>>     val wordCounts: KStream[String, JLong] = textLines
>>>       .flatMapValues(_.toLowerCase.split("\\W+").toIterable.asJava)
>>>       .map((key: String, word: String) => new KeyValue(word, word))
>>>       .countByKey("counts")
>>>       .toStream
>>>
>>>     wordCounts.to(stringSerde, longSerde, "wc-out")
>>>
>>> Suppose the input-topic has two partitions and each partition has a
>> string
>>> record produced into:
>>>
>>> input-topic_0 : "a b"
>>>> input-topic_1 : "a b c"
>>>
>>>
>>> Suppose we started two instance of the stream topology ( task_0 and
>>> task_1). So after flatMapValues & map executed, they should have the
>>> following task state:
>>>
>>> task_0 :  [ (a, "a"), (b, "b") ]
>>>> task_1 :  [ (a, "a"), (b: "b"),  (c: "c") ]
>>>
>>>
>>> Before the execution of  countByKey, the kafka-stream framework should
>>> insert a invisible shuffle phase internally:
>>>
>>> shuffled across the network :
>>>>
>>>
>>>
>>>> _internal_topic_shuffle_0 :  [ (a, "a"), (a, "a") ]
>>>> _internal_topic_shuffle_1 :  [ (b, "b"), (b: "b"),  (c: "c") ]
>>>
>>>
>>> countByKey (reduce) :
>>>
>>> task_0 (counts-changelog_0) :  [ (a, 2) ]
>>>
>>> task_1 (counts-changelog_1):   [ (b, 2), (c, 1) ]
>>>
>>>
>>> And after the execution of `wordCounts.to(stringSerde, longSerde,
>>> "wc-out")`, we get the word count output in wc-out topic:
>>>
>>> task_0 (wc-out_0) :  [ (a, 2) ]
>>>
>>> task_1 (wc-out_1):   [ (b, 2), (c, 1) ]
>>>
>>>
>>>
>>> According the steps list above, do I understand the internals of kstream
>>> word count correctly ?
>>> Another question is does the shuffle across the network work by creating
>>> intermediate topics ? If so, why can't I find the intermediate topics
>> using
>>> `bin/kafka-topics.sh --list --zookeeper localhost:2181` ?  I can only see
>>> the counts-changelog got created by the kstream framework.
>>>
>>>
>>>
>>> On Tue, Aug 30, 2016 at 2:25 AM, Michael Noll <mich...@confluent.io>
>> wrote:
>>>
>>>> In Kafka Streams, data is partitioned according to the keys of the
>>>> key-value records, and operations such as countByKey operate on these
>>>> stream partitions.  When reading data from Kafka, these stream
>> partitions
>>>> map to the partitions of the Kafka input topic(s), but these may change
>>>> once you add processing operations.
>>>>
>>>> To your question:  The first step, if the data isn't already keyed as
>>>> needed, is to select the key you want to count by, which results in 1+
>>>> output stream partitions.  Here, data may get shuffled across the
>> network
>>>> (but if won't if there's no need to, e.g. when the data is already
>> keyed as
>>>> needed).  Then the count operation is performed for each stream
>> partition,
>>>> which is similar to the sort-and-reduce phase in Hadoop.
>>>>
>>>> On Mon, Aug 29, 2016 at 5:31 PM, Tommy <deeplam...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> For "word count" example in Hadoop, there are shuffle-sort-and-reduce
>>>>> phases that handles outputs from different mappers, how does it work in
>>>>> KStream ?
>>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to