Truncation
Hi All, Could someone confirm what truncation happens when a partition changes from a follower to a leader and why? Any help would be greatly appreciated. Many Thanks, Jamie
Check the Kafka version information on the MDT Mirror servers
Hi, How can I check the version information for Kafka that is being used on the MDT Mirror servers? Is there any command to check that? I only have access to the Kafka and ZooKeeper servers through PuTTY. Any help will be appreciated. Best Regards, bahir
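One common approach (a sketch under assumptions: Kafka installed under a path such as /opt/kafka, with the broker jars following the usual kafka_&lt;scala-version&gt;-&lt;kafka-version&gt;.jar naming) is to list the library jars over the PuTTY session and read the version out of the file name:

```shell
# On the broker host, list the core Kafka jar; its name encodes the
# Scala and Kafka versions, e.g. kafka_2.11-2.0.0.jar.
# (The /opt/kafka/libs path is an assumption -- adjust to your install.)
#   ls /opt/kafka/libs/kafka_*.jar

# Parsing the Kafka version out of such a jar name:
jar="kafka_2.11-2.0.0.jar"   # example file name
version="${jar#kafka_*-}"    # strip the leading kafka_<scala>- prefix
version="${version%.jar}"    # strip the trailing .jar suffix
echo "$version"
```

If the installed tools are new enough (Kafka 2.0+), the command-line scripts such as kafka-topics.sh also accept a --version flag, which may be simpler than inspecting jar names.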
Streams - Low level API
Hi All, I'm working on a Kafka Streams application to aggregate data over 5-minute intervals. The input topic is partitioned by id, and I want to use a Processor to aggregate data in a state store and a punctuator to emit results. But I'm not sure how I can perform a groupBy when using the low-level API. My intention is to achieve the topology below. Topology: Source -> Map values -> GroupBy (internal key) -> Process -> Sink. Need some help on how I can achieve this. Appreciate all the help. Thanks
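The aggregation step being described can be sketched in plain Java (a hypothetical illustration of the bucketing logic only, not the Kafka Streams Processor API; in the actual low-level API, grouping by a new key generally requires writing through an intermediate repartition topic via a sink/source pair before the Processor):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of what a Processor with a state store would accumulate:
// records grouped by an internal key and a 5-minute time bucket.
// A punctuator would later walk this map and emit the sums downstream.
public class FiveMinuteAggregator {
    static final long BUCKET_MS = 5 * 60 * 1000L;

    // state-store stand-in: "<key>@<bucketStart>" -> running sum
    final Map<String, Double> store = new HashMap<>();

    // what process() would do for each incoming record
    void process(String internalKey, long timestampMs, double value) {
        long bucketStart = (timestampMs / BUCKET_MS) * BUCKET_MS;
        String storeKey = internalKey + "@" + bucketStart;
        store.merge(storeKey, value, Double::sum);
    }

    public static void main(String[] args) {
        FiveMinuteAggregator agg = new FiveMinuteAggregator();
        agg.process("id-1", 0L, 1.0);
        agg.process("id-1", 60_000L, 2.0);   // same 5-minute bucket
        agg.process("id-1", 400_000L, 5.0);  // next bucket
        System.out.println(agg.store.get("id-1@0"));      // 3.0
        System.out.println(agg.store.get("id-1@300000")); // 5.0
    }
}
```

The names (FiveMinuteAggregator, process) are illustrative, not Kafka API members; the real Processor would receive records via its process() callback and register the punctuation with the ProcessorContext.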
Re: Print RocksDb Stats
Hello Muhammed, The community is working on KIP-444, which exposes RocksDB metrics. There's an ongoing PR that you may find helpful for your own implementation: https://github.com/apache/kafka/pull/6884

Guozhang

On Wed, Jul 17, 2019 at 6:26 AM Muhammed Ashik wrote:
> Hi, I'm trying to log the RocksDB stats with the below code, but am not
> observing any logs. I'm enabling this as the off-heap memory grows
> indefinitely over a period of time. We were using inMemoryKeyValueStore
> only; I was not sure whether kafka-streams uses RocksDB as the default
> in-memory store.
>
> Kafka Streams version - 2.0.0
>
> class CustomRocksDBConfig extends RocksDBConfigSetter {
>   override def setConfig(storeName: String, options: Options,
>       configs: util.Map[String, AnyRef]): Unit = {
>     val stats = new Statistics
>     stats.setStatsLevel(StatsLevel.ALL)
>     options.setStatistics(stats)
>       .setStatsDumpPeriodSec(600)
>     options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
>     options.setDbLogDir("/tmp/dump")
>   }
> }

-- Guozhang
Re: Kafka Streams - unbounded memory growth
And all four stores (BucketData, CacheData, StationKeyValue, StationCacheData) are definitely explicitly added as Stores.inMemoryKeyValueStore("name")? Hm. Does du -h show any contents within the ./buzzard.MonitoringSeWlanStatsAggregator/0_1 directories that match any of these store names? Also, if you rerun the app with a different state.dir, do all the same directories get created?

On Thu, Jul 18, 2019 at 12:15 AM Muhammed Ashik wrote:
> Hi, my topology description:
>
> Topologies:
>   Sub-topology: 0
>     Source: SeSource (topics: [buzzard.decoded.amon.stats.ap, buzzard.app.monitoring.internal.clientsestats])
>       --> SeProcess
>     Processor: SeProcess (stores: [BucketData, CacheData])
>       --> AeSink
>       <-- SeSource
>     Sink: AeSink (topic: buzzard.app.monitoring.internal.stats)
>       <-- SeProcess
>
>   Sub-topology: 1
>     Source: StationSource (topics: [buzzard.decoded.amon.stats.station])
>       --> StationProcess
>     Processor: StationProcess (stores: [StationKeyValue, StationCacheData])
>       --> StationSink
>       <-- StationSource
>     Sink: StationSink (topic: buzzard.app.monitoring.internal.clientsestats)
>       <-- StationProcess
>
> Regards,
> Ashik
>
> On Thu, Jul 18, 2019 at 1:31 AM Sophie Blee-Goldman wrote:
> > Hm. These directories shouldn't be created if using only an in-memory
> > store. Can you print your topology?
> >
> > On Wed, Jul 17, 2019 at 11:02 AM Muhammed Ashik wrote:
> > > Hi, I just did `du -mh` on `/tmp/kafka-streams`; below are the folders
> > > listed, with some .lock files inside. Not sure if these are coming
> > > from RocksDB, and the sizes of these files look small.
> > >
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_5
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_3
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_2
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/1_4
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_1
> > > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_4
> > > 28K  ./buzzard.MonitoringSeWlanStatsAggregator
> > > 32K  .
> > >
> > > On Wed, Jul 17, 2019 at 11:18 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > Hm. Just to be absolutely sure, could you try throwing an exception
> > > > or something in your RocksDBConfigSetter?
> > > >
> > > > On Wed, Jul 17, 2019 at 10:43 AM Muhammed Ashik wrote:
> > > > > I can confirm the /tmp/kafka-streams doesn't have any data
> > > > > related to rocksdb.
> > > > >
> > > > > On Wed, Jul 17, 2019 at 11:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > You can describe your topology to see if there are any state
> > > > > > stores in it that you aren't aware of. Alternatively, you could
> > > > > > check out the state directory (/tmp/kafka-streams by default)
> > > > > > and see if there is any data in there.
> > > > > >
> > > > > > On Wed, Jul 17, 2019 at 10:36 AM Muhammed Ashik <ashi...@gmail.com> wrote:
> > > > > > > Thanks. How can I verify if there is some data really going
> > > > > > > to RocksDB? I tried printing the statistics with no success.
> > > > > > >
> > > > > > > class CustomRocksDBConfig extends RocksDBConfigSetter {
> > > > > > >   override def setConfig(storeName: String, options: Options,
> > > > > > >       configs: util.Map[String, AnyRef]): Unit = {
> > > > > > >     val stats = new Statistics
> > > > > > >     stats.setStatsLevel(StatsLevel.ALL)
> > > > > > >     options.setStatistics(stats)
> > > > > > >       .setStatsDumpPeriodSec(600)
> > > > > > >     options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
> > > > > > >     options.setDbLogDir("/tmp/dump")
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > > and included it in the stream config:
> > > > > > >
> > > > > > > settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > > > > > >   classOf[CustomRocksDBConfig])
> > > > > > >
> > > > > > > Regards,
> > > > > > > Ashik
> > > > > > >
> > > > > > > On Wed, Jul 17, 2019 at 10:52 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > > Sorry, didn't see the "off-heap" part of the email. Are you
> > > > > > > > using any stateful DSL operators? The default stores are
> > > > > > > > persistent, so you may have a RocksDB store in your topology
> > > > > > > > without explicitly using one.
> > > > > > > >
> > > > > > > > On Wed, Jul 17, 2019 at 10:12 AM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > > > If you are using inMemoryKeyValueStore, the records are stored by
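The observation quoted at the end of this thread, that an in-memory key-value store grows in proportion to the keyspace, can be illustrated with plain Java (a hypothetical sketch, unrelated to the Kafka Streams store implementations): bounding the map is one way to cap that growth when the full keyspace need not be retained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A size-bounded map: once MAX_ENTRIES is reached, the least-recently-
// accessed entry is evicted. An unbounded HashMap, by contrast, grows
// with every distinct key it ever sees.
public class BoundedStore<K, V> extends LinkedHashMap<K, V> {
    static final int MAX_ENTRIES = 1000;

    public BoundedStore() {
        super(16, 0.75f, true); // access-order iteration => LRU eviction
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > MAX_ENTRIES;
    }

    public static void main(String[] args) {
        BoundedStore<Integer, String> store = new BoundedStore<>();
        for (int i = 0; i < 5000; i++) {
            store.put(i, "value-" + i); // 5000 distinct keys observed
        }
        System.out.println(store.size()); // 1000
    }
}
```

If the application truly needs every key resident, the advice from the thread stands: add another instance or switch to the RocksDB-backed store rather than capping the data.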
Check the version via PuTTY
Hi, how can I check the version information for Kafka that is being used on the MDT Mirror servers? Thanks
Re: Kafka logs are getting deleted too soon
Hi Peter, A HUGE thank you for your suggestion of using 'retention.ms=-1' for the topic. I also explicitly set 'retention.bytes=-1'. With this combination, Kafka is certainly not deleting the segment logs and I am able to run multiple instances of console consumers to read data. I am still confused about the behavior I see when I set 'retention.ms=3144960' and 'retention.bytes=10737418240'. I would love to understand why Kafka is deleting messages. I am noticing that each segment is ~100 MB in size when the default value for 'segment.bytes' is ~1 GB. Another example of Kafka not behaving according to what's documented. Thanks once again for your suggestion. Sachin

On Wednesday, July 17, 2019, 10:46:13 PM EDT, Peter Bukowinski wrote:

Indeed, something seems wrong. I have a Kafka (2.0.1) cluster that aggregates data from multiple locations. It has so much data moving through it that I can't afford to keep more than 24 hours on disk. The retention is working correctly. I don't restrict topics by size, only by time. What version of Kafka are you using? Looking back at the example log directory listing, I see that you mentioned seeing .log.deleted files. Yes, that means Kafka tagged that log segment for deletion, and then the cleanup process removed it soon after. Something is causing your data to be cleaned, despite your retention overrides. Can you try removing 'retention.bytes' and setting 'retention.ms=-1' for the topic? That should persist the data indefinitely.

> On Jul 17, 2019, at 6:07 PM, Sachin Nikumbh wrote:
>
> I am not setting the group id for the console consumer. When I say the
> .log files are all 0 bytes long, it is after the producer has gone through
> 96 GB worth of data. Apart from this topic where I am dumping 96 GB of
> data, I have some test topics where I am publishing a very small amount of
> data. I don't have any problem reading messages from those topics.
> The .log files for those topics are properly sized and I can read those
> messages using multiple console consumers at the same time. I have a
> feeling that this specific topic is having trouble due to the amount of
> data that I am publishing. I am failing to understand which Kafka settings
> are playing a role here. I am sure 96 GB of data is really not a big deal
> for Kafka and I am not the first one to do this.
>
> On Wednesday, July 17, 2019, 04:58:48 PM EDT, Peter Bukowinski wrote:
>
> Are you setting a group.id for your console consumer, perhaps, and keeping
> it static? That would explain the inability to reconsume the data. As to
> why your logs look empty, Kafka likes to hold the data in memory and
> leaves it to the OS to flush the data to disk. On a non-busy broker, the
> interval between when data arrives and when it is flushed to disk can be
> quite a while.
>
>> On Jul 17, 2019, at 1:39 PM, Sachin Nikumbh wrote:
>>
>> Hi Jamie,
>> I have 3 brokers and the replication factor for my topic is set to 3. I
>> know for sure that the producer is producing data successfully because I
>> am running a console consumer at the same time and it shows me the
>> messages. After the producer produces all the data, I have
>> /var/log/kafka/myTopic-* directories (15 of them) and all of them have
>> only one .log file with a size of 0 bytes. So, I am not sure if that
>> addresses your question around the active segment.
>> Thanks, Sachin
>>
>> On Wednesday, July 17, 2019, 04:00:56 PM EDT, Jamie wrote:
>>
>> Hi Sachin,
>> My understanding is that the active segment is never deleted, which means
>> you should have at least 1 GB of data in your partition if the data is
>> indeed being produced to Kafka. Are there any errors in your broker logs?
>> How many brokers do you have and what is the replication factor of the
>> topic? If you have fewer than 3 brokers, have you set
>> offsets.topic.replication.factor to the number of brokers?
>>
>> Thanks,
>> Jamie
>>
>> -----Original Message-----
>> From: Sachin Nikumbh
>> To: users
>> Sent: Wed, 17 Jul 2019 20:21
>> Subject: Re: Kafka logs are getting deleted too soon
>>
>> Broker configs:
>> ===============
>> broker.id=36
>> num.network.threads=3
>> num.io.threads=8
>> socket.send.buffer.bytes=102400
>> socket.receive.buffer.bytes=102400
>> socket.request.max.bytes=104857600
>> log.dirs=/var/log/kafka
>> num.partitions=1
>> num.recovery.threads.per.data.dir=1
>> offsets.topic.replication.factor=1
>> transaction.state.log.replication.factor=1
>> transaction.state.log.min.isr=1
>> log.retention.hours=168
>> log.segment.bytes=1073741824
>> log.retention.check.interval.ms=30
>> zookeeper.connect=myserver1:2181,myserver2:2181,myserver3:2181
>> zookeeper.connection.timeout.ms=6000
>> confluent.support.metrics.enable=true
>> confluent.support.customer.id=anonymous
>> group.initial.rebalance.delay.ms=0
>> auto.create.topics.enable=false
>>
>> Topic configs:
>> ==============
>> --partitions 15
>> --replication-factor 3
>> retention.ms=3144960
>> retention.bytes=10737418240
>>
>> As you can see, I have tried
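One detail worth checking in the topic configs quoted above (an observation added here, not raised in the thread): retention.ms is specified in milliseconds, so a value of 3144960 amounts to roughly 52 minutes, which by itself would explain segments being deleted far sooner than expected. A quick conversion:

```java
import java.time.Duration;

// retention.ms is in milliseconds; converting the configured value
// shows how short the retention window actually is.
public class RetentionCheck {
    public static void main(String[] args) {
        long retentionMs = 3_144_960L; // value from the topic config above
        System.out.println(Duration.ofMillis(retentionMs).toMinutes() + " minutes"); // 52 minutes
        // One week, by comparison, is 604800000 ms.
        System.out.println(Duration.ofDays(7).toMillis()); // 604800000
    }
}
```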
Re: Best practices for compacting topics with tombstones
If I recall correctly, you can set 'delete.retention.ms' in the topic-level configuration to how long you want to retain the tombstones in the topic. By default it is set to 86400000 ms (24 hours); you can set it lower than this. Regarding performance, I am not really sure why compaction would cause a performance hit on your broker, but the questions would be how much data you hold there, how often you have updates to your topic (records with the same key), and how often you have tombstones for records. On Wed, 17 Jul 2019 at 22:12, Chris Baumgartner < chris.baumgart...@fujifilm.com> wrote: > Hello, > > I'm wondering if anyone has advice on configuring compaction. Here is my > scenario: > > A producer writes raw data to topic #1. A stream app reads the data from > topic #1, processes it, writes the processed data to topic #2, and then > writes a tombstone record to topic #1. > > So, I don't intend for data to be retained very long in topic #1. > > Are there any best practices for configuring compaction on topic #1 in this > case? I don't want to keep the data around very long after it has been > processed, but I also don't want to cause performance issues by compacting > too often. > > Thanks. > > - Chris
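For reference, delete.retention.ms is also specified in milliseconds; its default of 86400000 ms corresponds to a full day, which is how long tombstones survive after compaction (the conversion below is an added illustration, not from the thread):

```java
import java.time.Duration;

// delete.retention.ms is in milliseconds; the default converts to 24 hours.
public class TombstoneRetention {
    public static void main(String[] args) {
        long defaultDeleteRetentionMs = 86_400_000L;
        System.out.println(Duration.ofMillis(defaultDeleteRetentionMs).toHours() + " hours"); // 24 hours
        // Lowering it to, say, one hour would mean setting it to 3600000.
        System.out.println(Duration.ofHours(1).toMillis()); // 3600000
    }
}
```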
Re: Kafka Streams - unbounded memory growth
Hi, my topology description:

Topologies:
  Sub-topology: 0
    Source: SeSource (topics: [buzzard.decoded.amon.stats.ap, buzzard.app.monitoring.internal.clientsestats])
      --> SeProcess
    Processor: SeProcess (stores: [BucketData, CacheData])
      --> AeSink
      <-- SeSource
    Sink: AeSink (topic: buzzard.app.monitoring.internal.stats)
      <-- SeProcess

  Sub-topology: 1
    Source: StationSource (topics: [buzzard.decoded.amon.stats.station])
      --> StationProcess
    Processor: StationProcess (stores: [StationKeyValue, StationCacheData])
      --> StationSink
      <-- StationSource
    Sink: StationSink (topic: buzzard.app.monitoring.internal.clientsestats)
      <-- StationProcess

Regards,
Ashik

On Thu, Jul 18, 2019 at 1:31 AM Sophie Blee-Goldman wrote:
> Hm. These directories shouldn't be created if using only an in-memory
> store. Can you print your topology?
>
> On Wed, Jul 17, 2019 at 11:02 AM Muhammed Ashik wrote:
> > Hi, I just did `du -mh` on `/tmp/kafka-streams`; below are the folders
> > listed, with some .lock files inside. Not sure if these are coming from
> > RocksDB, and the sizes of these files look small.
> >
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_5
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_3
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_2
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/1_4
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_1
> > 4.0K ./buzzard.MonitoringSeWlanStatsAggregator/0_4
> > 28K  ./buzzard.MonitoringSeWlanStatsAggregator
> > 32K  .
> >
> > On Wed, Jul 17, 2019 at 11:18 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > Hm. Just to be absolutely sure, could you try throwing an exception
> > > or something in your RocksDBConfigSetter?
> > >
> > > On Wed, Jul 17, 2019 at 10:43 AM Muhammed Ashik wrote:
> > > > I can confirm the /tmp/kafka-streams doesn't have any data related
> > > > to rocksdb.
> > > >
> > > > On Wed, Jul 17, 2019 at 11:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > You can describe your topology to see if there are any state
> > > > > stores in it that you aren't aware of. Alternatively, you could
> > > > > check out the state directory (/tmp/kafka-streams by default) and
> > > > > see if there is any data in there.
> > > > >
> > > > > On Wed, Jul 17, 2019 at 10:36 AM Muhammed Ashik wrote:
> > > > > > Thanks. How can I verify if there is some data really going to
> > > > > > RocksDB? I tried printing the statistics with no success.
> > > > > >
> > > > > > class CustomRocksDBConfig extends RocksDBConfigSetter {
> > > > > >   override def setConfig(storeName: String, options: Options,
> > > > > >       configs: util.Map[String, AnyRef]): Unit = {
> > > > > >     val stats = new Statistics
> > > > > >     stats.setStatsLevel(StatsLevel.ALL)
> > > > > >     options.setStatistics(stats)
> > > > > >       .setStatsDumpPeriodSec(600)
> > > > > >     options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
> > > > > >     options.setDbLogDir("/tmp/dump")
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > > and included it in the stream config:
> > > > > >
> > > > > > settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > > > > >   classOf[CustomRocksDBConfig])
> > > > > >
> > > > > > Regards,
> > > > > > Ashik
> > > > > >
> > > > > > On Wed, Jul 17, 2019 at 10:52 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > Sorry, didn't see the "off-heap" part of the email. Are you
> > > > > > > using any stateful DSL operators? The default stores are
> > > > > > > persistent, so you may have a RocksDB store in your topology
> > > > > > > without explicitly using one.
> > > > > > >
> > > > > > > On Wed, Jul 17, 2019 at 10:12 AM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > > > > If you are using inMemoryKeyValueStore, the records are
> > > > > > > > stored by definition in memory. RocksDB is not used at all.
> > > > > > > > This store will continue to grow proportionally to your
> > > > > > > > keyspace. If you do not have sufficient memory to hold your
> > > > > > > > entire dataset in memory, consider adding another instance
> > > > > > > > or switching to the RocksDB store.
> > > > > > > >
> > > > > > > > On Wed, Jul 17, 2019 at 6:22 AM Muhammed Ashik <ashi...@gmail.com> wrote:
> > > > > > > > > Kafka Streams version - 2.0.0
> > > > > > > > >
> > > > > > > > > Hi, in our streaming instance we are observing a steady
> > > > > > > > > growth in the off-heap memory (out of 2gb allocated memory 1.3 is reserved for