Not sure. Would need to think about it more. However, the default commit interval in Streams is 30 seconds. You can configure it via the StreamsConfig parameter COMMIT_INTERVAL_MS_CONFIG. So using the additional thread and waiting for 5 minutes sounds ok. The question is what would happen if the JVM goes down before you delete the topic.
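The delayed-deletion idea discussed here can be sketched with plain JDK scheduling. This is only an illustrative sketch, not code from the thread: the class name, the 10x safety factor (which turns the default 30-second commit interval into the 5-minute wait suggested above), and the println placeholder standing in for the actual topic deletion are all assumptions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedTopicDelete {

    // Wait several commit intervals before deleting a fully consumed topic,
    // so the offsets for it have been committed. The 10x factor is an
    // assumption: with the default 30s commit interval it yields the
    // 5 minutes discussed in the thread.
    static long safeDeleteDelayMs(long commitIntervalMs) {
        return 10 * commitIntervalMs;
    }

    public static void main(String[] args) throws Exception {
        long delayMs = safeDeleteDelayMs(30_000L); // default commit.interval.ms
        System.out.println("would delete topic after " + delayMs + " ms");

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // In a real app the Runnable would perform the actual delete (e.g. via
        // TopicCommand, as the thread shows); here it only prints, and the
        // delay is shortened so the sketch terminates quickly.
        scheduler.schedule(
                () -> System.out.println("deleting topic batch_42"),
                50, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

As noted in the thread, even a generous delay does not survive a JVM crash; a robust version would persist the pending deletion somewhere durable.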
-Matthias

On 12/3/16 2:07 AM, Ali Akhtar wrote:
> Is there a way to make sure the offsets got committed? Perhaps, after the
> last msg has been consumed, I can set up a task to run after a safe time
> (say 5 mins?) in another thread which would delete the topic? What would
> be a safe time to use?
>
> On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I guess yes. You might only want to make sure the topic offsets got
>> committed -- not sure if committing offsets of a deleted topic could
>> cause issues (i.e., crashing your Streams app).
>>
>> -Matthias
>>
>> On 12/2/16 11:04 PM, Ali Akhtar wrote:
>>> Thank you very much. Last q - is it safe to do this from within a
>>> callback processing that topic, once it reaches the last message? (It
>>> keeps a count of how many messages were processed vs. how many remain.)
>>>
>>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>
>>>> You can use TopicCommand to delete a topic within Java:
>>>>
>>>>> final TopicCommand.TopicCommandOptions commandOptions =
>>>>>     new TopicCommand.TopicCommandOptions(new String[]{
>>>>>         "--zookeeper", "zookeeperHost:2181",
>>>>>         "--delete",
>>>>>         "--topic", "TOPIC-TO-BE-DELETED"});
>>>>> TopicCommand.deleteTopic(zkUtils, commandOptions);
>>>>
>>>> So you can delete a topic from within your Streams app.
>>>>
>>>> -Matthias
>>>>
>>>> On 12/2/16 9:25 PM, Ali Akhtar wrote:
>>>>> Is there a way to delete the processed topics via Streams or the Java
>>>>> driver? Or only through the bash script?
>>>>>
>>>>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>
>>>>>> If you keep old topics that are completely processed, there would be
>>>>>> increasing overhead, because Streams would try to read from those
>>>>>> topics as long as they exist.
>>>>>> Thus, more fetch requests will be sent for those topics over time,
>>>>>> while most fetch requests will return without any new data (as those
>>>>>> old topics do not have new data).
>>>>>>
>>>>>> If you delete completely processed topics, there will be no overhead.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
>>>>>>> Hey Matthias,
>>>>>>>
>>>>>>> So I have a scenario where I need to batch a group of messages
>>>>>>> together.
>>>>>>>
>>>>>>> I'm considering creating a new topic for each batch that arrives,
>>>>>>> i.e., batch_<some_id>.
>>>>>>>
>>>>>>> Each batch_<id> topic will have a finite number of messages, and
>>>>>>> then it will remain empty. Essentially these will be throwaway
>>>>>>> topics.
>>>>>>>
>>>>>>> Is there any overhead to there being a lot of these topics, and
>>>>>>> having a listener for batch_.*, or is this effectively like having
>>>>>>> one listener for one topic?
>>>>>>>
>>>>>>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> 1) There will be one consumer per thread. The number of threads is
>>>>>>>> defined by the number of instances you start and how many threads
>>>>>>>> you configure for each instance via the StreamsConfig parameter
>>>>>>>> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely
>>>>>>>> yourself.
>>>>>>>>
>>>>>>>> Depending on the number of partitions in your topics, each thread
>>>>>>>> will process one or multiple partitions. As a partition will be
>>>>>>>> processed by exactly one thread, the overall number of partitions
>>>>>>>> over all your input topics limits your max number of threads (if
>>>>>>>> you have more threads, those will just be idle).
>>>>>>>>
>>>>>>>> 2) Thus, there should be no performance issues.
>>>>>>>> Furthermore, if you create new topics while your application is
>>>>>>>> running -- and if this might overload your current application --
>>>>>>>> you can always start new instances and scale out your application
>>>>>>>> dynamically -- Kafka Streams is fully elastic.
>>>>>>>>
>>>>>>>> Have a look here for more details:
>>>>>>>> http://docs.confluent.io/current/streams/architecture.html
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 12/2/16 6:23 AM, Ali Akhtar wrote:
>>>>>>>>> That's pretty useful to know - thanks.
>>>>>>>>>
>>>>>>>>> 1) If I listened to foo-.*, and there were 5 foo topics created
>>>>>>>>> after Kafka Streams was running: foo1, foo2, foo3, foo4, foo5,
>>>>>>>>> will this create 5 consumers / threads / instances, or will it be
>>>>>>>>> just 1 instance that receives the messages for all of those
>>>>>>>>> topics?
>>>>>>>>>
>>>>>>>>> 2) Will this cause performance issues if I had a lot of throwaway
>>>>>>>>> foo topics being created, or will this scale?
>>>>>>>>>
>>>>>>>>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ali,
>>>>>>>>>>
>>>>>>>>>> The only way KafkaStreams will process new topics after start is
>>>>>>>>>> if the original stream was defined with a regular expression,
>>>>>>>>>> i.e., kafka.stream(Pattern.compile("foo-.*"));
>>>>>>>>>>
>>>>>>>>>> If any new topics are added after start that match the pattern,
>>>>>>>>>> then they will also be consumed.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> On Fri, 2 Dec 2016 at 13:13 Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Heya,
>>>>>>>>>>>
>>>>>>>>>>> Normally, you add your topics and their callbacks to a
>>>>>>>>>>> StreamBuilder, and then call KafkaStreams.start() to start
>>>>>>>>>>> ingesting those topics.
>>>>>>>>>>>
>>>>>>>>>>> Is it possible to add a new topic to the StreamBuilder, and
>>>>>>>>>>> start ingesting that as well, after KafkaStreams.start() has
>>>>>>>>>>> been called?
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
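The regex subscription Damian describes in the thread can be made concrete with a small self-contained sketch. The actual Streams-side call (roughly `builder.stream(Pattern.compile("foo-.*"))`) needs a running Kafka cluster, so this sketch only checks, with plain `java.util.regex`, which topic names such a pattern would pick up; the topic names are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternSubscriptionSketch {

    // The same kind of pattern a Streams app would pass to
    // builder.stream(Pattern.compile("foo-.*")).
    static final Pattern SUBSCRIPTION = Pattern.compile("foo-.*");

    // Return the topics (existing or created later) that the
    // subscription pattern would match.
    static List<String> matching(List<String> topics) {
        return topics.stream()
                .filter(t -> SUBSCRIPTION.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "foo-new" stands in for a topic created after start; per the
        // thread, it would be consumed too because the pattern matches it.
        List<String> topics = List.of("foo-1", "foo-2", "bar-1", "foo-new");
        System.out.println(matching(topics)); // [foo-1, foo-2, foo-new]
    }
}
```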