Re: Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key

2016-12-03 Thread Eno Thereska
Hi Mathieu,

What version of Kafka are you using? There was recently a fix that went into 
trunk, just checking if you're using an older version.
(to make forward progress you can turn the cache off, like this:
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
)
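
For context, a rough sketch of where that workaround sits alongside the rest of
the Streams configuration (the application id and bootstrap servers below are
placeholders, not from the original report):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// inside whatever method builds your Streams configuration:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// workaround: disable the record cache until you pick up the fix
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);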

Thanks
Eno
> On 4 Dec 2016, at 03:47, Mathieu Fenniak  wrote:
> 
> Hey all,
> 
> I've just been running a quick test of my kafka-streams application on the
> latest Kafka trunk (@e43bbce), and came across this error.  I was wondering
> if anyone has seen this error before, have any thoughts on what might cause
> it, or can suggest a direction to investigate it further.
> 
> Full exception:
> https://gist.github.com/mfenniak/509fb82dfcfda79a21cfc1b07dafa89c
> 
> java.lang.IllegalStateException: Attempting to put a clean entry for key
> [urn:replicon-tenant:strprc971e3ca9:timesheet:97c0ce25-e039-4e8b-9f2c-d43f0668b755]
> into NamedCache [0_0-TimesheetNonBillableHours] when it already contains a
> dirty entry for the same key
> at org.apache.kafka.streams.state.internals.NamedCache.put(NamedCache.java:124)
> at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:146)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:133)
> at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateValueGetter.get(KTableAggregate.java:128)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:81)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:54)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> ... more ...



Attempting to put a clean entry for key [...] into NamedCache [...] when it already contains a dirty entry for the same key

2016-12-03 Thread Mathieu Fenniak
Hey all,

I've just been running a quick test of my kafka-streams application on the
latest Kafka trunk (@e43bbce), and came across this error.  I was wondering
if anyone has seen this error before, have any thoughts on what might cause
it, or can suggest a direction to investigate it further.

Full exception:
https://gist.github.com/mfenniak/509fb82dfcfda79a21cfc1b07dafa89c

java.lang.IllegalStateException: Attempting to put a clean entry for key
[urn:replicon-tenant:strprc971e3ca9:timesheet:97c0ce25-e039-4e8b-9f2c-d43f0668b755]
into NamedCache [0_0-TimesheetNonBillableHours] when it already contains a
dirty entry for the same key
at org.apache.kafka.streams.state.internals.NamedCache.put(NamedCache.java:124)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:146)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:133)
at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateValueGetter.get(KTableAggregate.java:128)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:81)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:54)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
... more ...


Re: Initializing StateStores takes *really* long for large datasets

2016-12-03 Thread williamtellme123
Unsubscribe


Sent via the Samsung Galaxy S7, an AT&T 4G LTE smartphone
 Original message From: Guozhang Wang  
Date: 12/2/16  5:13 PM  (GMT-06:00) To: users@kafka.apache.org Subject: Re: 
Initializing StateStores takes *really* long for large datasets 
Before we have a single-knob memory management feature, I'd like to
propose reducing Streams' default config values for RocksDB caching and
memory block size. For example, I remember Henry has done some fine tuning
of the RocksDB config for his use case:

https://github.com/HenryCaiHaiying/kafka/commit/b297f7c585f5a883ee068277e5f0f1224c347bd4
https://github.com/HenryCaiHaiying/kafka/commit/eed1726d16e528d813755a6e66b49d0bf14e8803
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576



We could check whether some of those changes are appropriate in general and,
if so, change the default settings accordingly.
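
For reference, a rough sketch of how an application can already override these
values today through the RocksDBConfigSetter hook; the class name and the
sizes below are placeholders, not proposed defaults:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class SmallCacheRocksDBConfig implements RocksDBConfigSetter {
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // shrink the per-store block cache well below the current 100 MB default
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(4 * 1024 * 1024L);  // placeholder: a few MB
        tableConfig.setBlockSize(4096L);                  // placeholder block size
        options.setTableFormatConfig(tableConfig);
    }
}

// registered via:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, SmallCacheRocksDBConfig.class);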

Henry

On Wed, Nov 30, 2016 at 11:04 AM, Ara Ebrahimi 
wrote:

> +1 on this.
>
> Ara.
>
> > On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
> >
> > I'd like to quickly reinforce Frank's opinion regarding the rocksdb
> memory
> > usage.  I was also surprised by the amount of non-JVM-heap memory being
> > used and had to tune the 100 MB default down considerably.  It's also
> > unfortunate that it's hard to estimate the memory requirements for a KS
> app
> > because of this.  If you have ten stores, and assuming the default
> config,
> > you'd need a GB of memory for the rocksdb cache if you run 1 app, but
> only
> > half a GB if you run two app instances because the stores will be
> > distributed.
> >
> > It would be much nicer to be able to give KS a fixed amount of memory in
> a
> > config that it divides among the active stores on a node.  Configure it
> > with N GB; if a rebalance adds more tasks and stores, they each get less
> > RAM; if a rebalance removes tasks and stores, the remaining stores get
> more
> > RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
> > interface because it doesn't get any state about the KS topology to make
> > decisions; which are arguably not config, but tuning / performance
> > decisions.
> >
> > Mathieu
> >
> >
> >
> > On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:
> >
> >> I'll write an update on where I am now.
> >>
> >> I've got about 40 'primary' topics, some small, some up to about 10M
> >> messages,
> >> and about 30 internal topics, divided over 6 stream instances, all
> running
> >> in a single
> >> app, talking to a 3 node Kafka cluster.
> >>
> >> I use a single thread per stream instance, as my prime concern is now to
> >> get it
> >> to run stably, rather than optimizing performance.
> >>
> >> My biggest issue was that after a few hours my application started to
> slow
> >> down
> >> to ultimately freeze up or crash. It turned out that RocksDb consumed
> all
> >> my
> >> memory, which I overlooked as it was off-heap.
> >>
> >> I was fooling around with RocksDb settings a bit but I had missed the
> most
> >> important
> >> one:
> >>
> >>    BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
> >>    tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
> >>    tableConfig.setBlockSize(BLOCK_SIZE);
> >>    options.setTableFormatConfig(tableConfig);
> >>
> >> The block cache size defaults to a whopping 100 MB per store, and that
> gets
> >> expensive
> >> fast. I reduced it to a few megabytes. My data size is so big that I
> doubt
> >> it is very effective
> >> anyway. Now it seems more stable.
> >>
> >> I'd say that a smaller default makes sense, especially because the
> failure
> >> case is
> >> so opaque (running all tests just fine but with a serious dataset it
> dies
> >> slowly)
> >>
> >> Another thing I see is that while starting all my instances, some are
> quick
> >> and some take
> >> time (makes sense as the data size varies greatly), but as more
> instances
> >> start up, they
> >> start to use more and more CPU, I/O and network, so that the initialization
> of
> >> the bigger ones
> >> takes even longer, increasing the chance that one of them takes longer
> than
> >> the
> >> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we
> can
> >> separate the 'initialize' and 'start' step somehow.
> >>
> >> In this case we could log better: If initialization is taking longer
> than
> >> the timeout, it ends up
> >> being reassigned (in my case to the same instance) and then it errors
> out
> >> on being unable
> >> to lock the state dir. That message isn't too informative as the
> timeout is
> >> the actual problem.
> >>
> >> regards, Frank
> >>
> >>
> >> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang 
> wrote:
> >>
> >>> Hello Frank,
> >>>
> >>> How many instances do you have in your apps and how many threads did
> you
> >>> use per instance? Note that besides the 

Re: Kafka windowed table not aggregating correctly

2016-12-03 Thread williamtellme123
Unsubscribe


Sent via the Samsung Galaxy S7, an AT&T 4G LTE smartphone
 Original message From: Guozhang Wang  
Date: 12/2/16  5:48 PM  (GMT-06:00) To: users@kafka.apache.org Subject: Re: 
Kafka windowed table not aggregating correctly 
Sachin,

One thing to note is that retention for the windowed stores works by keeping
multiple segments per store, where each segment covers a time range that can
potentially span multiple windows. If a new window needs to be created that is
further out than the oldest segment's time range plus the retention period,
the oldest segments are dropped. From your code it seems you do not override
the retention on TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 *
1000L) via until(...), so the default of one day is used.

So with WallclockTimestampExtractor, since it uses system time, it won't give
you timestamps that span more than a day within a short period of time; but if
your own extracted timestamps do span more than that, then old segments will
be dropped immediately and hence the aggregate values will be returned as a
single value.
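
For illustration, a sketch of the same window definition with an explicit
retention; the 7-day value is only an example, assuming the 0.10.x TimeWindows
API:

// import org.apache.kafka.streams.kstream.TimeWindows;
// same windows as in your code, plus an explicit retention via until()
TimeWindows.of("stream-table", 10 * 1000L)
    .advanceBy(5 * 1000L)
    .until(7L * 24 * 60 * 60 * 1000);  // example: keep 7 days of windows instead of the 1-day default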

Guozhang


On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax 
wrote:

> The extractor is used in
>
> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>
> Let us know, if you could resolve the problem or need more help.
>
> -Matthias
>
> On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > https://github.com/SOHU-Co/kafka-node/ this is the Node.js client I am
> > using. The version is 0.5.x. Can you please tell me what code in Streams
> > calls the timestamp extractor? I can look there to see if there is any
> > issue.
> >
> > Again, the issue happens only when producing the messages using a producer
> > that is compatible with Kafka version 0.8.x. I see that this producer does
> > not send a record timestamp, as this was introduced in version 0.10 only.
> >
> > Thanks
> > Sachin
> >
> > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" 
> wrote:
> >
> >> I am not sure what is happening. That's why it would be good to have a
> >> toy example to reproduce the issue.
> >>
> >> What do you mean by "Kafka node version 0.5"?
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> >>> I can provide the data, but the data does not seem to be the issue.
> >>> If I submit the same data and use the same timestamp extractor with the
> >> java
> >>> client with Kafka version 0.10.0.1, the aggregation works fine.
> >>> I find the issue only when submitting the data with kafka-node version
> >> 0.5.
> >>> It looks like the stream does not extract the time correctly in that
> >> case.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" 
> >> wrote:
> >>>
>  Can you provide example input data (including timetamps) and result.
>  What is the expected result (ie, what aggregation do you apply)?
> 
> 
>  -Matthias
> 
>  On 12/2/16 7:43 AM, Sachin Mittal wrote:
> > Hi,
> > After much debugging I found an issue with timestamp extractor.
> >
> > If I use a custom timestamp extractor with following code:
> > public static class MessageTimestampExtractor implements TimestampExtractor {
> >     public long extract(ConsumerRecord<Object, Object> record) {
> >         if (record.value() instanceof Message) {
> >             return ((Message) record.value()).ts;
> >         } else {
> >             return record.timestamp();
> >         }
> >     }
> > }
> >
> > Here Message has a long field ts which stores the timestamp, and the
> > aggregation does not work.
> > Note I have checked that ts has valid timestamp values.
> >
> > However, if I replace it with, say, WallclockTimestampExtractor, the
> > aggregation
> > works fine.
> >
> > I do not understand what could be the issue here.
> >
> > Also note I am using kafka streams version 0.10.0.1 and I am
> publishing
> > messages via
> > https://github.com/SOHU-Co/kafka-node/ whose version is quite old
> >> 0.5.x
> >
> > Let me know if there is some bug in time stamp extractions.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang 
>  wrote:
> >
> >> Sachin,
> >>
> >> This is indeed a bit weird, and we'd like to try to reproduce your
>  issue
> >> locally. Do you have a sample input data for us to try out?
> >>
> >> Guozhang
> >>
> >> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal  >
> >> wrote:
> >>
> >>> Hi,
> >>> I fixed that sorted set issue but I am facing a weird problem
> which I
>  am
> >>> not able to replicate.
> >>>
> >>> Here is the sample problem that I could isolate:
> >>> My class is like this:
> >>> public static class Message 

How to collect Connect metrics

2016-12-03 Thread Will Du
Hi folks,
How can I collect Kafka Connect metrics from Confluent? Is there any API to
use?
In addition, if one file is very big, can multiple tasks work on the same
file simultaneously?

Thanks,
Will



Re: Suggestions

2016-12-03 Thread Martin Gainty
Vincenzo


Nota Bene:

you can *force* resolution of a good version (3.4.8) to be
used by all artifacts downstream from your parent pom.xml... the mechanism is
called dependency management:

https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management


Buona Fortuna

Martini
__




From: Vincenzo D'Amore 
Sent: Saturday, December 3, 2016 12:14 PM
To: users@kafka.apache.org
Subject: Re: Suggestions

I found what's wrong, well... finally! For context, the consumer
application should load the received data into a Solr instance.
Incidentally, my version of Solr is SolrCloud, and the SolrJ client uses
ZooKeeper, a different version of ZooKeeper...
Now I have specified in my pom.xml the same ZooKeeper version 3.4.8 that I
found in my kafka_2.11-0.10.1.0, overriding the original configuration, and
it seems to work very well.
So as far as I know now, this strange behaviour was due to a dependency
problem.

For the sake of clarity, I'm now trying to restore the original version of
ZooKeeper just to be sure that the problem happens again.

Thanks to everybody who tried to help me.

On Fri, Dec 2, 2016 at 8:50 PM, Apurva Mehta  wrote:

> >
> > then, the strange thing is that the consumer on the second topic stays in
> > poll forever, *without receiving any message*.
>
>
> How long is 'forever'? Did you wait more than 5 minutes?
>
> On Fri, Dec 2, 2016 at 2:55 AM, Vincenzo D'Amore 
> wrote:
>
> > Hi Kafka Gurus :)
> >
> > I'm creating a process between a few applications.
> >
> > The first application creates a producer and then writes a message into a
> > main topic (A); within the message there is the name of a second topic (B).
> > It then promptly creates a second producer and writes a few messages into
> > the new topic (B).
> >
> > I write here because I don't understand why, in a second application which
> > is polling on the main topic, when it receives the first message (which
> > contains the name of the second topic), the consumer on the second topic
> > then stays in poll forever, *without receiving any message*.
> >
> > What is wrong in this scenario, am I missing something? Please help.
> >
> > producer has this default properties:
> >
> >   "bootstrap.servers":"localhost:9092",
> >   "acks":"all",
> >   "retries":0,
> >   "batch.size":16384,
> >   "linger.ms":1,
> >   "buffer.memory":33554432,
> >
> > "key.serializer":"org.apache.kafka.common.serialization.Stri
> ngSerializer",
> >
> > "value.serializer":"org.apache.kafka.common.serialization.
> > StringSerializer"
> >
> > consumer has this default properties:
> >
> >   "bootstrap.servers":"localhost:9092",
> >   "enable.auto.commit":"true",
> >   "auto.commit.interval.ms":"1000",
> >   "session.timeout.ms":"3",
> >   "buffer.memory":33554432,
> >   "key.deserializer":
> > "org.apache.kafka.common.serialization.StringDeserializer",
> >   "value.deserializer":
> > "org.apache.kafka.common.serialization.StringDeserializer"
> >
> > usually there are 2 active groups (group_A and group_B).
> >
> > Best regards,
> > Vincenzo
> >
> > --
> > Vincenzo D'Amore
> > email: v.dam...@gmail.com
> > skype: free.dev
> > mobile: +39 349 8513251
> >
>



--
Vincenzo D'Amore
email: v.dam...@gmail.com
skype: free.dev
mobile: +39 349 8513251


Re: How to connect Modbus, DNP or IEC61850 data to Kafka

2016-12-03 Thread hans
If the data volumes are low or you just want a quick prototype as a proof of 
concept you could use existing tools like node-red to connect the various input 
protocols with Kafka as an output protocol. For example install from 
http://nodered.org then install node-red-contrib-modbus, then install 
node-red-contrib-rdkafka, connect and configure the inputs and outputs and you 
will have data flow. Same with just about any serial, TCP, or UDP based 
protocol.

If you want something higher performance or industrial strength you can find 
commercial or open source Kafka Connectors or write your own native Kafka 
Connector for each input protocol. There are over 150 Kafka Connectors already 
built (search for "kafka-connect-*" in github) and see the following connector 
landing page for more info on Kafka Connect 
https://www.confluent.io/product/connectors/
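
As a very rough sketch of the write-your-own route, a minimal SourceTask could
look like the class below; the Modbus read, the config key, the topic handling
and the class name are all placeholders for whatever protocol client you
actually use:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ModbusSourceTask extends SourceTask {
    private String topic;

    @Override
    public String version() {
        return "0.1.0-sketch";
    }

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");  // hypothetical config key
        // open the Modbus/DNP/IEC61850 connection here (placeholder)
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        String reading = readNextValueFromDevice();  // placeholder for the real protocol client
        SourceRecord record = new SourceRecord(
                Collections.singletonMap("device", "modbus-1"),                   // source partition
                Collections.singletonMap("position", System.currentTimeMillis()), // source offset
                topic, Schema.STRING_SCHEMA, reading);
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {
        // close the device connection (placeholder)
    }

    private String readNextValueFromDevice() throws InterruptedException {
        Thread.sleep(1000);            // stand-in for a blocking read from the device
        return "register-40001=123";   // fabricated example reading
    }
}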


-hans



Sent from my iPhone
> On Dec 3, 2016, at 3:53 AM, Wang LongTian  wrote:
> 
> Dear all gurus,
> I'm new to Kafka and I'm going to connect the real-time data streaming from
> power system supervision and control devices to Kafka via different
> communication protocols (for example Modbus, DNP or IEC61850), and then on to
> a Storm processing system.
> I'm wondering how I can get this data into Kafka, and I don't know whether
> that's supported or not.
> 
> Any suggestion and hint are warmly welcomed!
> 
> Regards, 
> Long Tian
> 
> 


Re: Suggestions

2016-12-03 Thread Vincenzo D'Amore
I found what's wrong, well... finally! For context, the consumer
application should load the received data into a Solr instance.
Incidentally, my version of Solr is SolrCloud, and the SolrJ client uses
ZooKeeper, a different version of ZooKeeper...
Now I have specified in my pom.xml the same ZooKeeper version 3.4.8 that I
found in my kafka_2.11-0.10.1.0, overriding the original configuration, and
it seems to work very well.
So as far as I know now, this strange behaviour was due to a dependency
problem.

For the sake of clarity, I'm now trying to restore the original version of
ZooKeeper just to be sure that the problem happens again.

Thanks to everybody who tried to help me.

On Fri, Dec 2, 2016 at 8:50 PM, Apurva Mehta  wrote:

> >
> > then, the strange thing is that the consumer on the second topic stays in
> > poll forever, *without receiving any message*.
>
>
> How long is 'forever'? Did you wait more than 5 minutes?
>
> On Fri, Dec 2, 2016 at 2:55 AM, Vincenzo D'Amore 
> wrote:
>
> > Hi Kafka Gurus :)
> >
> > I'm creating a process between a few applications.
> >
> > The first application creates a producer and then writes a message into a
> > main topic (A); within the message there is the name of a second topic (B).
> > It then promptly creates a second producer and writes a few messages into
> > the new topic (B).
> >
> > I write here because I don't understand why, in a second application which
> > is polling on the main topic, when it receives the first message (which
> > contains the name of the second topic), the consumer on the second topic
> > then stays in poll forever, *without receiving any message*.
> >
> > What is wrong in this scenario, am I missing something? Please help.
> >
> > producer has this default properties:
> >
> >   "bootstrap.servers":"localhost:9092",
> >   "acks":"all",
> >   "retries":0,
> >   "batch.size":16384,
> >   "linger.ms":1,
> >   "buffer.memory":33554432,
> >
> > "key.serializer":"org.apache.kafka.common.serialization.Stri
> ngSerializer",
> >
> > "value.serializer":"org.apache.kafka.common.serialization.
> > StringSerializer"
> >
> > consumer has this default properties:
> >
> >   "bootstrap.servers":"localhost:9092",
> >   "enable.auto.commit":"true",
> >   "auto.commit.interval.ms":"1000",
> >   "session.timeout.ms":"3",
> >   "buffer.memory":33554432,
> >   "key.deserializer":
> > "org.apache.kafka.common.serialization.StringDeserializer",
> >   "value.deserializer":
> > "org.apache.kafka.common.serialization.StringDeserializer"
> >
> > usually there are 2 active groups (group_A and group_B).
> >
> > Best regards,
> > Vincenzo
> >
> > --
> > Vincenzo D'Amore
> > email: v.dam...@gmail.com
> > skype: free.dev
> > mobile: +39 349 8513251
> >
>



-- 
Vincenzo D'Amore
email: v.dam...@gmail.com
skype: free.dev
mobile: +39 349 8513251


How to connect Modbus, DNP or IEC61850 data to Kafka

2016-12-03 Thread Wang LongTian
Dear all gurus,
I'm new to Kafka and I'm going to connect the real-time data streaming from
power system supervision and control devices to Kafka via different
communication protocols (for example Modbus, DNP or IEC61850), and then on to
a Storm processing system.
I'm wondering how I can get this data into Kafka, and I don't know whether
that's supported or not.

Any suggestion and hint are warmly welcomed!

Regards, 
Long Tian




Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-03 Thread Ali Akhtar
I suppose the topic won't be deleted, but this would be a rare enough
occurrence that there won't be too many dormant topics hanging around.

Alternatively perhaps I can store the undeleted topics somewhere, and
whenever a new node starts, it could check this list and delete them.
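
If it helps, here is a rough sketch of that idea; the file path and class name
are made up, and the delete call just mirrors the TopicCommand snippet
Matthias posted in this thread:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import kafka.admin.TopicCommand;
import kafka.utils.ZkUtils;

public class PendingTopicSweeper {
    // called once on node startup, before the Streams topology is (re)started
    public static void sweep(ZkUtils zkUtils) throws Exception {
        Path pending = Paths.get("/var/lib/myapp/pending-topic-deletes.txt");  // placeholder path
        if (!Files.exists(pending)) {
            return;  // nothing left over from a previous run
        }
        List<String> topics = Files.readAllLines(pending);
        for (String topic : topics) {
            TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{
                    "--zookeeper", "zookeperHost:2181",
                    "--delete",
                    "--topic", topic});
            TopicCommand.deleteTopic(zkUtils, opts);
        }
        Files.delete(pending);  // all dormant topics swept
    }
}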

On Sat, Dec 3, 2016 at 3:23 PM, Matthias J. Sax 
wrote:

> Not sure. Would need to think about it more. However, default commit
> interval in Streams is 30 sec. You can configure it via StreamsConfig
> COMMIT_INTERVAL_MS. So using the additional thread and waiting for 5
> minutes sounds ok. Question is, what would happen if the JVM goes down
> before you delete the topic.
>
>
> -Matthias
>
> On 12/3/16 2:07 AM, Ali Akhtar wrote:
> > Is there a way to make sure the offsets got committed? Perhaps, after the
> > last msg has been consumed, I can setup a task to run after a safe time
> > (say 5 mins? ) in another thread which would delete the topic? What would
> > be a safe time to use?
> >
> > On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax 
> > wrote:
> >
> >> I guess yes. You might only want to make sure the topic offsets got
> >> committed -- not sure if committing offsets of a deleted topic could
> >> cause issues (i.e., crash your Streams app)
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:04 PM, Ali Akhtar wrote:
> >>> Thank you very much. Last q - Is it safe to do this from within a call
> >> back
> >>> processing that topic ,  once it reaches the last message? (It keeps a
> >>> count of how many messages processed vs how many remaining)
> >>>
> >>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" 
> >> wrote:
> >>>
>  You can use TopicCommand to delete a topic within Java:
> 
> > final TopicCommand.TopicCommandOptions commandOptions = new
>  TopicCommand.TopicCommandOptions(new String[]{
> > "--zookeeper", "zookeperHost:2181",
> > "--delete",
> > "--topic", "TOPIC-TO-BE-DELETED"});
> > TopicCommand.deleteTopic(zkUtils, commandOptions);
> 
>  So you can delete a topic within your Streams app.
> 
>  -Matthias
> 
> 
> 
>  On 12/2/16 9:25 PM, Ali Akhtar wrote:
> > Is there a way to delete the processed topics via streams or the java
> > driver? Or only thru the bash script?
> >
> > On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" 
>  wrote:
> >
> >> If you keep old topics that are completely processed, there would be
> >> increasing overhead, because Streams would try to read from those
> >> topics
> >> as long as they exist. Thus, more fetch request will be sent to
> those
> >> more topics over time, while most fetch request will return without
> >> any
> >> new data (as those old topic do not have new data)
> >>
> >> If you delete completely processed topics, there will be no
> overhead.
> >>
> >> -Matthias
> >>
> >> On 12/2/16 3:58 PM, Ali Akhtar wrote:
> >>> Hey Matthias,
> >>>
> >>> So I have a scenario where I need to batch a group of messages
>  together.
> >>>
> >>> I'm considering creating a new topic for each batch that arrives,
> i.e
> >>> batch_.
> >>>
> >>> Each batch_ topic will have a finite number of messages, and
> then
>  it
> >>> will remain empty. Essentially these will be throwaway topics.
> >>>
> >>> Is there any overhead to there being a lot of these topics, and
> >> having
>  a
> >>> listener for batch_.* , or is this effectively like having one
> >> listener
> >> for
> >>> one topic?
> >>>
> >>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <
>  matth...@confluent.io>
> >>> wrote:
> >>>
>  1) There will be once consumer per thread. The number of thread is
>  defined by the number of instances you start and how many threads
> >> you
>  configure for each instance via StreamConfig parameter
>  NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
> >> yourself.
> 
>  Depending on the number to partitions in your topics, each thread
> >> will
>  process one or multiple partitions. As a partition will be
> processed
>  by
>  exactly one thread, the overall number of partitions over all you
>  input
>  topics limits your max number of thread (if you have more threads,
>  those
>  will just be idle)
> 
>  2) Thus, there should be no performance issues. Furthermore, if
> you
>  create new topic while you application is running -- and if this
> >> might
>  overload you current application -- you can always start new
> >> instances
>  an scale-out you application dynamically -- Kafka Streams is fully
> >> elastic.
> 
>  Have a look here for more details:
>  http://docs.confluent.io/current/streams/architecture.html
> 
> 

Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-03 Thread Matthias J. Sax
Not sure. Would need to think about it more. However, default commit
interval in Streams is 30 sec. You can configure it via StreamsConfig
COMMIT_INTERVAL_MS. So using the additional thread and waiting for 5
minutes sounds ok. Question is, what would happen if the JVM goes down
before you delete the topic.
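
To make that concrete, here is a rough sketch of both pieces; the commit
interval, delay and topic name are placeholders, and the delete call mirrors
the TopicCommand snippet quoted below:

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.admin.TopicCommand;
import kafka.utils.ZkUtils;
import org.apache.kafka.streams.StreamsConfig;

public class BatchTopicCleanup {

    // 1) shorter commit interval so offsets of a finished topic get committed sooner
    //    (other Streams configs such as application id are omitted here)
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);  // placeholder; default is 30000
        return props;
    }

    // 2) delete the finished batch topic from another thread after a safety delay
    public static void deleteLater(final ZkUtils zkUtils, final String topic) {
        ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
        cleaner.schedule(new Runnable() {
            public void run() {
                TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(new String[]{
                        "--zookeeper", "zookeperHost:2181",
                        "--delete",
                        "--topic", topic});
                TopicCommand.deleteTopic(zkUtils, opts);
            }
        }, 5, TimeUnit.MINUTES);  // wait well past the commit interval before deleting
    }
}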


-Matthias

On 12/3/16 2:07 AM, Ali Akhtar wrote:
> Is there a way to make sure the offsets got committed? Perhaps, after the
> last msg has been consumed, I can setup a task to run after a safe time
> (say 5 mins? ) in another thread which would delete the topic? What would
> be a safe time to use?
> 
> On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax 
> wrote:
> 
>> I guess yes. You might only want to make sure the topic offsets got
>> committed -- not sure if committing offsets of a deleted topic could
>> cause issues (i.e., crash your Streams app)
>>
>> -Matthias
>>
>> On 12/2/16 11:04 PM, Ali Akhtar wrote:
>>> Thank you very much. Last q - Is it safe to do this from within a call
>> back
>>> processing that topic ,  once it reaches the last message? (It keeps a
>>> count of how many messages processed vs how many remaining)
>>>
>>> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" 
>> wrote:
>>>
 You can use TopicCommand to delete a topic within Java:

> final TopicCommand.TopicCommandOptions commandOptions = new
 TopicCommand.TopicCommandOptions(new String[]{
> "--zookeeper", "zookeperHost:2181",
> "--delete",
> "--topic", "TOPIC-TO-BE-DELETED"});
> TopicCommand.deleteTopic(zkUtils, commandOptions);

 So you can delete a topic within your Streams app.

 -Matthias



 On 12/2/16 9:25 PM, Ali Akhtar wrote:
> Is there a way to delete the processed topics via streams or the java
> driver? Or only thru the bash script?
>
> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" 
 wrote:
>
>> If you keep old topics that are completely processed, there would be
>> increasing overhead, because Streams would try to read from those
>> topics
>> as long as they exist. Thus, more fetch request will be sent to those
>> more topics over time, while most fetch request will return without
>> any
>> new data (as those old topic do not have new data)
>>
>> If you delete completely processed topics, there will be no overhead.
>>
>> -Matthias
>>
>> On 12/2/16 3:58 PM, Ali Akhtar wrote:
>>> Hey Matthias,
>>>
>>> So I have a scenario where I need to batch a group of messages
 together.
>>>
>>> I'm considering creating a new topic for each batch that arrives, i.e
>>> batch_.
>>>
>>> Each batch_ topic will have a finite number of messages, and then
 it
>>> will remain empty. Essentially these will be throwaway topics.
>>>
>>> Is there any overhead to there being a lot of these topics, and
>> having
 a
>>> listener for batch_.* , or is this effectively like having one
>> listener
>> for
>>> one topic?
>>>
>>> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <
 matth...@confluent.io>
>>> wrote:
>>>
 1) There will be once consumer per thread. The number of thread is
 defined by the number of instances you start and how many threads
>> you
 configure for each instance via StreamConfig parameter
 NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
>> yourself.

 Depending on the number to partitions in your topics, each thread
>> will
 process one or multiple partitions. As a partition will be processed
 by
 exactly one thread, the overall number of partitions over all you
 input
 topics limits your max number of thread (if you have more threads,
 those
 will just be idle)

 2) Thus, there should be no performance issues. Furthermore, if you
 create new topic while you application is running -- and if this
>> might
 overload you current application -- you can always start new
>> instances
 an scale-out you application dynamically -- Kafka Streams is fully
>> elastic.

 Have a look here for more details:
 http://docs.confluent.io/current/streams/architecture.html


 -Matthias

 On 12/2/16 6:23 AM, Ali Akhtar wrote:
> That's pretty useful to know - thanks.
>
> 1) If I listened to foo-.*, and there were 5 foo topics created
 after
> kafka streaming was running: foo1, foo2, foo3, foo4, foo5, will
>> this
 create
> 5 consumers / threads / instances, or will it be just 1 instance
>> that
> receives the messages for all of those topics?
>
> 2) Will this cause issues performance issues if i had a lot of
>> throwaway
> foo topics being created, or will this 

Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-03 Thread Ali Akhtar
Is there a way to make sure the offsets got committed? Perhaps, after the
last msg has been consumed, I can set up a task to run after a safe time
(say 5 mins? ) in another thread which would delete the topic? What would
be a safe time to use?

On Sat, Dec 3, 2016 at 3:04 PM, Matthias J. Sax 
wrote:

> I guess yes. You might only want to make sure the topic offsets got
> committed -- not sure if committing offsets of a deleted topic could
> cause issues (i.e., crash your Streams app)
>
> -Matthias
>
> On 12/2/16 11:04 PM, Ali Akhtar wrote:
> > Thank you very much. Last q - Is it safe to do this from within a call
> back
> > processing that topic ,  once it reaches the last message? (It keeps a
> > count of how many messages processed vs how many remaining)
> >
> > On 3 Dec 2016 11:36 a.m., "Matthias J. Sax" 
> wrote:
> >
> >> You can use TopicCommand to delete a topic within Java:
> >>
> >>> final TopicCommand.TopicCommandOptions commandOptions = new
> >> TopicCommand.TopicCommandOptions(new String[]{
> >>> "--zookeeper", "zookeperHost:2181",
> >>> "--delete",
> >>> "--topic", "TOPIC-TO-BE-DELETED"});
> >>> TopicCommand.deleteTopic(zkUtils, commandOptions);
> >>
> >> So you can delete a topic within your Streams app.
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 12/2/16 9:25 PM, Ali Akhtar wrote:
> >>> Is there a way to delete the processed topics via streams or the java
> >>> driver? Or only thru the bash script?
> >>>
> >>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" 
> >> wrote:
> >>>
>  If you keep old topics that are completely processed, there would be
>  increasing overhead, because Streams would try to read from those
> topics
>  as long as they exist. Thus, more fetch request will be sent to those
>  more topics over time, while most fetch request will return without
> any
>  new data (as those old topic do not have new data)
> 
>  If you delete completely processed topics, there will be no overhead.
> 
>  -Matthias
> 
>  On 12/2/16 3:58 PM, Ali Akhtar wrote:
> > Hey Matthias,
> >
> > So I have a scenario where I need to batch a group of messages
> >> together.
> >
> > I'm considering creating a new topic for each batch that arrives, i.e
> > batch_.
> >
> > Each batch_ topic will have a finite number of messages, and then
> >> it
> > will remain empty. Essentially these will be throwaway topics.
> >
> > Is there any overhead to there being a lot of these topics, and
> having
> >> a
> > listener for batch_.* , or is this effectively like having one
> listener
>  for
> > one topic?
> >
> > On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <
> >> matth...@confluent.io>
> > wrote:
> >
> >> 1) There will be once consumer per thread. The number of thread is
> >> defined by the number of instances you start and how many threads
> you
> >> configure for each instance via StreamConfig parameter
> >> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
>  yourself.
> >>
> >> Depending on the number to partitions in your topics, each thread
> will
> >> process one or multiple partitions. As a partition will be processed
> >> by
> >> exactly one thread, the overall number of partitions over all you
> >> input
> >> topics limits your max number of thread (if you have more threads,
> >> those
> >> will just be idle)
> >>
> >> 2) Thus, there should be no performance issues. Furthermore, if you
> >> create new topic while you application is running -- and if this
> might
> >> overload you current application -- you can always start new
> instances
> >> an scale-out you application dynamically -- Kafka Streams is fully
>  elastic.
> >>
> >> Have a look here for more details:
> >> http://docs.confluent.io/current/streams/architecture.html
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/16 6:23 AM, Ali Akhtar wrote:
> >>> That's pretty useful to know - thanks.
> >>>
> >>> 1) If I listened to foo-.*, and there were 5 foo topics created
> >> after
> >>> kafka streaming was running: foo1, foo2, foo3, foo4, foo5, will
> this
> >> create
> >>> 5 consumers / threads / instances, or will it be just 1 instance
> that
> >>> receives the messages for all of those topics?
> >>>
> >>> 2) Will this cause issues performance issues if i had a lot of
>  throwaway
> >>> foo topics being created, or will this scale?
> >>>
> >>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy 
>  wrote:
> >>>
>  Hi Ali,
> 
>  The only way KafkaStreams will process new topics after start is
> if
>  the
>  original stream was defined with a regular expression, i.e,
 kafka.stream(Pattern.compile("foo-.*"));
> 
>  If any new 

Re: Adding topics to KafkaStreams after ingestion has been started?

2016-12-03 Thread Matthias J. Sax
I guess yes. You might only want to make sure the topic offsets got
committed -- not sure if committing offsets of a deleted topic could
cause issues (i.e., crash your Streams app)

-Matthias

On 12/2/16 11:04 PM, Ali Akhtar wrote:
> Thank you very much. Last q - Is it safe to do this from within a call back
> processing that topic ,  once it reaches the last message? (It keeps a
> count of how many messages processed vs how many remaining)
> 
> On 3 Dec 2016 11:36 a.m., "Matthias J. Sax"  wrote:
> 
>> You can use TopicCommand to delete a topic within Java:
>>
>>> final TopicCommand.TopicCommandOptions commandOptions = new
>> TopicCommand.TopicCommandOptions(new String[]{
>>> "--zookeeper", "zookeperHost:2181",
>>> "--delete",
>>> "--topic", "TOPIC-TO-BE-DELETED"});
>>> TopicCommand.deleteTopic(zkUtils, commandOptions);
>>
>> So you can delete a topic within your Streams app.
>>
>> -Matthias
>>
>>
>>
>> On 12/2/16 9:25 PM, Ali Akhtar wrote:
>>> Is there a way to delete the processed topics via streams or the java
>>> driver? Or only thru the bash script?
>>>
>>> On 3 Dec 2016 5:27 a.m., "Matthias J. Sax" 
>> wrote:
>>>
 If you keep old topics that are completely processed, there would be
 increasing overhead, because Streams would try to read from those topics
 as long as they exist. Thus, more fetch requests will be sent to more
 topics over time, while most fetch requests will return without any
 new data (as those old topics do not have new data).

 If you delete completely processed topics, there will be no overhead.

 -Matthias

 On 12/2/16 3:58 PM, Ali Akhtar wrote:
> Hey Matthias,
>
> So I have a scenario where I need to batch a group of messages
>> together.
>
> I'm considering creating a new topic for each batch that arrives, i.e
> batch_.
>
> Each batch_ topic will have a finite number of messages, and then
>> it
> will remain empty. Essentially these will be throwaway topics.
>
> Is there any overhead to there being a lot of these topics, and having
>> a
> listener for batch_.* , or is this effectively like having one listener
 for
> one topic?
>
> On Fri, Dec 2, 2016 at 11:09 PM, Matthias J. Sax <
>> matth...@confluent.io>
> wrote:
>
>> 1) There will be one consumer per thread. The number of threads is
>> defined by the number of instances you start and how many threads you
>> configure for each instance via the StreamsConfig parameter
>> NUM_STREAM_THREADS_CONFIG. Thus, you control this completely by
 yourself.
>>
>> Depending on the number of partitions in your topics, each thread will
>> process one or multiple partitions. As a partition will be processed
>> by
>> exactly one thread, the overall number of partitions over all your
>> input
>> topics limits your max number of threads (if you have more threads,
>> those
>> will just be idle)
>>
>> 2) Thus, there should be no performance issues. Furthermore, if you
>> create new topics while your application is running -- and if this might
>> overload your current application -- you can always start new instances
>> and scale out your application dynamically -- Kafka Streams is fully
 elastic.
>>
>> Have a look here for more details:
>> http://docs.confluent.io/current/streams/architecture.html
>>
>>
>> -Matthias
>>
>> On 12/2/16 6:23 AM, Ali Akhtar wrote:
>>> That's pretty useful to know - thanks.
>>>
>>> 1) If I listened to foo-.*, and there were 5 foo topics created
>> after
>>> kafka streaming was running: foo1, foo2, foo3, foo4, foo5, will this
>> create
>>> 5 consumers / threads / instances, or will it be just 1 instance that
>>> receives the messages for all of those topics?
>>>
>>> 2) Will this cause issues performance issues if i had a lot of
 throwaway
>>> foo topics being created, or will this scale?
>>>
>>> On Fri, Dec 2, 2016 at 7:17 PM, Damian Guy 
 wrote:
>>>
 Hi Ali,

 The only way KafkaStreams will process new topics after start is if
 the
 original stream was defined with a regular expression, i.e,
 kafka.stream(Pattern.compile("foo-.*"));

 If any new topics are added after start that match the pattern, then
>> they
 will also be consumed.

 Thanks,
 Damian

 On Fri, 2 Dec 2016 at 13:13 Ali Akhtar 
>> wrote:

> Heya,
>
> Normally, you add your topics and their callbacks to a
>> StreamBuilder,
>> and
> then call KafkaStreams.start() to start ingesting those topics.
>
> Is it possible to add a new topic to the StreamBuilder, and start
 ingesting
> that as well, after