It is correct that Kafka Streams does not create input/output/through()
topics. You should create those topics upfront.
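
For example, a minimal sketch of creating them upfront via the AdminClient
(topic names, partition count, replication factor, and broker address below
are placeholders, not taken from your setup):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicsUpfront {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // pick partitions and replication factor explicitly instead of
            // relying on broker defaults
            admin.createTopics(Arrays.asList(
                new NewTopic("input-topic", 6, (short) 3),
                new NewTopic("output-topic", 6, (short) 3)
            )).all().get(); // block until the topics are actually created
        }
    }
}

(Creating them via the `kafka-topics.sh` command line tool works just as well.)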

> We've set auto.create.topics.enable=true in our kafka
> server properties file and we'd expect that the topics get created, but the
> consumer goes into SHUTDOWN mode and eventually gets removed as a consumer
> from kafka. Is this normal behavior?

If you set the config to `true`, the broker would auto-create the
corresponding topics. However, auto-created topics use the broker's default
number of partitions and replication factor, so it's recommended to disable
auto topic creation and create the topics manually upfront with the configs
you need.
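
On the broker side, that would be the following line in the server
properties file you mention:

auto.create.topics.enable=false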

> and we'd expect that the topics get created,

Why? If you enable auto topic creation, you should expect that the topics
are auto-created. Or did you mean "false" instead of "true"?


Kafka Streams would not get any error if input topics are missing. If I
recall correctly, there should be an error if Kafka Streams tries to write
into a non-existent topic, though. However, if both input and output topics
are missing, no data would be processed, thus no write is attempted, and
thus Kafka Streams would not shut down.
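
A startup check that all topics exist (as you describe below) could look
roughly like this with the AdminClient; the topic names and broker address
are made up for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicExistenceCheck {
    public static void main(String[] args) throws Exception {
        List<String> required = Arrays.asList("topic1", "topic2");

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // fetch the names of all topics the cluster currently knows about
            Set<String> existing = admin.listTopics().names().get();
            if (!existing.containsAll(required)) {
                System.err.println("Missing topics, not starting the app: " + required);
                System.exit(1);
            }
        }
        // ... only create and start KafkaStreams if all required topics exist
    }
}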


-Matthias

On 6/12/19 5:13 PM, Brian Putt wrote:
> After adding more logging details, we found that we hadn't created topics
> yet for the consumer & producer for the streams application. We've added a
> check when starting up to verify that the topics exist, otherwise we exit
> our app. We're not dynamically creating topics and we want to create them
> upfront as we want to specify replication-factor and # of partitions as the
> defaults won't suffice.
> 
> If I understand correctly, streams should've automagically created topics
> that get consumed, but shouldn't create topics for the producer portion of
> the streams lib. We've set auto.create.topics.enable=true in our kafka
> server properties file and we'd expect that the topics get created, but the
> consumer goes into SHUTDOWN mode and eventually gets removed as a consumer
> from kafka. Is this normal behavior?
> 
> On Wed, Jun 12, 2019 at 3:44 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Does the application transition to RUNNING state? Also check
>> `KafkaStreams#localThreadsMetadata()` to see what tasks are assigned.
>>
>> You might also enable DEBUG logs for
>> `org.apache.kafka.clients.consumer.*` classes to see if the consumer
>> sends fetch requests to the broker.
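
(To make the state/task check above concrete, a tiny illustrative sketch;
the topology, configs, and topic names are dummies and not meant to match
your application:)

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ThreadMetadata;

public class StateAndTaskCheck {
    public static void main(String[] args) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("topic1").to("output-topic"); // dummy topology

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-check-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Thread.sleep(10_000); // give the instance a moment to join the group

        System.out.println("state: " + streams.state()); // expect RUNNING
        for (ThreadMetadata thread : streams.localThreadsMetadata()) {
            System.out.println("active tasks: " + thread.activeTasks());
        }
    }
}

For the consumer DEBUG logs, with a log4j 1.x setup a line like
`log4j.logger.org.apache.kafka.clients.consumer=DEBUG` in the log4j
configuration should do it.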
>>
>>
>> -Matthias
>>
>> On 6/11/19 7:03 PM, Brian Putt wrote:
>>> The application just hangs (we let it sit for ~1 hour, small dataset as
>>> we're testing). We can restart it listening to 1 of the 3 topics we start
>>> it with and it chugs along, no problem. The same code is executed as
>>> separate application.ids listening to other topics without any issues.
>>> We'll try to increase our logging as nothing is currently being shown in
>>> the logs. Guessing we have our level set to WARN.
>>>
>>> Will certainly share updates as we figure it out.
>>>
>>> Thanks!
>>>
>>> On Tue, Jun 11, 2019 at 6:07 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> What do you exactly observe?
>>>>
>>>>  - Does the application rebalance correctly?
>>>>  - Does it start processing?
>>>>  - Anything in the logs about the status of the application?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 6/10/19 6:19 PM, Brian Putt wrote:
>>>>> Hello,
>>>>>
>>>>> I'm working with the kafka streams api and am running into issues
>>>>> where I subscribe to multiple topics and the consumer just hangs. It
>>>>> has a unique application.id and I can see in kafka that the consumer
>>>>> group has been created, but when I describe the group, I'll get:
>>>>> consumer group X has no active members
>>>>>
>>>>> The interesting thing is that this works when the topics list only
>>>>> contains 1 topic. I'm not interested in other answers where we create
>>>>> multiple sources, ie: source1 = builder.stream("topic1") and source2 =
>>>>> builder.stream("topic2") as the interface for StreamsBuilder.stream
>>>>> supports an array of topics.
>>>>>
>>>>> I've been able to subscribe to multiple topics before, I just can't
>>>>> replicate how we've done this. (This code is running in a different
>>>>> environment and working as expected, so not sure if it's a timing issue
>>>>> or something else)
>>>>>
>>>>> List<String> topics = Arrays.asList("topic1", "topic2");
>>>>>
>>>>> StreamsBuilder builder = new StreamsBuilder();
>>>>> KStream<String, String> source = builder.stream(topics);
>>>>>
>>>>> source
>>>>> .transformValues(...)
>>>>> .map((key, value) -> ...)
>>>>> .to((key, value, record) -> ...);
>>>>>
>>>>> new KafkaStreams(builder.build(), props).start();
>>>>>
>>>>> This question has been posted on stackoverflow in case you want to
>>>>> answer there:
>>>>> https://stackoverflow.com/questions/56535113/kafka-streams-listening-to-multiple-topics-hangs
