Not sure. Would need to think about it more. However, the default commit
interval in Streams is 30 seconds. You can configure it via the
StreamsConfig parameter COMMIT_INTERVAL_MS_CONFIG. So using an additional
thread and waiting for 5 minutes sounds ok. The question is what would
happen if the JVM goes down before you delete the topic.
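The extra-thread idea could be sketched with a plain ScheduledExecutorService; a minimal illustration, where deleteTopic() is a hypothetical placeholder for whatever deletion mechanism (e.g. TopicCommand) you wire in, not an actual Kafka API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedTopicCleanup {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Recorded for illustration; a real implementation would call admin tooling.
    final List<String> deleted = new CopyOnWriteArrayList<>();

    // Schedule deletion well after the commit interval (default 30 s) has passed.
    public void scheduleDeletion(String topic, long delayMillis) {
        scheduler.schedule(() -> deleteTopic(topic), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Placeholder: wire this up to TopicCommand or other admin tooling.
    void deleteTopic(String topic) {
        deleted.add(topic);
    }
}
```

Note that scheduled tasks are lost if the JVM goes down, which is exactly the caveat above; surviving a crash would require persisting pending deletions somewhere durable.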


-Matthias

On 12/3/16 2:07 AM, Ali Akhtar wrote:
> Is there a way to make sure the offsets got committed? Perhaps, after the
> last msg has been consumed, I can set up a task to run after a safe time
> (say 5 mins?) in another thread which would delete the topic? What would
> be a safe time to use?
> 
> On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I guess yes. You might only want to make sure the topic offsets got
>> committed -- not sure if committing offsets of a deleted topic could
>> cause issues (i.e., crash your Streams app).
>>
>> -Matthias
>>
>> On 12/2/16 11:04 PM, Ali Akhtar wrote:
>>> Thank you very much. Last q - Is it safe to do this from within a
>>> callback processing that topic, once it reaches the last message? (It
>>> keeps a count of how many messages processed vs how many remaining)
>>>
>>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io>
>>> wrote:
>>>
>>>> You can use TopicCommand to delete a topic from within Java:
>>>>
>>>>> // Assumes a ZkUtils instance connected to your ZooKeeper ensemble:
>>>>> ZkUtils zkUtils = ZkUtils.apply("zookeeperHost:2181", 30000, 30000, false);
>>>>> final TopicCommand.TopicCommandOptions commandOptions =
>>>>>     new TopicCommand.TopicCommandOptions(new String[]{
>>>>>         "--zookeeper", "zookeeperHost:2181",
>>>>>         "--delete",
>>>>>         "--topic", "TOPIC-TO-BE-DELETED"});
>>>>> TopicCommand.deleteTopic(zkUtils, commandOptions);
>>>>
>>>> So you can delete a topic within your Streams app.
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 12/2/16 9:25 PM, Ali Akhtar wrote:
>>>>> Is there a way to delete the processed topics via Streams or the Java
>>>>> driver? Or only through the bash script?
>>>>>
>>>>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> If you keep old topics that are completely processed, there would be
>>>>>> increasing overhead, because Streams would try to read from those
>>>>>> topics as long as they exist. Thus, more fetch requests will be sent
>>>>>> to those topics over time, while most fetch requests will return
>>>>>> without any new data (as those old topics do not have new data).
>>>>>>
>>>>>> If you delete completely processed topics, there will be no overhead.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
>>>>>>> Hey Matthias,
>>>>>>>
>>>>>>> So I have a scenario where I need to batch a group of messages
>>>>>>> together.
>>>>>>>
>>>>>>> I'm considering creating a new topic for each batch that arrives, i.e.
>>>>>>> batch_<some_id>.
>>>>>>>
>>>>>>> Each batch_<id> topic will have a finite number of messages, and then
>>>>>>> it will remain empty. Essentially these will be throwaway topics.
>>>>>>>
>>>>>>> Is there any overhead to there being a lot of these topics, and
>>>>>>> having a listener for batch_.*, or is this effectively like having
>>>>>>> one listener for one topic?
>>>>>>>
>>>>>>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> 1) There will be one consumer per thread. The number of threads is
>>>>>>>> defined by the number of instances you start and how many threads
>>>>>>>> you configure for each instance via the StreamsConfig parameter
>>>>>>>> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely
>>>>>>>> yourself.
>>>>>>>>
>>>>>>>> Depending on the number of partitions in your topics, each thread
>>>>>>>> will process one or multiple partitions. As a partition will be
>>>>>>>> processed by exactly one thread, the overall number of partitions
>>>>>>>> over all your input topics limits your max number of threads (if
>>>>>>>> you have more threads, those will just be idle).
>>>>>>>>
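The thread count and commit interval mentioned in this thread are ordinary StreamsConfig properties. A minimal sketch, using the plain config-key strings (the broker address and values here are assumptions, to be adjusted for your cluster):

```java
import java.util.Properties;

public class StreamsThreadConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "batch-processor");   // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "localhost:9092"); // assumption: your broker list
        props.put("num.stream.threads", "4");             // NUM_STREAM_THREADS_CONFIG
        props.put("commit.interval.ms", "30000");         // COMMIT_INTERVAL_MS_CONFIG (default 30 s)
        return props;
    }
}
```

These Properties would then be passed to the KafkaStreams constructor; threads beyond the total partition count would sit idle, as described above.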
>>>>>>>> 2) Thus, there should be no performance issues. Furthermore, if you
>>>>>>>> create new topics while your application is running -- and if this
>>>>>>>> might overload your current application -- you can always start new
>>>>>>>> instances and scale out your application dynamically -- Kafka
>>>>>>>> Streams is fully elastic.
>>>>>>>>
>>>>>>>> Have a look here for more details:
>>>>>>>> http://docs.confluent.io/current/streams/architecture.html
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 12/2/16 6:23 AM, Ali Akhtar wrote:
>>>>>>>>> That's pretty useful to know - thanks.
>>>>>>>>>
>>>>>>>>> 1) If I listened to foo-.*, and there were 5 foo topics created
>>>>>>>>> after Kafka Streams was running: foo1, foo2, foo3, foo4, foo5, will
>>>>>>>>> this create 5 consumers / threads / instances, or will it be just 1
>>>>>>>>> instance that receives the messages for all of those topics?
>>>>>>>>>
>>>>>>>>> 2) Will this cause performance issues if I had a lot of throwaway
>>>>>>>>> foo topics being created, or will this scale?
>>>>>>>>>
>>>>>>>>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ali,
>>>>>>>>>>
>>>>>>>>>> The only way KafkaStreams will process new topics after start is
>>>>>>>>>> if the original stream was defined with a regular expression, i.e.,
>>>>>>>>>> kafka.stream(Pattern.compile("foo-.*"));
>>>>>>>>>>
>>>>>>>>>> If any new topics are added after start that match the pattern,
>>>>>>>>>> then they will also be consumed.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
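The matching semantics of such a pattern subscription can be checked with plain java.util.regex before wiring it into a stream; a small sketch (topic names here are made up for illustration):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternCheck {
    // Same pattern as in kafka.stream(Pattern.compile("foo-.*"))
    static final Pattern TOPIC_PATTERN = Pattern.compile("foo-.*");

    // Returns only the topic names the pattern would pick up:
    // the whole name must match, so "foo" alone does not qualify.
    public static List<String> matching(List<String> topics) {
        return topics.stream()
                .filter(t -> TOPIC_PATTERN.matcher(t).matches())
                .collect(Collectors.toList());
    }
}
```

This mirrors how new topics like batch_<id> would be picked up by a batch_.* subscription as they appear.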
>>>>>>>>>>
>>>>>>>>>> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Heya,
>>>>>>>>>>>
>>>>>>>>>>> Normally, you add your topics and their callbacks to a
>>>>>>>>>>> StreamBuilder, and then call KafkaStreams.start() to start
>>>>>>>>>>> ingesting those topics.
>>>>>>>>>>>
>>>>>>>>>>> Is it possible to add a new topic to the StreamBuilder, and
>>>>>>>>>>> start ingesting that as well, after KafkaStreams.start() has
>>>>>>>>>>> been called?
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
