Re: Mapping a consumer in a consumer group to a partition in a topic
As far as I know, with a consumer group implementation you cannot pin consumers to partitions. That logic is handled by the high-level API on its own.

> On 23-Sep-2015, at 6:38 AM, Spandan Harithas Karamchedu wrote:
>
> Can you let us know how we can configure a consumer to a partition
> within a specified consumer group?
Re: Mapping a consumer in a consumer group to a partition in a topic
Unfortunately, in order to read from a specific partition, you will need to use the simple consumer API, which does not have consumer groups. See here for details:

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

On Tue, Sep 22, 2015 at 6:08 PM, Spandan Harithas Karamchedu <spandanharit...@gmail.com> wrote:

> Can you let us know how we can configure a consumer to a partition
> within a specified consumer group?
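For reference, a minimal sketch of reading one partition with the 0.8 SimpleConsumer, modeled on the wiki example linked above. The broker host, topic name, and starting offset are placeholders, and leader discovery plus error recovery (which the full wiki example covers) are omitted for brevity:

import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SinglePartitionReader {
    public static void main(String[] args) throws Exception {
        String topic = "my-topic";   // placeholder topic
        int partition = 0;
        long offset = 0L;

        // Connect to the broker that currently leads the partition.
        SimpleConsumer consumer = new SimpleConsumer(
            "broker1", 9092, 100000, 64 * 1024, "partition-0-reader");
        try {
            FetchRequest req = new FetchRequestBuilder()
                .clientId("partition-0-reader")
                .addFetch(topic, partition, offset, 100000) // up to ~100 KB per fetch
                .build();
            FetchResponse resp = consumer.fetch(req);
            if (resp.hasError()) {
                System.err.println("fetch error: " + resp.errorCode(topic, partition));
                return;
            }
            for (MessageAndOffset mo : resp.messageSet(topic, partition)) {
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(mo.offset() + ": " + new String(bytes, "UTF-8"));
                offset = mo.nextOffset(); // where the next fetch would start
            }
        } finally {
            consumer.close();
        }
    }
}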
Mapping a consumer in a consumer group to a partition in a topic
Hi,

We created a topic with 3 partitions and a replication factor of 3. We are able to implement a consumer to get the data from a specific partition in a topic, but we are stuck in implementing a consumer within a specified consumer group that is mapped to a single partition of the topic and gets the data from that partition only. Can you let us know how we can configure a consumer to a partition within a consumer group?

We wanted to develop a Kafka cluster similar to the one shown in the Kafka docs. We are using Kafka 0.8.1.

Thanks
Spandan
Re: Debugging high log flush latency on a broker.
Thanks Steve. I followed your suggestion to get the topics. What is weird is that the bad broker does not get any more traffic (messages or bytes) when this happens. Also, I have more than 2 GB (out of 28 GB) of free memory according to collectd and to running vmstat on the box, so I hope that things don't get paged out. The incoming traffic is paltry (15k messages/sec on such a big box), so I can't understand how this is happening.

All my config across all brokers is the same. They are running on the same EC2 instance type with the same OS configuration. I don't see any network hiccup either; the network in is pretty steady. When the symptom starts, the CPU spikes from 20% to 45% and then stays at around 22-24% for the rest of the episode.

On Tue, Sep 22, 2015 at 12:34 PM, Steve Miller wrote:

> Offhand it sounds to me like maybe something's evicting pages from the
> buffer cache from time to time, causing Kafka to do a lot more I/O all
> of a sudden than usual.
ZkClient throwing NPEs
Hi,

I keep getting the below exception when Kafka tries to connect to ZooKeeper and ZooKeeper is momentarily unreachable. After that, the connection does not recover unless we restart the servers. This may be connected to this issue: https://issues.apache.org/jira/browse/KAFKA-824. But I am already using zkclient-0.5 and am still seeing this issue.

java.lang.NullPointerException: null
    at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:107) ~[zkclient-0.5.jar:0.5]
    at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:631) ~[zkclient-0.5.jar:0.5]
    at org.I0Itec.zkclient.ZkClient$3.call(ZkClient.java:628) ~[zkclient-0.5.jar:0.5]
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:883) ~[zkclient-0.5.jar:0.5]
    at org.I0Itec.zkclient.ZkClient.exists(ZkClient.java:628) ~[zkclient-0.5.jar:0.5]
    at org.I0Itec.zkclient.ZkClient.exists(ZkClient.java:637) ~[zkclient-0.5.jar:0.5]
    at kafka.admin.AdminUtils$.topicExists(AdminUtils.scala:163) ~[kafka_2.10-0.8.2.0.jar:na]
    at kafka.admin.AdminUtils.topicExists(AdminUtils.scala) ~[kafka_2.10-0.8.2.0.jar:na]

Any ideas of how to solve this issue?

Thanks,
Hema
Re: Debugging high log flush latency on a broker.
There may be more elegant ways to do this, but I'd think that you could just ls all the directories specified in log.dirs in your server.properties file for Kafka. You should see directories for each topicname-partitionnumber there.

Offhand it sounds to me like maybe something's evicting pages from the buffer cache from time to time, causing Kafka to do a lot more I/O all of a sudden than usual. Why that happens, I don't know, but that'd be my guess: either something needs more pages for applications all of a sudden, or, like you said, there's some characteristic of the traffic for the partitions on this broker that isn't the same as it is for all the other brokers.

Filesystem type and creation parameters are the same as on the other hosts? sysctl stuff all tuned the same way (assuming this is Linux, that is)?

Any chance there's some sort of network hiccup that makes some follower get a little behind, and then the act of it trying to catch back up pushes the I/O past what it can sustain steady-state? (If something gets significantly behind, depending on the size of your buffer cache relative to the retention in your topics, you could have something, say, start reading from the first offset in that topic and partition, which might well require going to disk rather than being satisfied from the buffer cache. I could see that slowing I/O enough, if it's on the edge otherwise, that now you can't keep up with the write rate until that consumer gets caught up.)

The other idea would be that, I dunno, maybe there's a topic where the segment size is different, and so when it goes to delete a segment it's spending a lot more time putting blocks from that file back onto the filesystem free list (or whatever data structure it is these days (-: ).

    -Steve

On Tue, Sep 22, 2015 at 11:46:49AM -0700, Rajiv Kurian wrote:

> Also any hints on how I can find the exact topic/partitions assigned
> to this broker?
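To make the ls suggestion concrete: Kafka names each partition directory <topic>-<partition> under log.dirs, so a quick sketch like the one below prints a broker -> partition mapping for the local broker. The /var/kafka-logs path is a placeholder for whatever log.dirs is actually set to:

import java.io.File;

public class ListPartitionDirs {
    public static void main(String[] args) {
        // Placeholder path; use the value of log.dirs from server.properties.
        File logDir = new File("/var/kafka-logs");
        File[] entries = logDir.listFiles();
        if (entries == null) {
            System.err.println("not a readable directory: " + logDir);
            return;
        }
        for (File dir : entries) {
            String name = dir.getName();
            int dash = name.lastIndexOf('-');
            // Partition directories look like <topic>-<partition>.
            if (dir.isDirectory() && dash > 0) {
                System.out.println("topic=" + name.substring(0, dash)
                    + " partition=" + name.substring(dash + 1));
            }
        }
    }
}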
Debugging high log flush latency on a broker.
I have a particular broker (version 0.8.2.1) in a cluster receiving about 15000 messages/second of around 100 bytes each (bytes-in / messages-in). This broker has bursts of really high log flush latency p95s: the latency sometimes goes above 1.5 seconds from a steady state of < 20 ms. Running top on the box shows bursts of wa (I/O wait) of over 10% during these high-latency cycles. Running iostat during the same time shows that it was doing more than 16000 block writes/sec during these periods, versus around 3000-7000 block writes/sec during a normal run. So it does seem like during the latency cycle we are just trying to do too much I/O.

I have tried replacing the broker several times to see if that would help, but it hasn't so far, so my guess is there is something peculiar about one or more of the partitions that are assigned to that broker. The other observation is that messages-in/sec and bytes-in/sec DO NOT go up when the broker is misbehaving. I have also noticed that every time I replace the broker it takes much longer for the new broker to bootstrap when compared to other brokers that I have replaced. For example, it takes up to 20 minutes before the log flush latency on the newly bootstrapped broker (with the same ID as the one it replaced) goes down to normal levels.

Any hints on what else I can try (replacing the broker does not seem to help)? 15000 messages/second of around 100 bytes each on a dual-SSD, 30 GB RAM, 16-core box would seem very doable, and it doesn't bother the other brokers in the cluster either.

Also any hints on how I can find the exact topic/partitions assigned to this broker? I know in ZK we can see the partition -> broker mapping, but I am looking for a broker -> partition mapping. I can't be sure if the load that is causing this problem is because of leader traffic or follower traffic. What is weird is that I rarely if ever see other brokers in the cluster have the same problem. With 3-way replication (leader + 2 replicas) I'd imagine that the same workload would cause problems on other brokers too.

Thanks,
Rajiv
mirror maker: mapping topic names
Using mirror maker, I would like to take events received from one topic and write them to another. For example, for events received on the topic 'clicks', I want to write them to 'mirrored.clicks' on my destination cluster. Is that possible?

Thanks,
d
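If mirror maker itself can't rename topics in your version, one workaround is a small hand-rolled bridge that consumes from the source cluster and republishes under the prefixed name. A minimal sketch under that assumption, using the 0.8 high-level consumer and producer APIs; the cluster addresses are placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RenamingMirror {
    public static void main(String[] args) {
        // High-level consumer against the source cluster (assumed address).
        Properties cprops = new Properties();
        cprops.put("zookeeper.connect", "source-zk:2181");
        cprops.put("group.id", "renaming-mirror");
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

        // Producer against the destination cluster; the default byte[]
        // encoder passes keys and payloads through untouched.
        Properties pprops = new Properties();
        pprops.put("metadata.broker.list", "dest-broker:9092");
        Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(pprops));

        String sourceTopic = "clicks";
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap(sourceTopic, 1));

        // Copy each event, prefixing the topic name on the way out.
        for (MessageAndMetadata<byte[], byte[]> mam : streams.get(sourceTopic).get(0)) {
            producer.send(new KeyedMessage<byte[], byte[]>(
                "mirrored." + sourceTopic, mam.key(), mam.message()));
        }
    }
}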
Re: log.retention.hours not working?
One caveat: if you are relying on log.segment.ms to roll the current log segment, it will not roll until both the time elapses and something new arrives for the log. In other words, if your topic/log segment is idle, no rolling will happen. The theoretically eligible log will still be the current open log segment.

On Mon, Sep 21, 2015 at 10:33 PM, Todd Palino wrote:

> Retention is going to be based on a combination of both the retention
> and segment size settings. (As a side note, it's recommended to use
> log.retention.ms and log.segment.ms, not the hours config. That's
> there for legacy reasons, but the ms configs are more consistent.) As
> messages are received by Kafka, they are written to the current open
> log segment for each partition. That segment is rotated when either
> the log.segment.bytes or the log.segment.ms limit is reached. Once
> that happens, the log segment is closed and a new one is opened. Only
> after a log segment is closed can it be deleted via the retention
> settings. Once the log segment is closed AND either all the messages
> in the segment are older than log.retention.ms OR the total partition
> size is greater than log.retention.bytes, then the log segment is
> purged.
>
> As a note, the default segment limit is 1 gibibyte. So if you've only
> written 1k of messages, you have a long way to go before that segment
> gets rotated. This is why the retention is referred to as a minimum
> time. You can easily retain much more than you're expecting for slow
> topics.
>
> -Todd
>
> On Mon, Sep 21, 2015 at 7:28 PM, allen chan wrote:
>
> > I guess that kind of makes sense.
> > The following section in the config is what confused me:
> >
> > "# The following configurations control the disposal of log
> > segments. The policy can
> > # be set to delete segments after a period of time, or after a
> > given size has accumulated.
> > # A segment will be deleted whenever *either* of these criteria are
> > met. Deletion always happens
> > # from the end of the log."
> >
> > That makes it sound like deletion will happen if either of the
> > criteria is met. I thought the whole idea of those two settings
> > (time and bytes) is telling the application when it will need to
> > delete.
> >
> > On Mon, Sep 21, 2015 at 7:10 PM, noah wrote:
> >
> > > "minimum age of a log file to be eligible for deletion" Key word
> > > is minimum. If you only have 1k logs, Kafka doesn't need to
> > > delete anything. Try to push more data through and when it needs
> > > to, it will start deleting old logs.
> > >
> > > On Mon, Sep 21, 2015 at 8:58 PM allen chan wrote:
> > >
> > > > Hi,
> > > >
> > > > Just brought up a new kafka cluster for testing. Was able to
> > > > use the console producer to send 1k of logs and receive them on
> > > > the console consumer side.
> > > >
> > > > The one issue that I have right now is that the retention
> > > > period does not seem to be working.
> > > >
> > > > # The minimum age of a log file to be eligible for deletion
> > > > log.retention.hours=1
> > > >
> > > > I have waited for almost 2 hours and the 1k of logs are still
> > > > in kafka.
> > > >
> > > > I did see these messages pop up on the console:
> > > >
> > > > [2015-09-21 17:12:01,236] INFO Scheduling log segment 0 for log
> > > > test-1 for deletion. (kafka.log.Log)
> > > > [2015-09-21 17:13:01,238] INFO Deleting segment 0 from log
> > > > test-1. (kafka.log.Log)
> > > > [2015-09-21 17:13:01,239] INFO Deleting index
> > > > /var/log/kafka/test-1/.index.deleted (kafka.log.OffsetIndex)
> > > >
> > > > I know the logs are still in there because I am using
> > > > kafka-consumer-offset-checker.sh and it says how many messages
> > > > the logSize is.
> > > >
> > > > What am I missing in my configuration?
> > > >
> > > > Thanks!
> > > >
> > > > --
> > > > Allen Michael Chan
>
> > --
> > Allen Michael Chan
Re: log.retention.hours not working?
All of the information Todd posted is important to know. There was also a jira related to this that has been committed to trunk: https://issues.apache.org/jira/browse/KAFKA-2436

Before that patch, log.retention.hours was used to calculate KafkaConfig.logRetentionTimeMillis, but it was not used in LogManager to decide when to delete a log. LogManager was only using log.retention.ms from the broker configuration. Could you try setting log.retention.ms=3600000 instead of using the hours config?

On Mon, Sep 21, 2015 at 10:33 PM, Todd Palino wrote:

> Retention is going to be based on a combination of both the retention
> and segment size settings.

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
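Putting the suggestions in this thread together, a test setup for the original 1-hour retention might look like the server.properties sketch below. The small segment size is an assumption purely to make rotation (and therefore deletion) observable quickly, and log.segment.ms is the name used in this thread; some broker versions spell the equivalent roll setting log.roll.ms:

# Delete a closed segment once all its messages are older than one hour.
log.retention.ms=3600000

# Roll the open segment after at most one hour -- remembering the caveat
# above that an idle partition will not roll until new data arrives.
log.segment.ms=3600000

# Use small 10 MB segments while testing, instead of the 1 GiB default,
# so segments close (and become eligible for deletion) sooner.
log.segment.bytes=10485760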
RE: committing offsets
Yep, that was it ... everything works now. And the only thing that didn't, earlier, was my head. Thanks all!

-Joris.

-----Original Message-----
From: noah [mailto:iamn...@gmail.com]
Sent: 22 September 2015 12:17
To: users@kafka.apache.org
Subject: Re: committing offsets

> If you are using the console consumer to check the offsets topic,
> remember that you need this line in consumer.properties:
>
> exclude.internal.topics=false
Re: committing offsets
If you are using the console consumer to check the offsets topic, remember that you need this line in consumer.properties:

exclude.internal.topics=false

On Tue, Sep 22, 2015 at 6:05 AM Joris Peeters wrote:

> Ah, nice! Does not look like it is working, though. For some reason
> the __consumer_offsets topic is still empty.
RE: committing offsets
Ah, nice! Does not look like it is working, though. For some reason the __consumer_offsets topic is still empty. I see there are a few debug(..) logging messages that might get displayed if things go wrong - would you know how to get those displayed? (Right now I'm just running as 'java -jar ...' with no logging config.)

-J

-----Original Message-----
From: tao xiao [mailto:xiaotao...@gmail.com]
Sent: 22 September 2015 10:51
To: users@kafka.apache.org
Subject: Re: committing offsets

> 0.8.2.1 already supports Kafka offset storage. You can set
> offsets.storage=kafka in consumer properties and the high level API
> is able to pick it up and commit offsets to Kafka.
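On the side question of surfacing those debug(..) messages: the 0.8.x consumer (and zkclient) log through log4j, so a log4j.properties on the application classpath should do it. A minimal sketch, with just the consumer package turned up to DEBUG:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

# The offset-commit code lives under kafka.consumer.
log4j.logger.kafka.consumer=DEBUG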
Re: committing offsets
0.8.2.1 already supports Kafka offset storage. You can set offsets.storage=kafka in consumer properties and the high level API is able to pick it up and commit offsets to Kafka.

Here is the code reference where the kafka offset logic kicks in:

https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L308

On Tue, 22 Sep 2015 at 17:44 Joris Peeters wrote:

> Am I correct in believing that the current version (we're using kafka
> 0.8.2.1) does not expose this through the high level consumer yet?
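For anyone landing on this thread later, a minimal sketch of a high-level consumer configured for Kafka-based offset storage as described above; the ZooKeeper address, group, and topic are placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaOffsetStorageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");
        // Commit offsets to the __consumer_offsets topic instead of ZooKeeper.
        props.put("offsets.storage", "kafka");
        // During a migration you can commit to both stores; false once every
        // consumer in the group uses Kafka-based offsets.
        props.put("dual.commit.enabled", "false");
        props.put("auto.commit.enable", "false"); // we commit manually below

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

        for (MessageAndMetadata<byte[], byte[]> mam : streams.get("my-topic").get(0)) {
            System.out.println("got offset " + mam.offset());
            connector.commitOffsets(); // should now land in __consumer_offsets
        }
    }
}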
committing offsets
I'm trying to set up a kafka consumer (in Java) that uses the new approach of committing offsets (i.e. in the __consumer_offsets topic etc, rather than through zookeeper).

Am I correct in believing that the current version (we're using kafka 0.8.2.1) does not expose this through the high level consumer yet?

I am using kafka.consumer.Consumer.createJavaConsumerConnector to set up the consumer. I can play with the enabling/disabling of auto-commits, or use commitOffsets to commit manually, but that does not seem to lead to any entries in __consumer_offsets. Also, by following the code trail a bit, it does seem as if offset committing is still happening through Zookeeper.

(One reason we'd like to use the new system is because we're interested in monitoring it through Burrow.)

Apologies if I missed something simple.
Cheers!
-Joris.
Kafka with MQTT
I am new to the Internet of Things. I have pushed temperature data to a Mosquitto server and successfully consumed all of it. Now I want to push data from an Arduino to a Kafka server and consume it from Kafka. Is there any Kafka library for Arduino? What architecture would be suitable for scaling MQTT using Apache Kafka?

Thanks
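I'm not aware of a Kafka client that fits on a typical Arduino; the usual architecture is to keep the device speaking MQTT and run a bridge process that subscribes to the MQTT broker and produces into Kafka (at scale, multiple bridge instances splitting the MQTT topic space). A minimal sketch of such a bridge, assuming the Eclipse Paho MQTT client and the 0.8 producer API; broker addresses and topic names are placeholders:

import java.util.Properties;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MqttToKafkaBridge {
    public static void main(String[] args) throws Exception {
        // Kafka 0.8 producer pointed at an assumed broker address.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        final Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        // Subscribe to the MQTT broker the devices publish to.
        MqttClient mqtt = new MqttClient("tcp://localhost:1883", "kafka-bridge");
        mqtt.setCallback(new MqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) {
                // Forward each MQTT message into Kafka, keeping the MQTT
                // topic path as the message key for later partitioning.
                producer.send(new KeyedMessage<String, String>(
                    "sensor-temperature", topic, new String(message.getPayload())));
            }
            @Override
            public void connectionLost(Throwable cause) { /* reconnect logic here */ }
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) { }
        });
        mqtt.connect();
        mqtt.subscribe("sensors/temperature/#");
    }
}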