Truncation

2019-07-18 Thread Jamie
Hi All,
Could someone confirm what truncation happens when a partition changes from a 
follower to a leader and why? 
Any help would be greatly appreciated.
Many Thanks,
Jamie

Check the Kafka version information for the MDT Mirror

2019-07-18 Thread Harry k
Hi,
How do I check the version information for the Kafka that is being used on the
MDT Mirror servers? Is there any command to check that? I only have access to
the Kafka and ZooKeeper servers through PuTTY. Any help will be appreciated.
Best Regards,
bahir


Streams - Low level API

2019-07-18 Thread Navneeth Krishnan
Hi All,

I'm working on a Kafka Streams application that basically aggregates data over
5-minute intervals.

The input topic is partitioned by id, and I want to use a processor to
aggregate data in a state store and a punctuator to emit the results. However,
I'm not sure how to perform a groupBy when using the low-level APIs. My
intention is to achieve the topology below.

Topology: Source -> Map values -> GroupBy (internal key) -> Process -> Sink

Need some help on how I can achieve this. Appreciate all the help.

Thanks
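
For what it's worth, here is a minimal sketch of one way to wire this up. All
names (input-topic, agg-store, agg-repartition, FiveMinuteAggregator) are
illustrative, and it assumes Kafka Streams 2.1+ (for the Duration-based
schedule()) plus default serdes that cover the sink types. Because the DSL's
process() is a terminal operation in these versions, the sketch uses a
Transformer so the punctuator's output can still reach a sink:

import java.time.Duration

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KeyValueMapper, Transformer, TransformerSupplier, ValueMapper}
import org.apache.kafka.streams.processor.{ProcessorContext, PunctuationType, Punctuator}
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

// Toy aggregation: counts records per (re-keyed) key in a state store and
// emits the whole store from a wall-clock punctuator every 5 minutes.
class FiveMinuteAggregator extends Transformer[String, String, KeyValue[String, java.lang.Long]] {
  private var ctx: ProcessorContext = _
  private var store: KeyValueStore[String, java.lang.Long] = _

  override def init(context: ProcessorContext): Unit = {
    ctx = context
    store = context.getStateStore("agg-store").asInstanceOf[KeyValueStore[String, java.lang.Long]]
    ctx.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, new Punctuator {
      override def punctuate(timestamp: Long): Unit = {
        val it = store.all()
        val snapshot = scala.collection.mutable.ListBuffer.empty[KeyValue[String, java.lang.Long]]
        while (it.hasNext) snapshot += it.next()
        it.close()
        snapshot.foreach { kv =>
          ctx.forward(kv.key, kv.value) // emit the aggregate downstream
          store.delete(kv.key)          // start a fresh 5-minute window
        }
      }
    })
  }

  // Nothing is forwarded per record; the punctuator emits the results.
  override def transform(key: String, value: String): KeyValue[String, java.lang.Long] = {
    val current = Option(store.get(key)).map(_.longValue).getOrElse(0L)
    store.put(key, current + 1L)
    null
  }

  override def close(): Unit = ()
}

object AggTopology {
  def build(): StreamsBuilder = {
    val builder = new StreamsBuilder
    builder.addStateStore(
      Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("agg-store"), Serdes.String(), Serdes.Long()))

    builder.stream[String, String]("input-topic")
      .mapValues(new ValueMapper[String, String] {
        override def apply(v: String): String = v.trim // "map values" step
      })
      .selectKey(new KeyValueMapper[String, String, String] {
        override def apply(k: String, v: String): String = v.split(",")(0) // derive the internal key
      })
      .through("agg-repartition") // pre-created topic; newer versions can use repartition() instead
      .transform(new TransformerSupplier[String, String, KeyValue[String, java.lang.Long]] {
        override def get(): Transformer[String, String, KeyValue[String, java.lang.Long]] =
          new FiveMinuteAggregator
      }, "agg-store")
      .to("output-topic") // assumes default serdes matching String keys / Long values

    builder
  }
}

The through()/repartition() step is what plays the role of groupBy here: it
gives every internal key its own partition, so each task's state store sees
all records for the keys it owns.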


Re: Print RocksDb Stats

2019-07-18 Thread Guozhang Wang
Hello Muhammed,

The community is working on KIP-444, which exposes RocksDB metrics. There's an
ongoing PR that you may find helpful for your own implementation:
https://github.com/apache/kafka/pull/6884


Guozhang


On Wed, Jul 17, 2019 at 6:26 AM Muhammed Ashik  wrote:

> Hi, I'm trying to log the RocksDB stats with the code below, but I'm not
> observing any logs.
> I'm enabling this because the off-heap memory grows indefinitely over a
> period of time.
> We are using inMemoryKeyValueStore only; I was not sure whether kafka-streams
> uses RocksDB as the default store.
>
> Kafka Streams version - 2.0.0
>
> import java.util
>
> import org.apache.kafka.streams.state.RocksDBConfigSetter
> import org.rocksdb.{InfoLogLevel, Options, Statistics, StatsLevel}
>
> class CustomRocksDBConfig extends RocksDBConfigSetter {
>   override def setConfig(storeName: String, options: Options,
>                          configs: util.Map[String, AnyRef]): Unit = {
>     // Collect all RocksDB statistics and dump them to the DB log every 600s
>     val stats = new Statistics
>     stats.setStatsLevel(StatsLevel.ALL)
>     options.setStatistics(stats)
>       .setStatsDumpPeriodSec(600)
>     options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
>     options.setDbLogDir("/tmp/dump")
>   }
> }
>


-- 
-- Guozhang


Re: Kafka Streams - unbounded memory growth

2019-07-18 Thread Sophie Blee-Goldman
And all four stores (BucketData, CacheData, StationKeyValue,
StationCacheData) are definitely explicitly added as
Stores.inMemoryKeyValueStore("name")? Hm.

Does du -h show any contents within the
./buzzard.MonitoringSeWlanStatsAggregator/0_1 directories that match any of
these store names? Also if you rerun the app with a different state.dir, do
all the same directories get created?
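
For reference, a minimal sketch of the two registration styles (store names
are taken from the topology below; the serdes are illustrative). The
explicitly in-memory variant creates no RocksDB instance, while the persistent
variant is the one that allocates off-heap memory:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.state.Stores

object StoreWiring {
  val builder = new StreamsBuilder

  // Purely on-heap store: no RocksDB instance is created; the task directories
  // under state.dir only hold bookkeeping files such as the .lock files above.
  builder.addStateStore(
    Stores.keyValueStoreBuilder(
      Stores.inMemoryKeyValueStore("BucketData"),
      Serdes.String(), Serdes.String()))

  // Persistent store: backed by RocksDB, which allocates off-heap memory
  // (block cache, memtables, index/filter blocks) per store instance.
  builder.addStateStore(
    Stores.keyValueStoreBuilder(
      Stores.persistentKeyValueStore("CacheData"),
      Serdes.String(), Serdes.String()))
}

If only the first style (and no stateful DSL operators, which default to the
persistent store) appears in the topology, RocksDB is not in play, which
matches what was said earlier in this thread.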

On Thu, Jul 18, 2019 at 12:15 AM Muhammed Ashik  wrote:

> Hi, my Topology description..
>
> - Topologies:
>Sub-topology: 0
> Source: SeSource (topics: [buzzard.decoded.amon.stats.ap,
> buzzard.app.monitoring.internal.clientsestats])
>   --> SeProcess
> Processor: SeProcess (stores: [BucketData, CacheData])
>   --> AeSink
>   <-- SeSource
> Sink: AeSink (topic: buzzard.app.monitoring.internal.stats)
>   <-- SeProcess
>
>   Sub-topology: 1
> Source: StationSource (topics: [buzzard.decoded.amon.stats.station])
>   --> StationProcess
> Processor: StationProcess (stores: [StationKeyValue, StationCacheData])
>   --> StationSink
>   <-- StationSource
> Sink: StationSink (topic:
> buzzard.app.monitoring.internal.clientsestats)
>   <-- StationProcess
>
> Regards
> Ashik
>
> On Thu, Jul 18, 2019 at 1:31 AM Sophie Blee-Goldman 
> wrote:
>
> > Hm. These directories shouldn't be created if using only an in memory
> > store. Can you print your topology?
> >
> > On Wed, Jul 17, 2019 at 11:02 AM Muhammed Ashik 
> wrote:
> >
> > > Hi I just did `du -mh` on `\tmp\kafka-streams` below are the folders
> > listed
> > > with some .lock files inside.
> > > not sure if these are coming from rocksdb.. and looks like the sizes of
> > > these files are less.
> > >
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_5
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_3
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_2
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/1_4
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_1
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_4
> > > 28K ./buzzard.MonitoringSeWlanStatsAggregator
> > > 32K .
> > >
> > >
> > >
> > > On Wed, Jul 17, 2019 at 11:18 PM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > wrote:
> > >
> > > > Hm. Just to be absolutely sure, could you try throwing an exception
> or
> > > > something in your RocksDBConfigSetter?
> > > >
> > > > On Wed, Jul 17, 2019 at 10:43 AM Muhammed Ashik 
> > > wrote:
> > > >
> > > > > I can confirm the /tmp/kafka-streams doesn't have any data related
> to
> > > > > rocksdb.
> > > > >
> > > > > On Wed, Jul 17, 2019 at 11:11 PM Sophie Blee-Goldman <
> > > > sop...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > You can describe your topology to see if there are any state
> stores
> > > in
> > > > it
> > > > > > that you aren't aware of. Alternatively you could check out the
> > state
> > > > > > directory (/tmp/kafka-streams by default) and see if there is any
> > > data
> > > > in
> > > > > > there
> > > > > >
> > > > > > On Wed, Jul 17, 2019 at 10:36 AM Muhammed Ashik <
> ashi...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Thanks How can I verify If there is some data really going on
> > > rocksdb
> > > > > > > I tried printing the statistics with no success.
> > > > > > >
> > > > > > > class CustomRocksDBConfig extends RocksDBConfigSetter {
> > > > > > >   override def setConfig(storeName: String, options: Options,
> > > > configs:
> > > > > > > util.Map[String, AnyRef]): Unit = {
> > > > > > >
> > > > > > > val stats = new Statistics
> > > > > > > stats.setStatsLevel(StatsLevel.ALL)
> > > > > > > options.setStatistics(stats)
> > > > > > >   .setStatsDumpPeriodSec(600)
> > > > > > > options
> > > > > > >   .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
> > > > > > > options.setDbLogDir("/tmp/dump")
> > > > > > >
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > and included in the stream config ..
> > > > > > >
> > > > > > > settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > > > > > > classOf[CustomRocksDBConfig])
> > > > > > >
> > > > > > >
> > > > > > > Regards
> > > > > > > Ashik
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 17, 2019 at 10:52 PM Sophie Blee-Goldman <
> > > > > > sop...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Sorry, didn't see the "off-heap" part of the email. Are you
> > using
> > > > any
> > > > > > > > stateful DSL operators? The default stores are persistent, so
> > you
> > > > may
> > > > > > > have
> > > > > > > > a RocksDB store in your topology without explicitly using
> one.
> > > > > > > >
> > > > > > > > On Wed, Jul 17, 2019 at 10:12 AM Sophie Blee-Goldman <
> > > > > > > sop...@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > If you are using inMemoryKeyValueStore, the records are
> > stored
> > > by
> > > > >

check the version in putty

2019-07-18 Thread Harry k
Hi,
How do I check the version information for the Kafka that is being used on the
MDT Mirror servers?
thanks


Re: Kafka logs are getting deleted too soon

2019-07-18 Thread Sachin Nikumbh
Hi Peter,
A HUGE thank you for your suggestion of using 'retention.ms=-1' for the topic.
I also explicitly set 'retention.bytes=-1'. With this combination, Kafka is
certainly not deleting the log segments, and I am able to run multiple instances
of console consumers to read the data. I am still confused about the behavior
I see when I set 'retention.ms=3144960' and 'retention.bytes=10737418240'.
I would love to understand why Kafka is deleting messages.
I am also noticing that each segment is ~100MB in size even though the default
value for 'segment.bytes' is ~1GB. Another example of Kafka not behaving
according to what's documented.
Thanks once again for your suggestion.
Sachin

On Wednesday, July 17, 2019, 10:46:13 PM EDT, Peter Bukowinski 
 wrote:  
 
 Indeed, something seems wrong. I have a kafka (2.0.1) cluster that aggregates 
data from multiple locations. It has so much data moving through it I can’t 
afford to keep more than 24 hours on disk. The retention is working correctly. 
I don’t restrict topics by size, only by time.

What version of kafka are you using?

Looking back at the example log directory listing, I see that you mentioned 
seeing .log.deleted files. Yes, that means kafka tagged that log segment 
for deletion, and then the cleanup process removed it soon after. Something is 
causing your data to be cleaned, despite your retention overrides.

Can you try removing 'retention.bytes’ and setting ‘retention.ms=-1' for the 
topic? That should persist the data indefinitely.
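
For reference, a hedged sketch of one way to apply that override
programmatically with the AdminClient (the kafka-configs.sh CLI achieves the
same thing). The topic name and bootstrap address below are placeholders, and
incrementalAlterConfigs assumes client libraries 2.3+:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object RetentionOverride {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "myserver1:9092") // placeholder

    val admin = AdminClient.create(props)
    val topic = new ConfigResource(ConfigResource.Type.TOPIC, "myTopic") // placeholder

    val ops: java.util.Collection[AlterConfigOp] = java.util.Arrays.asList(
      // retention.ms=-1 disables time-based deletion for this topic
      new AlterConfigOp(new ConfigEntry("retention.ms", "-1"), AlterConfigOp.OpType.SET),
      // drop the override so retention.bytes falls back to the broker default (-1, unlimited)
      new AlterConfigOp(new ConfigEntry("retention.bytes", ""), AlterConfigOp.OpType.DELETE)
    )

    admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get()
    admin.close()
  }
}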



> On Jul 17, 2019, at 6:07 PM, Sachin Nikumbh  
> wrote:
> 
> I am not setting the group id for the console consumer. When I say, the .log 
> files are all 0 bytes long it is after the producer has gone through 96 GB 
> worth of data. Apart from this topic where I am dumping 96GB of data, I have 
> some test topics where I am publishing very small amount of data. I don't 
> have any problem reading messages from those topics. The .log files for those 
> topics are properly sized and I can read those messages using multiple 
> console consumers at the same time. I have a feeling that this specific
> topic is having trouble due to the amount of data that I am publishing. I am 
> failing to understand which Kafka settings are playing role here.
> I am sure 96GB of data is really not a big deal for Kafka and I am not the 
> first one to do this.
>    On Wednesday, July 17, 2019, 04:58:48 PM EDT, Peter Bukowinski 
> wrote:  
> 
> Are you setting a group.id for your console consumer, perhaps, and keeping it 
> static? That would explain the inability to reconsume the data. As to why 
> your logs look empty, kafka likes to hold the data in memory and leaves it to 
> the OS to flush the data to disk. On a non-busy broker, the interval between 
> when data arrives and when it is flushed to disk can be quite a while.
> 
> 
>> On Jul 17, 2019, at 1:39 PM, Sachin Nikumbh  
>> wrote:
>> 
>> Hi Jamie,
>> I have 3 brokers and the replication factor for my topic is set to 3. I know 
>> for sure that the producer is producing data successfully because I am 
>> running a console consumer at the same time and it shows me the messages. 
>> After the producer produces all the data, I have /var/log/kafka/myTopic-* 
>> directories (15 of them) and all of them have only one .log file with size 
>> of 0 bytes. So, I am not sure if that addresses your question around the 
>> active segment.
>> Thanks
>> Sachin
>>    On Wednesday, July 17, 2019, 04:00:56 PM EDT, Jamie 
>> wrote:  
>> 
>> Hi Sachin, 
>> My understanding is that the active segment is never deleted, which means you
>> should have at least 1GB of data in your partition if the data is indeed
>> being produced to Kafka. Are there any errors in your broker logs? How many
>> brokers do you have, and what is the replication factor of the topic? If
>> you have less than 3 brokers, have you set offsets.topic.replication.factor 
>> to the number of brokers? 
>> 
>> Thanks, 
>> Jamie
>> 
>> -Original Message-
>> From: Sachin Nikumbh 
>> To: users 
>> Sent: Wed, 17 Jul 2019 20:21
>> Subject: Re: Kafka logs are getting deleted too soon
>> 
>> Broker configs:
>> ===
>> broker.id=36
>> num.network.threads=3
>> num.io.threads=8
>> socket.send.buffer.bytes=102400
>> socket.receive.buffer.bytes=102400
>> socket.request.max.bytes=104857600
>> log.dirs=/var/log/kafka
>> num.partitions=1
>> num.recovery.threads.per.data.dir=1
>> offsets.topic.replication.factor=1
>> transaction.state.log.replication.factor=1
>> transaction.state.log.min.isr=1
>> log.retention.hours=168
>> log.segment.bytes=1073741824
>> log.retention.check.interval.ms=30
>> zookeeper.connect=myserver1:2181,myserver2:2181,myserver3:2181
>> zookeeper.connection.timeout.ms=6000
>> confluent.support.metrics.enable=true
>> confluent.support.customer.id=anonymous
>> group.initial.rebalance.delay.ms=0
>> auto.create.topics.enable=false
>>
>> Topic configs:
>> ==
>> --partitions 15
>> --replication-factor 3
>> retention.ms=3144960
>> retention.bytes=10737418240
>>
>> As you can see, I have tried

Re: Best practices for compacting topics with tombstones

2019-07-18 Thread Omar Al-Safi
If I recall correctly, you can set 'delete.retention.ms' in the topic-level
configuration to control how long you want to retain the tombstones in the
topic. By default it is set to 86400000 ms (24 hours); you can set it lower
than that. Regarding performance, I am not really sure why compaction would
cause a performance hit on your broker, but the relevant questions are how much
data you hold there, how often you have updates to your topic (records with the
same key), and how often you have tombstones for records.
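
For concreteness, a hedged sketch of what such topic-level overrides might look
like when creating topic #1 with the AdminClient. The topic name, partition
count, replication factor, and the values themselves are purely illustrative,
not recommendations:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CompactedTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder

    val topicConfigs = Map(
      "cleanup.policy" -> "compact",        // compact instead of time/size-based deletion
      "delete.retention.ms" -> "3600000",   // keep tombstones for 1 hour instead of the 24h default
      "min.cleanable.dirty.ratio" -> "0.5", // lower values compact more aggressively (more cleaner I/O)
      "segment.ms" -> "600000"              // roll segments sooner; only non-active segments are compacted
    ).asJava

    val admin = AdminClient.create(props)
    // "raw-input" stands in for topic #1
    admin.createTopics(Collections.singletonList(
      new NewTopic("raw-input", 3, 3.toShort).configs(topicConfigs))).all().get()
    admin.close()
  }
}

The same keys can be applied to an already existing topic with kafka-configs.sh
or the AdminClient's incrementalAlterConfigs.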

On Wed, 17 Jul 2019 at 22:12, Chris Baumgartner <
chris.baumgart...@fujifilm.com> wrote:

> Hello,
>
> I'm wondering if anyone has advice on configuring compaction. Here is my
> scenario:
>
> A producer writes raw data to topic #1. A stream app reads the data from
> topic #1, processes it, writes the processed data to topic #2, and then
> writes a tombstone record to topic #1.
>
> So, I don't intend for data to be retained very long in topic #1.
>
> Are there any best practices for configuring compaction on topic #1 in this
> case? I don't want to keep the data around very long after it has been
> processed, but I also don't want to cause performance issues by compacting
> too often.
>
> Thanks.
>
> - Chris
>


Re: Kafka Streams - unbounded memory growth

2019-07-18 Thread Muhammed Ashik
Hi, my Topology description..

- Topologies:
   Sub-topology: 0
Source: SeSource (topics: [buzzard.decoded.amon.stats.ap,
buzzard.app.monitoring.internal.clientsestats])
  --> SeProcess
Processor: SeProcess (stores: [BucketData, CacheData])
  --> AeSink
  <-- SeSource
Sink: AeSink (topic: buzzard.app.monitoring.internal.stats)
  <-- SeProcess

  Sub-topology: 1
Source: StationSource (topics: [buzzard.decoded.amon.stats.station])
  --> StationProcess
Processor: StationProcess (stores: [StationKeyValue, StationCacheData])
  --> StationSink
  <-- StationSource
Sink: StationSink (topic: buzzard.app.monitoring.internal.clientsestats)
  <-- StationProcess

Regards
Ashik

On Thu, Jul 18, 2019 at 1:31 AM Sophie Blee-Goldman 
wrote:

> Hm. These directories shouldn't be created if using only an in memory
> store. Can you print your topology?
>
> On Wed, Jul 17, 2019 at 11:02 AM Muhammed Ashik  wrote:
>
> > Hi I just did `du -mh` on `\tmp\kafka-streams` below are the folders
> listed
> > with some .lock files inside.
> > not sure if these are coming from rocksdb.. and looks like the sizes of
> > these files are less.
> >
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_5
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_3
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_2
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/1_4
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_1
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_4
> > 28K ./buzzard.MonitoringSeWlanStatsAggregator
> > 32K .
> >
> >
> >
> > On Wed, Jul 17, 2019 at 11:18 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > wrote:
> >
> > > Hm. Just to be absolutely sure, could you try throwing an exception or
> > > something in your RocksDBConfigSetter?
> > >
> > > On Wed, Jul 17, 2019 at 10:43 AM Muhammed Ashik 
> > wrote:
> > >
> > > > I can confirm the /tmp/kafka-streams doesn't have any data related to
> > > > rocksdb.
> > > >
> > > > On Wed, Jul 17, 2019 at 11:11 PM Sophie Blee-Goldman <
> > > sop...@confluent.io>
> > > > wrote:
> > > >
> > > > > You can describe your topology to see if there are any state stores
> > in
> > > it
> > > > > that you aren't aware of. Alternatively you could check out the
> state
> > > > > directory (/tmp/kafka-streams by default) and see if there is any
> > data
> > > in
> > > > > there
> > > > >
> > > > > On Wed, Jul 17, 2019 at 10:36 AM Muhammed Ashik  >
> > > > wrote:
> > > > >
> > > > > > Thanks How can I verify If there is some data really going on
> > rocksdb
> > > > > > I tried printing the statistics with no success.
> > > > > >
> > > > > > class CustomRocksDBConfig extends RocksDBConfigSetter {
> > > > > >   override def setConfig(storeName: String, options: Options,
> > > configs:
> > > > > > util.Map[String, AnyRef]): Unit = {
> > > > > >
> > > > > > val stats = new Statistics
> > > > > > stats.setStatsLevel(StatsLevel.ALL)
> > > > > > options.setStatistics(stats)
> > > > > >   .setStatsDumpPeriodSec(600)
> > > > > > options
> > > > > >   .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
> > > > > > options.setDbLogDir("/tmp/dump")
> > > > > >
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > >
> > > > > > and included in the stream config ..
> > > > > >
> > > > > > settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > > > > > classOf[CustomRocksDBConfig])
> > > > > >
> > > > > >
> > > > > > Regards
> > > > > > Ashik
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jul 17, 2019 at 10:52 PM Sophie Blee-Goldman <
> > > > > sop...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Sorry, didn't see the "off-heap" part of the email. Are you
> using
> > > any
> > > > > > > stateful DSL operators? The default stores are persistent, so
> you
> > > may
> > > > > > have
> > > > > > > a RocksDB store in your topology without explicitly using one.
> > > > > > >
> > > > > > > On Wed, Jul 17, 2019 at 10:12 AM Sophie Blee-Goldman <
> > > > > > sop...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > If you are using inMemoryKeyValueStore, the records are
> stored
> > by
> > > > > > > > definition in memory. RocksDB is not used at all. This store
> > will
> > > > > > > continue
> > > > > > > > to grow proportionally to your keyspace. If you do not have
> > > > > sufficient
> > > > > > > > memory to hold your entire dataset in memory, consider adding
> > > > another
> > > > > > > > instance or switching to the RocksDB store
> > > > > > > >
> > > > > > > > On Wed, Jul 17, 2019 at 6:22 AM Muhammed Ashik <
> > > ashi...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Kafka Streams version - 2.0.0
> > > > > > > >>
> > > > > > > >> Hi, in our streaming instance we are observing a steady
> growth
> > > in
> > > > > the
> > > > > > > >> off-heap memory (out of 2gb allocated memory 1.3 is reserved
> > for
> > >