-----Original Message-----
From: Gerard Klijs [mailto:gerard.kl...@dizzit.com]
Sent: Wednesday, May 11, 2016 3:00 AM
To: users@kafka.apache.org
Subject: Re: Backing up Kafka data and using it later?

You could create a Docker image with a Kafka installation and start a MirrorMaker instance in it; you could set the retention time to infinite and mount the data volume. With the data you could always restart the container and mirror it to somewhere else. Not sure that's what you want, but it's an option for saving data for use at some other place/time.

On Wed, May 11, 2016 at 12:33 AM Alex Loddengaard wrote:
> You may find this interesting, although I don't believe it's exactly
> what you're looking for:
>
> https://github.com/pinterest/secor
>
> I'm not sure how stable and commonly used it is.
>
> Additionally, I see a lot of users use MirrorMaker for a "backup,"
> where MirrorMaker copies all topics from one Kafka cluster to another
> "backup" cluster. I put "backup" in quotes because this architecture
> doesn't support snapshotting like a traditional backup would. I
> realize this doesn't address your specific use case, but thought you
> may find it interesting regardless.
>
> Sorry I'm a little late to the thread, too.
>
> Alex
>
> On Thu, May 5, 2016 at 7:05 AM, Rad Gruchalski wrote:
>
> > John,
> >
> > I'm not an expert in Kafka, but I would assume so.
> >
> > Best regards,
> > Radek Gruchalski
> >
> > On Thursday, 5 May 2016 at 01:46, John Bickerstaff wrote:
> >
> > > Thanks - does that mean that the only way to safely back up Kafka
> > > is to have replication?
> > >
> > > (I have done this partially - I can get the entire topic on the
> > > command line after completely recreating the server, but my code
> > > that is intended to do the same thing just hangs.)
> > >
> > > On Wed, May 4, 2016 at 3:18 PM, Rad Gruchalski wrote:
> > >
> > > > John,
> > > >
> > > > I believe you mean something along the lines of:
> > > > http://markmail.org/message/f7xb5okr3ujkplk4
> > > > I don't think something like this has been done.
> > > >
> > > > Best regards,
> > > > Radek Gruchalski
> > > >
> > > > On Wednesday, 4 May 2016 at 23:04, John Bickerstaff wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I have what is probably an edge use case. I'd like to back up
> > > > > a single Kafka instance such that I can recreate a new server,
> > > > > drop Kafka in, drop the data in, start Kafka -- and have all
> > > > > my data ready to go again for consumers.
> > > > >
> > > > > Is such a thing done? Does anyone have any experience trying
> > > > > this?
> > > > >
> > > > > I have, and I've run into some problems which suggest there's
> > > > > a setting or some other thing I'm unaware of...
> > > > >
> > > > > If you like, don't think of it as a backup problem so much as
> > > > > a "cloning" problem. I want to clone a new Kafka machine
> > > > > without actually cloning it -- i.e. the data (log and index
> > > > > files) lives somewhere else, although Zookeeper is up and
> > > > > running just fine.
> > > > >
> > > > > Thanks
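For anyone who wants to try the MirrorMaker route Gerard and Alex describe, here is a minimal sketch against the 0.9/0.10-era tooling. The host names, file names, and the catch-all whitelist are illustrative assumptions, not details from the thread:

    # consumer.properties -- points MirrorMaker's consumer at the source
    # cluster (hypothetical host names):
    #   zookeeper.connect=source-zk:2181
    #   group.id=backup-mirror
    #
    # producer.properties -- points its producer at the "backup" cluster:
    #   bootstrap.servers=backup-kafka:9092

    bin/kafka-mirror-maker.sh \
      --consumer.config consumer.properties \
      --producer.config producer.properties \
      --whitelist '.*'

    # On the backup cluster's brokers, disable time-based deletion so the
    # mirrored data is kept indefinitely (-1 means no time limit):
    #   log.retention.ms=-1

As Alex notes, this yields a continuously trailing copy rather than point-in-time snapshots.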
-----Original Message-----
From: Raj Tanneru [mailto:rtann...@fanatics.com]
Sent: Saturday, May 7, 2016 6:46 PM
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Re: KAFKA-3112

Thanks Ismael and Tao. Appreciate it.

> On May 7, 2016, at 1:14 AM, Ismael Juma wrote:
>
> Hi Raj and Tao,
>
> I just merged the KAFKA-3112 PR, so this issue will be fixed in
> 0.10.0.0.
>
> Thanks,
> Ismael
>
>> On Fri, May 6, 2016 at 7:47 PM, tao xiao wrote:
>>
>> KAFKA-2657 is unresolved, so you can safely assume it hasn't been
>> fixed yet.
>>
>>> On Fri, 6 May 2016 at 07:38 Raj Tanneru wrote:
>>>
>>> Yeah, it is a duplicate of KAFKA-2657. The question is how to check
>>> whether it is merged into the 0.9.0.1 release. What options do I
>>> have if I need this fix? How can I get a patch for this on 0.8.2.1?
>>>
>>>> On May 6, 2016, at 12:06 AM, tao xiao wrote:
>>>>
>>>> It says this is a duplicate. This is the issue that KAFKA-3112
>>>> duplicates:
>>>> https://issues.apache.org/jira/browse/KAFKA-2657
>>>>
>>>>> On Thu, 5 May 2016 at 22:13 Raj Tanneru wrote:
>>>>>
>>>>> Hi All,
>>>>> Does anyone know if KAFKA-3112 is merged into 0.9.0.1? Is there a
>>>>> place to check which version has this fix? Jira doesn't show fix
>>>>> versions.
>>>>>
>>>>> https://issues.apache.org/jira/browse/KAFKA-3112
>>>>>
>>>>> Thanks,
>>>>> Raj Tanneru
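Raj's two open questions -- how to tell whether a given fix is in a release, and how to carry it onto 0.8.2.1 -- can both be answered with plain git against the Kafka source mirror. A sketch; <commit-sha> is a placeholder for whatever the log search turns up, and a cherry-pick across that many versions may well conflict:

    git clone https://github.com/apache/kafka.git && cd kafka

    # Find the commit that closed the JIRA (the key appears in the
    # commit message):
    git log --oneline --all --grep='KAFKA-2657'

    # List the release tags that already contain that commit:
    git tag --contains <commit-sha>

    # To backport, branch from the 0.8.2.1 tag, cherry-pick, rebuild:
    git checkout -b 0.8.2.1-backport 0.8.2.1
    git cherry-pick <commit-sha>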
-----Original Message-----
From: Kuldeep Kamboj [mailto:kuldeep.kam...@osscube.com]
Sent: Monday, May 2, 2016 11:29 PM
To: users@kafka.apache.org
Subject: Getting Timed out reading socket error for kafka cluster setup

Hi,

I want to set up a Kafka cluster for three similar applications that share similar queues, like AppA -> {TopicX, TopicY, TopicZ}, AppB -> {TopicX, TopicZ}, AppC -> {TopicX, TopicY}. Producers and consumers are app-specific.

I set up the Kafka cluster with three brokers having partitions 1, 2, 3, in three different config files with different ports, and then started the Kafka servers (the cluster). I am using the kafka-php wrapper from http://github.com/nmred/kafka-php

My producer code for AppA looks like:

    $producer->setRequireAck(-1);
    $producer->setMessages("TopicX", 0, array(json_encode($this->data)));
    $producer->send();

and for AppB:

    $producer->setRequireAck(-1);
    $producer->setMessages("TopicX", 1, array(json_encode($this->data)));
    $producer->send();

and so on. My consumer script for the three apps looks like:

    $queues = array("TopicX", "TopicY", "TopicZ");
    while (true) {
        foreach ($queues as $queue) {
            $consumer = \Kafka\Consumer::getInstance('localhost:2181');
            $consumer->setGroup('testgroup');
            $consumer->setPartition($queue, 0);
            $result = $consumer->fetch();
        }
    }

But when I execute the consumer script for any app, I get an error like:

    "Timed out reading socket while reading 750437 bytes with 750323 bytes to go"

I just don't know how to fix this issue. I tried to modify some Kafka config parameters, such as:

    zookeeper.connection.timeout.ms=24000   # initially 6000
    replica.socket.timeout.ms=15000         # not in the default file

but that did not work.

--
Kuldeep Kamboj
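One thing that stands out in the consumer script above: a new \Kafka\Consumer instance is requested on every pass through the loop. Below is a sketch of the same loop with the consumer hoisted out, using only the kafka-php 0.5 calls quoted above; whether this alone cures the socket timeout is an open question (the error may equally point at fetch-size limits in the client), and it assumes setPartition() can be re-pointed at each topic in turn:

    // Sketch: create the consumer once, outside the polling loop, so
    // Zookeeper/broker sockets are not re-opened on every iteration.
    $consumer = \Kafka\Consumer::getInstance('localhost:2181');
    $consumer->setGroup('testgroup');

    $queues = array("TopicX", "TopicY", "TopicZ");
    while (true) {
        foreach ($queues as $queue) {
            // Assumption: setPartition() may be called repeatedly to
            // switch the topic/partition being fetched.
            $consumer->setPartition($queue, 0);
            $result = $consumer->fetch();
            // ... process $result ...
        }
    }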
Re: Initializing StateStores takes *really* long for large datasets
-------- Original message --------
From: Guozhang Wang
Date: 12/2/16 5:13 PM (GMT-06:00)
To: users@kafka.apache.org
Subject: Re: Initializing StateStores takes *really* long for large datasets

Before we have the single-knob memory management feature, I'd like to propose reducing the Streams default config values for RocksDB caching and memory block size. For example, I remember Henry has done some fine-tuning of the RocksDB config for his use case:

https://github.com/HenryCaiHaiying/kafka/commit/b297f7c585f5a883ee068277e5f0f1224c347bd4
https://github.com/HenryCaiHaiying/kafka/commit/eed1726d16e528d813755a6e66b49d0bf14e8803
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576

We could check whether some of those changes are appropriate in general and, if so, change the default settings accordingly.

Henry

On Wed, Nov 30, 2016 at 11:04 AM, Ara Ebrahimi wrote:

> +1 on this.
>
> Ara.
>
> > On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak
> > <mathieu.fenn...@replicon.com> wrote:
> >
> > I'd like to quickly reinforce Frank's opinion regarding the RocksDB
> > memory usage. I was also surprised by the amount of non-JVM-heap
> > memory being used and had to tune the 100 MB default down
> > considerably. It's also unfortunate that it's hard to estimate the
> > memory requirements for a KS app because of this. If you have ten
> > stores, and assuming the default config, you'd need a GB of memory
> > for the RocksDB cache if you run 1 app, but only half a GB if you
> > run two app instances, because the stores will be distributed.
> >
> > It would be much nicer to be able to give KS a fixed amount of
> > memory in a config that it divided among the active stores on a
> > node. Configure it with N GB; if a rebalance adds more tasks and
> > stores, they each get less RAM; if a rebalance removes tasks and
> > stores, the remaining stores get more RAM. It seems like it'd be
> > hard to do this with the RocksDBConfigSetter interface, because it
> > doesn't get any state about the KS topology to make decisions --
> > which are arguably not config, but tuning / performance decisions.
> >
> > Mathieu
> >
> > On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu wrote:
> >
> >> I'll write an update on where I am now.
> >>
> >> I've got about 40 'primary' topics, some small, some up to about
> >> 10M messages, and about 30 internal topics, divided over 6 stream
> >> instances, all running in a single app, talking to a 3-node Kafka
> >> cluster.
> >>
> >> I use a single thread per stream instance, as my prime concern is
> >> now to get it to run stably, rather than to optimize performance.
> >>
> >> My biggest issue was that after a few hours my application started
> >> to slow down, to ultimately freeze up or crash. It turned out that
> >> RocksDB consumed all my memory, which I had overlooked as it is
> >> off-heap.
> >>
> >> I was fooling around with RocksDB settings a bit, but I had missed
> >> the most important one:
> >>
> >> BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
> >> tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
> >> tableConfig.setBlockSize(BLOCK_SIZE);
> >> options.setTableFormatConfig(tableConfig);
> >>
> >> The block cache size defaults to a whopping 100 MB per store, and
> >> that gets expensive fast. I reduced it to a few megabytes. My data
> >> size is so big that I doubt it is very effective anyway. Now it
> >> seems more stable.
> >>
> >> I'd say that a smaller default makes sense, especially because the
> >> failure case is so opaque (it runs all tests just fine, but with a
> >> serious dataset it dies slowly).
> >>
> >> Another thing I see is that while starting all my instances, some
> >> are quick and some take time (makes sense, as the data size varies
> >> greatly), but as more instances start up, they use more and more
> >> CPU, I/O and network, so that the initialization of the bigger ones
> >> takes even longer, increasing the chance that one of them takes
> >> longer than MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks
> >> loose. Maybe we can separate the 'initialize' and 'start' steps
> >> somehow.
> >>
> >> In this case we could also log better: if initialization takes
> >> longer than the timeout, the task ends up being reassigned (in my
> >> case to the same instance), and it then errors out on being unable
> >> to lock the state dir. That message isn't too informative, as the
> >> timeout is the actual problem.
> >>
> >> regards, Frank
> >>
> >> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang wrote:
> >>
> >>> Hello Frank,
> >>>
> >>> How many instances do you have in your apps, and how many threads
> >>> did you use per instance? Note that besides the topology
> >>> complexity (i.e. number of state stores, number of internal
> >>> topics, etc.)
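For anyone wanting to apply Frank's block-cache reduction without patching Streams itself, the RocksDBConfigSetter hook Mathieu mentions can do it per application. A minimal sketch, assuming Kafka Streams 0.10.1+ is on the classpath; the 4 MB / 16 KB values are illustrative, not numbers recommended in the thread:

    import java.util.Map;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    // Shrinks the RocksDB block cache for every store Streams creates.
    public class SmallBlockCacheConfigSetter implements RocksDBConfigSetter {
        @Override
        public void setConfig(String storeName, Options options,
                              Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(4 * 1024 * 1024L);  // 4 MB vs. the 100 MB default
            tableConfig.setBlockSize(16 * 1024L);             // 16 KB blocks
            options.setTableFormatConfig(tableConfig);
        }
    }

    // Registered through the Streams config:
    //   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
    //             SmallBlockCacheConfigSetter.class);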
Re: Kafka windowed table not aggregating correctly
-------- Original message --------
From: Guozhang Wang
Date: 12/2/16 5:48 PM (GMT-06:00)
To: users@kafka.apache.org
Subject: Re: Kafka windowed table not aggregating correctly

Sachin,

One thing to note is that retention of windowed stores works by keeping multiple segments of the store, where each segment covers a time range that can potentially span multiple windows. When a new window needs to be created that is beyond the oldest segment's time range plus the retention period, the oldest segment is dropped. From your code it seems you do not override the retention via until(...) on TimeWindows.of("stream-table", 10 * 1000L).advanceBy(5 * 1000L), so the default of one day is used.

So with WallclockTimestampExtractor, since it uses system time, it won't give you timestamps that span more than a day within a short period of time; but if your own extracted timestamps do span more than that, old segments will be dropped immediately, and hence the aggregate values will be returned as a single value.

Guozhang

On Fri, Dec 2, 2016 at 11:58 AM, Matthias J. Sax wrote:

> The extractor is used in
> org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords()
>
> Let us know if you could resolve the problem or need more help.
>
> -Matthias
>
> On 12/2/16 11:46 AM, Sachin Mittal wrote:
> > https://github.com/SOHU-Co/kafka-node/ is the node.js client I am
> > using; the version is 0.5.x. Can you please tell me what code in
> > Streams calls the timestamp extractor? I can look there to see if
> > there is any issue.
> >
> > Again, the issue happens only when producing the messages using a
> > producer that is compatible with Kafka version 0.8.x. I see that
> > this producer does not send a record timestamp, as that was
> > introduced in version 0.10 only.
> >
> > Thanks
> > Sachin
> >
> > On 3 Dec 2016 1:03 a.m., "Matthias J. Sax" wrote:
> >
> >> I am not sure what is happening. That's why it would be good to
> >> have a toy example to reproduce the issue.
> >>
> >> What do you mean by "Kafka node version 0.5"?
> >>
> >> -Matthias
> >>
> >> On 12/2/16 11:30 AM, Sachin Mittal wrote:
> >>> I can provide the data, but the data does not seem to be the
> >>> issue. If I submit the same data and use the same timestamp
> >>> extractor with the Java client for Kafka 0.10.0.1, aggregation
> >>> works fine. I see the issue only when submitting the data with
> >>> kafka-node version 0.5. It looks like the stream does not extract
> >>> the time correctly in that case.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 2 Dec 2016 11:41 p.m., "Matthias J. Sax" wrote:
> >>>
> >>>> Can you provide example input data (including timestamps) and the
> >>>> result? What is the expected result (i.e., what aggregation do
> >>>> you apply)?
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/2/16 7:43 AM, Sachin Mittal wrote:
> >>>>> Hi,
> >>>>> After much debugging I found an issue with the timestamp
> >>>>> extractor.
> >>>>>
> >>>>> If I use a custom timestamp extractor with the following code:
> >>>>>
> >>>>> public static class MessageTimestampExtractor implements
> >>>>>         TimestampExtractor {
> >>>>>     public long extract(ConsumerRecord<Object, Object> record) {
> >>>>>         if (record.value() instanceof Message) {
> >>>>>             return ((Message) record.value()).ts;
> >>>>>         } else {
> >>>>>             return record.timestamp();
> >>>>>         }
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> the aggregation does not work. Here Message has a long field ts
> >>>>> which stores the timestamp. Note I have checked, and ts has
> >>>>> valid timestamp values.
> >>>>>
> >>>>> However, if I replace it with, say, WallclockTimestampExtractor,
> >>>>> aggregation works fine.
> >>>>>
> >>>>> I do not understand what could be the issue here.
> >>>>>
> >>>>> Also note I am using Kafka Streams version 0.10.0.1, and I am
> >>>>> publishing messages via https://github.com/SOHU-Co/kafka-node/,
> >>>>> whose version is quite old: 0.5.x.
> >>>>>
> >>>>> Let me know if there is some bug in timestamp extraction.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>> On Mon, Nov 28, 2016 at 11:52 PM, Guozhang Wang wrote:
> >>>>>
> >>>>>> Sachin,
> >>>>>>
> >>>>>> This is indeed a bit weird, and we'd like to try to reproduce
> >>>>>> your issue locally. Do you have sample input data for us to
> >>>>>> try out?
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Fri, Nov 25, 2016 at 10:12 PM, Sachin Mittal wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>> I fixed that sorted-set issue, but I am facing a weird problem
> >>>>>>> which I am not able to replicate.
> >>>>>>>
> >>>>>>> Here is the sample problem that I could isolate:
> >>>>>>> My class is like this:
> >>>>>>>
> >>>>>>> public static class Message implements Comparable<Message> {
> >>>>>>>     public long ts;
> >>>>>>>     public String message;
> >>>>>>>     public String key;
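To make the until(...) remedy Guozhang describes concrete, here is a sketch of Sachin's window definition with the retention raised explicitly. It targets the 0.10.0.x Streams API (where TimeWindows.of still takes a name), and the seven-day retention is an arbitrary illustrative figure:

    import org.apache.kafka.streams.kstream.TimeWindows;

    // Same 10s windows advancing by 5s as in the thread, but with the
    // window retention raised from the 1-day default to 7 days, so
    // records whose extracted timestamps are spread further apart still
    // land in retained segments instead of being dropped.
    TimeWindows windows = TimeWindows.of("stream-table", 10 * 1000L)
            .advanceBy(5 * 1000L)
            .until(7 * 24 * 60 * 60 * 1000L);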