Re: Kafka rebalancing message lost
Rebalancing of partitions consumers does not necessarily mean loss of message. But I understand it can be annoying. If Kafka is rebalancing between consumers frequently, It means your consumer code is not polling within the expected timeout, as a result of which Kafka thinks the consumer is gone. You should tune your consumer implementation to keep the polling loop duration reasonable. See heartbeat.interval and session.timeout.ms configuration params in documentation. regards On Tue, Dec 18, 2018 at 3:34 AM Parth Gandhi < parth.gan...@excellenceinfonet.com> wrote: > Team, > We want to build a scalable kafka system for pub sub message and want to > run consumers (500+) on docker. We want the system to scale up the consumer > based on the message inflow. However in kafka this triggers a rebalancing > and we fear loss of message. > What is the best practices/way to achieve this with no or least message > failure? > > Disclaimer > > The information contained in this communication from the sender is > confidential. It is intended solely for use by the recipient and others > authorized to receive it. If you are not the recipient, you are hereby > notified that any disclosure, copying, distribution or taking action in > relation of the contents of this information is strictly prohibited and may > be unlawful. > > This email has been scanned for viruses and malware, and may have been > automatically archived by Mimecast Ltd, an innovator in Software as a > Service (SaaS) for business. Providing a safer and more useful place for > your human generated data. Specializing in; Security, archiving and > compliance. To find out more visit the Mimecast website. > -- http://khangaonkar.blogspot.com/
Re: How to pull the data from mobile app directly into Kafka broker
There is an open source Kafka Rest API from confluent. You could use that to POST to your broker. regards On Tue, Dec 4, 2018 at 5:32 AM Satendra Pratap Singh wrote: > Hi Team, > > Can you help me out in general I wanted to know I developed an app and > installed in my mobile when I logged in or do any activity mobile generate > a log j want to pull that logs directly from mobile to Kafka broker. > > Please let me know how to do it. > How to build data pipeline. -- http://khangaonkar.blogspot.com/
Re: Kafka ingestion data not equally distribute among brokers
Hi That is part of the Producer API. For Java see https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html ProducerRecord regards On Tue, Nov 6, 2018 at 5:46 PM Shiuh Rong Yong < shiuhrongy...@cirosolution.com> wrote: > Hi Manoj, > > I am new on Kafka, is it something mentioned on this site? > https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/state/KeyValueStore.html > > Where can we set the key and value in Kafka? > > Thanks! > > > - Original Message - > From: "Manoj Khangaonkar" > To: "users" > Sent: Tuesday, November 6, 2018 4:36:40 AM > Subject: Re: Kafka ingestion data not equally distribute among brokers > > Hi > > In Kafka topic data is split into partitions. Partitions are assigned to > brokers. > > I assume what you are trying to says is that the distribution of messages > across partitions is not balanced. > > Messages are written to topics as key,value. The messages are distributed > across partitions based on key. > > Perhaps you need to pick a better key that gives balanced distribution. > > regards > > > > On Mon, Nov 5, 2018 at 9:22 AM yong wrote: > > > Hi Kafka experts, > > > > We have a 3 nodes kafka setup with data streams from mysql binlog, we > > noticed that from grafana monitoring in and out, some broker nodes are > > having lower data ingested in and out, is there anyway to balance it? Or > it > > is nature of Kafka? > > > > Thanks! > > > > > -- > http://khangaonkar.blogspot.com/ > -- http://khangaonkar.blogspot.com/
Re: Kafka ingestion data not equally distribute among brokers
Hi In Kafka topic data is split into partitions. Partitions are assigned to brokers. I assume what you are trying to says is that the distribution of messages across partitions is not balanced. Messages are written to topics as key,value. The messages are distributed across partitions based on key. Perhaps you need to pick a better key that gives balanced distribution. regards On Mon, Nov 5, 2018 at 9:22 AM yong wrote: > Hi Kafka experts, > > We have a 3 nodes kafka setup with data streams from mysql binlog, we > noticed that from grafana monitoring in and out, some broker nodes are > having lower data ingested in and out, is there anyway to balance it? Or it > is nature of Kafka? > > Thanks! > -- http://khangaonkar.blogspot.com/
Re: Consumer Pause & Scheduled Resume
Hi Pradeep The poll , pause and resume need to happen in the same thread -- in the same while loop. If a scheduler is the trigger for pause or resume, do not call pause /resume from the scheduler thread. Instead set a variable in the class that has the poll loop. The poll loop can check the variable and pause/resume as necessary. For the rebalance scenario , you should implement the ConsumerRebalanceListener interface and register it with the consumer. It will get called when paritions are assigned or revoked. There you can call pause or resume again Hope this helps regards On Thu, Oct 25, 2018 at 6:11 PM pradeep s wrote: > Hi Manoj/Matthias, > My requirement is that to run the consumer daily once , stream the messages > and pause when i am encountering a few empty fetches . > I am planning to run two consumers and pausing the consumption based on > the empty fetches for a topic with 4 partitions . > To avoid the consumer multi thread access issue , i am running consumer, > exit the poll loop, and calling pause on the same thread. In this case , i > will not continuously polling . > When the next schedule kicks in , i will resume the polling . > Will the consumer resume call cause issues ,since the schedule loop is > trigger long time after the polling stopped .(Or the old approach of > continuous polling is the correct one) > Also ,Manoj, can you please explain on the rebalance scenario if the > consumer is paused for two partitions and gets the assignment for another > two partitions (because of a pod termination), how can i pause the > consumption if its not the scheduled time to process the records. > Thanks > Pradeep > > On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar > wrote: > > > One item to be aware with pause and resume - is that it applies to > > partitions currently assigned to the consumer. > > > > But partitions can get revoked or additional partitions can get assigned > to > > consumer. > > > > With reassigned , you might be expecting the consumer to be paused but > > suddenly start getting messages because a new partition got assigned. > > > > Use the RebalanceListener to pause or resume any new partitions > > > > regards > > > > On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax > > wrote: > > > > > That is correct: clients are not thread safe. > > > > > > You can use an `AtomicBoolean needToResume` that you share over both > > > threads and that is initially false. > > > > > > In your scheduled method, you set the variable to true. > > > > > > In your main consumer, each time before you call poll(), you check if > > > the variable is set to true. If yes, you resume() and reset the > variable > > > to false. > > > > > > Hope this helps. > > > > > > -Matthias > > > > > > > > > On 10/25/18 2:09 PM, pradeep s wrote: > > > > Thanks Matthias. I am facing the issue when i am trying to call the > > > resume > > > > from the scheduled method . > > > > Was getting exception that Kafka Consumer is not safe for multi > > threaded > > > > access . I am trying to see how can call pause and resume on the same > > > > thread. There will be only one thread running for consumption. > > > > > > > > > > > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax < > matth...@confluent.io > > > > > > > wrote: > > > > > > > >> There is no issue if you call `poll()` is all partitions are paused. > > If > > > >> fact, if you want to make sure that the consumer does not fall out > of > > > >> the consumer group, you must call `poll()` in regular interval to > not > > > >> hit `max.poll.interval.ms` timeout. > > > >> > > > >> > > > >> -Matthias > > > >> > > > >> On 10/24/18 10:25 AM, pradeep s wrote: > > > >>> Pause and resume is required since i am running a pod in kubernetes > > > and i > > > >>> am not shutting down the app > > > >>> > > > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s < > > > sreekumar.prad...@gmail.com> > > > >>> wrote: > > > >>> > > > >>>> Hi, > > > >>>> I have a requirement to have kafka streaming start at scheduled > time > > > and > > > >>>> then pause the stream when the consumer poll returns empty fetches > > for > > >
Re: Consumer Pause & Scheduled Resume
One item to be aware with pause and resume - is that it applies to partitions currently assigned to the consumer. But partitions can get revoked or additional partitions can get assigned to consumer. With reassigned , you might be expecting the consumer to be paused but suddenly start getting messages because a new partition got assigned. Use the RebalanceListener to pause or resume any new partitions regards On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax wrote: > That is correct: clients are not thread safe. > > You can use an `AtomicBoolean needToResume` that you share over both > threads and that is initially false. > > In your scheduled method, you set the variable to true. > > In your main consumer, each time before you call poll(), you check if > the variable is set to true. If yes, you resume() and reset the variable > to false. > > Hope this helps. > > -Matthias > > > On 10/25/18 2:09 PM, pradeep s wrote: > > Thanks Matthias. I am facing the issue when i am trying to call the > resume > > from the scheduled method . > > Was getting exception that Kafka Consumer is not safe for multi threaded > > access . I am trying to see how can call pause and resume on the same > > thread. There will be only one thread running for consumption. > > > > > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax > > wrote: > > > >> There is no issue if you call `poll()` is all partitions are paused. If > >> fact, if you want to make sure that the consumer does not fall out of > >> the consumer group, you must call `poll()` in regular interval to not > >> hit `max.poll.interval.ms` timeout. > >> > >> > >> -Matthias > >> > >> On 10/24/18 10:25 AM, pradeep s wrote: > >>> Pause and resume is required since i am running a pod in kubernetes > and i > >>> am not shutting down the app > >>> > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s < > sreekumar.prad...@gmail.com> > >>> wrote: > >>> > Hi, > I have a requirement to have kafka streaming start at scheduled time > and > then pause the stream when the consumer poll returns empty fetches for > >> 3 or > more polls. > > I am starting a consumer poll loop during application startup using a > singled thread executor and then pausing the consumer when the poll is > returning empty for 3 polls. > > When the schedule kicks in , i am calling *consumer.resume.* > > Is this approach correct ? > Will it cause any issue If the consumer calls poll on a paused > >> consumer. > > Skeleton Code > > > public class *OfferItemImageConsumer* implements Runnable { > > @Override > public void run() { > try { > do { > ConsumerRecords records = > >> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()); > writeAndPauseEmptyFetch(records); > processRecords(records); > } while (!consumerLoopClosed.get()); > } catch (RuntimeException ex) { > handleConsumerLoopException(ex); > } finally { > kafkaConsumer.close(); > } > } > > > private void writeAndPauseEmptyFetch(ConsumerRecords > >> records) { > if (records.isEmpty()) { > emptyFetchCount++; > } > if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) { > writeImageData(); > emptyFetchCount = 0; > kafkaConsumer.pause(kafkaConsumer.assignment()); > consumerPaused = true; > } > } > > } > > = > > public class *ItemImageStreamScheduler* { > private static final int TERMINATION_TIMEOUT = 10; > > > private ExecutorService executorService = > >> Executors.newSingleThreadExecutor(); > > private final OfferItemImageConsumer offerItemImageConsumer; > private final ItemImageStreamConfig itemImageStreamConfig; > private final KafkaConsumer kafkaConsumer; > > @EventListener(ApplicationReadyEvent.class) > void startStreaming() { > executorService.submit(offerItemImageConsumer); > } > @Scheduled > void resumeStreaming() { > kafkaConsumer.resume(kafkaConsumer.assignment()); > } > > > } > > Thanks > > Pradeep > > > >>> > >> > >> > > > > -- http://khangaonkar.blogspot.com/
Re: Kafka consumer producer logs
Producer and consumer logs would be in the respective client applications. To enable them , you would enable them for the kafka packages in the client application. For example if you were using log4j , you would add something like org.apache.kafka.clients=INFO regards On Tue, Oct 16, 2018 at 6:18 AM Kaushik Nambiar wrote: > Hello, > I am using a Kafka with version 0.11.x.x. > We are producing and consuming messages via different applications. > But due to some reason,we are able to produce the messages on a particular > topic but not able to consume them. > The issue is for a specific topic,for the others m able to produce and > consume successfully > Could there be any specific reasons for this issue? > > Also in the logs I can mostly see Kafka state change and controller logs. > > So my question is,are Kafka consumer producer logs stored in the Kafka > server or they would be stored at the client applications? > If they are not stored in the server is there any way by which we can > enable the same? > > Thank you, > Kaushik Nambiar > -- http://khangaonkar.blogspot.com/
Re: Optimal Message Size for Kafka
Best kafka performance is with messages size in the order of a few KB. Larger messages put heavy load on brokers and is very inefficient. It is inefficient on producers and consumers as well regards On Thu, Sep 6, 2018 at 11:16 PM SenthilKumar K wrote: > Hello Experts, We are planning to use Kafka for large message set ( size > various from 2 MB to 4 MB per event ). By setting message.max.bytes to 64 > MB value this Kafka Producer allows large messages. But how does it > impacts performance (both producer and consumer)? > > Would like to understand the performance impact of different message size > . Example : producing 50KB vs 1MB . > > What is the optimal message size can be used in Kafka Producer ? > > --Senthil > -- http://khangaonkar.blogspot.com/
Re: Looking for help with a question on the consumer API
Hi, Yes , if you don'nt call poll within the configured timeouts, the broker thinks the consumer is gone. But increasing the timeout is not a sustainable design. In general the code in the consumer poll loop should be very fast and do minimal work. Any heavy duty work should be done by handing of to a thread pool. What is happening is that the partitions for this consumer are assigned to another. If reading too many messages too fast becomes an issue, You can consider using the pause and resume API to pause the consumer. In some special cases, you might consider manually assigning partitions to the consumer. regards On Wed, Aug 8, 2018 at 1:30 PM, Moiz Raja (moraja) wrote: > Hi All, > > I have an issue with the consumer getting kicked out of the group possibly > due to other issues going on in the system. The issue is detailed here > https://stackoverflow.com/questions/51754794/how-to- > reinstate-a-kafka-consumer-which-has-been-kicked-out-of-the-group > > Any help with this issue would be appreciated. > > Regards, > -Moiz > -- http://khangaonkar.blogspot.com/
KafkaConsumer pause method not working as expected
Hi, I am implementing flow control by (1) using the pause(partitions) method on the consumer to stop consumer.poll from returning messages. (2) using the resume(partitions) method on the consumer to let consumer.poll return messages This works well for a while. Several sets of pause-resume work as expected. But after a while I am seeing that the paused consumer starts returning messages WITHOUT my code calling resume. There are no changes in the partitions assigned to consumer. I know this because I logs the assigned partitions after every consumer.poll(...). That should rule out a rebalance. Is there something that I am missing here ? Is there something more required to pause a consumer from retrieving message ? regards -- http://khangaonkar.blogspot.com/
Re: Kafka consumer commit behaviour on rebalance
Yes. If a consumer when down, all the polled messages that were not committed will be redelivered to another consumer. regards On Tue, Feb 13, 2018 at 9:31 AM, pradeep s wrote: > Hi All, > I am running a Kafka consumer(Single threaded) on kubernetes . Application > is polling the records and accummulating in memory . There is a scheduled > write of these records to S3 . Only after that i am committing the offsets > back to Kafka. > There are 4 partitions and 4 consumers(4 kubernetes pods) are there in the > group. > > I have a question on the commit behaviour when kafka rebalancing happens . > After a kafka rebalance , will the consumers get reassigned and get > duplicate records . > Do i need to clear my in memory buffer on a rebalance event > > Thanks > Pradeep > -- http://khangaonkar.blogspot.com/
Re: can't feed remote broker with producer demo
In your server.properties , in either the listeners or advertised.listeners , replace the localhost with the ip address. regards On Mon, Jan 22, 2018 at 7:16 AM, Rotem Jacobi wrote: > Hi, > When running the quickstart guide (producer, broker (with zookeeper) and > consumer on the same machine) it works perfectly. > When trying to run the producer from another machine to feed the broker > I'm getting an error message: > > C:\Development\kafka_2.11-0.11.0.0\bin\windows>kafka-console-producer.bat > --broker-list 172.16.40.125:9092 --topic test > >test_message > >[2018-01-22 17:14:44,240] ERROR Error when sending message to topic test > with key: null, value: 12 bytes with error: (org.apache.kafka.clients. > producer.internals.ErrorLoggingCallback) > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > test-0: 2298 ms has passed since batch creation plus linger time > Any idea? > Is there any broker configuration that I'm missing here? > > Thanks, > Rotem. > -- http://khangaonkar.blogspot.com/
Re: Capturing and storing these Kafka events for query.
Hi, If I understood the question correctly , then the better approach is to consume events from topic and store in your favorite database. Then query the database as needed. Querying the topic for messages in kafka is not recommended as that will be a linear search. regards On Thu, Jan 11, 2018 at 8:55 AM, Maria Pilar wrote: > Hi all, > > I have a requirement to be able to capture and store events for query, > and I'm trying to choose the best option for that: > > 1) Capture the events from a separate topic, store events a state, in > order to convert a stream to a table, that means materializing the > stream. > The option for it, that I'm thinking is to use an external state that > is maintained in an external datastore, often a NoSQL (e.g Cassandra). > That solution provides the advantage unlimited size and it can be > accessed from multiple instances of the application or from different > applications. > > 2) Other options could be query direct in local o internal state or in > memory, however, to be able to query directly onto a topic, I would > need to search a record by partition and offset, I don't know exactly > how to implement that option if it's possible. > > Cheers. > -- http://khangaonkar.blogspot.com/
Re: kafka configure problem
The advertised.listeners property in server.properties needs to have the ip address that the broker is listening on. For eg. advertised.listeners=PLAINTEXT://192.168.52.194:9092:9092 regards On Mon, Dec 25, 2017 at 12:20 AM, 刘闯 wrote: > i‘ m a beginner of kafka. i usde kafka_2.11-0.9.0.1. my kafka built in > two virtual machinewhich ip are 52.193 and 52.194 > > when i use commonds both in 52.193 > ./kafka-console-producer.sh --broker-list 192.168.52.193:9092 --topic > shuaige > and > ./kafka-console-consumer.sh --zookeeper 192.168.52.195:2181 --topic > shuaige --from-beginning > > the message of producer was shown in consumer terminal. > > but when i use 52.193 as producer and 52.194 as consumer > ./kafka-console-producer.sh --broker-list 192.168.52.194:9092 --topic > shuaige > and > ./kafka-console-consumer.sh --zookeeper 192.168.52.195:2181 --topic > shuaige --from-beginning > > it shows WARN Error while fetching metadata with correlation id 0 : > {ttes=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) > [2017-12-25 15:22:19,571] WARN Error while fetching metadata with > correlation id 1 : {ttes=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients. > NetworkClient) > [2017-12-25 15:22:19,675] WARN Error while fetching metadata with > correlation id 2 : {ttes=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients. > NetworkClient) > > here is my configure,server.properties > # The id of the broker. This must be set to a unique integer for each > broker. > broker.id=0 > > # Socket Server Settings > # > > listeners=PLAINTEXT://:9092 > port=9092 > # The port the socket server listens on > #port=9092 > > # Hostname the broker will bind to. If not set, the server will bind to > all interfaces > #host.name=localhost > host.name=192.168.52.193 > # Hostname the broker will advertise to producers and consumers. If not > set, it uses the > # value for "host.name" if configured. Otherwise, it will use the value > returned from > # java.net.InetAddress.getCanonicalHostName(). > #advertised.host.name= > advertised.host.name = localhost > # The port to publish to ZooKeeper for clients to use. If this is not set, > # it will publish the same port that the broker binds to. > #advertised.port= > advertised.port=9092 > # The number of threads handling network requests > num.network.threads=3 > > # The number of threads doing disk I/O > num.io.threads=8 > > # The send buffer (SO_SNDBUF) used by the socket server > socket.send.buffer.bytes=102400 > > # The receive buffer (SO_RCVBUF) used by the socket server > socket.receive.buffer.bytes=102400 > # The maximum size of a request that the socket server will accept > (protection against OOM) > socket.request.max.bytes=104857600 > > > # Log Basics # > > # A comma seperated list of directories under which to store log files > log.dirs=/tmp/kafka-logs > > # The default number of log partitions per topic. More partitions allow > greater > # parallelism for consumption, but this will also result in more files > across > # the brokers. > num.partitions=1 > > # The number of threads per data directory to be used for log recovery at > startup and flushing at shutdown. > # This value is recommended to be increased for installations with data > dirs located in RAID array. > num.recovery.threads.per.data.dir=1 > log.retention.hours=168 > message.max.byte=5242880 > default.replication.factor=2 > replica.fetch.max.bytes=5242880 > # A size-based retention policy for logs. Segments are pruned from the log > as long as the remaining > # segments don't drop below log.retention.bytes. > #log.retention.bytes=1073741824 > > # The maximum size of a log segment file. When this size is reached a new > log segment will be created. > log.segment.bytes=1073741824 > > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > > # Zookeeper # > > # Zookeeper connection string (see zookeeper docs for details). > # This is a comma separated host:port pairs, each corresponding to a zk > # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". > # You can also append an optional chroot string to the urls to specify the > # root directory for all kafka znodes. > #zookeeper.connect=localhost:2181 > zookeeper.connect=192.168.52.195:2181,192.168.52.196:2181,1 > 92.168.52.197:2181 > # Timeout in ms for connecting to zookeeper > zookeeper.connection.timeout.ms=6000 > > > > > ..thanks for you help > > -- http://khangaonkar.blogspot.com/
Re: kafka configure problem
The advertised.listeners property in server.properties needs to have the ip address that the broker is listening on. For eg. advertised.listeners=PLAINTEXT://192.168.52.194:9092:9092 regards On Mon, Dec 25, 2017 at 12:20 AM, 刘闯 wrote: > i‘ m a beginner of kafka. i usde kafka_2.11-0.9.0.1. my kafka built in > two virtual machinewhich ip are 52.193 and 52.194 > > when i use commonds both in 52.193 > ./kafka-console-producer.sh --broker-list 192.168.52.193:9092 --topic > shuaige > and > ./kafka-console-consumer.sh --zookeeper 192.168.52.195:2181 --topic > shuaige --from-beginning > > the message of producer was shown in consumer terminal. > > but when i use 52.193 as producer and 52.194 as consumer > ./kafka-console-producer.sh --broker-list 192.168.52.194:9092 --topic > shuaige > and > ./kafka-console-consumer.sh --zookeeper 192.168.52.195:2181 --topic > shuaige --from-beginning > > it shows WARN Error while fetching metadata with correlation id 0 : > {ttes=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) > [2017-12-25 15:22:19,571] WARN Error while fetching metadata with > correlation id 1 : {ttes=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients. > NetworkClient) > [2017-12-25 15:22:19,675] WARN Error while fetching metadata with > correlation id 2 : {ttes=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients. > NetworkClient) > > here is my configure,server.properties > # The id of the broker. This must be set to a unique integer for each > broker. > broker.id=0 > > # Socket Server Settings > # > > listeners=PLAINTEXT://:9092 > port=9092 > # The port the socket server listens on > #port=9092 > > # Hostname the broker will bind to. If not set, the server will bind to > all interfaces > #host.name=localhost > host.name=192.168.52.193 > # Hostname the broker will advertise to producers and consumers. If not > set, it uses the > # value for "host.name" if configured. Otherwise, it will use the value > returned from > # java.net.InetAddress.getCanonicalHostName(). > #advertised.host.name= > advertised.host.name = localhost > # The port to publish to ZooKeeper for clients to use. If this is not set, > # it will publish the same port that the broker binds to. > #advertised.port= > advertised.port=9092 > # The number of threads handling network requests > num.network.threads=3 > > # The number of threads doing disk I/O > num.io.threads=8 > > # The send buffer (SO_SNDBUF) used by the socket server > socket.send.buffer.bytes=102400 > > # The receive buffer (SO_RCVBUF) used by the socket server > socket.receive.buffer.bytes=102400 > # The maximum size of a request that the socket server will accept > (protection against OOM) > socket.request.max.bytes=104857600 > > > # Log Basics # > > # A comma seperated list of directories under which to store log files > log.dirs=/tmp/kafka-logs > > # The default number of log partitions per topic. More partitions allow > greater > # parallelism for consumption, but this will also result in more files > across > # the brokers. > num.partitions=1 > > # The number of threads per data directory to be used for log recovery at > startup and flushing at shutdown. > # This value is recommended to be increased for installations with data > dirs located in RAID array. > num.recovery.threads.per.data.dir=1 > log.retention.hours=168 > message.max.byte=5242880 > default.replication.factor=2 > replica.fetch.max.bytes=5242880 > # A size-based retention policy for logs. Segments are pruned from the log > as long as the remaining > # segments don't drop below log.retention.bytes. > #log.retention.bytes=1073741824 > > # The maximum size of a log segment file. When this size is reached a new > log segment will be created. > log.segment.bytes=1073741824 > > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=30 > > # Zookeeper # > > # Zookeeper connection string (see zookeeper docs for details). > # This is a comma separated host:port pairs, each corresponding to a zk > # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". > # You can also append an optional chroot string to the urls to specify the > # root directory for all kafka znodes. > #zookeeper.connect=localhost:2181 > zookeeper.connect=192.168.52.195:2181,192.168.52.196:2181,1 > 92.168.52.197:2181 > # Timeout in ms for connecting to zookeeper > zookeeper.connection.timeout.ms=6000 > > > > > ..thanks for you help > > -- http://khangaonkar.blogspot.com/
Re: Seeking advice on Kafka Streams and Kafka Connect
Hi I am not a big fan of kafka connect. I had use case for kafka messages that needed to be written to MongoDb. The available third party connectors were less than ideal. To me a well written Kafka consumer is simpler and better longer term solution instead of an additional moving part and additional programming model of Kafka connect. Keep it simple with topics , producers and consumers. regards On Thu, Dec 21, 2017 at 2:49 AM, Mads Tandrup < mads.tand...@schneider-electric.com> wrote: > Hi > > Sorry for the simple question. I’m just starting to learn about Kafka > streams and connect and I’m struggling to understand the exact difference > and which one to use. I’m coming from Apache Storm so forgive me if I make > false assumptions. > > I have a use case where I have a Kafka topic with some messages. What I > need to do: > 1. Read the messages > 2. Split and map the message into a number of rows > 3. Write the rows to Cassandra > > It seems the first 2 steps are a natural fit for Kafka Streams. But it > seems the way to write to Cassandra is to use Kafka Connect. > Is that correctly understood? > > Is there any way to connect Kafka Streams and Kafka Connect without > writing it to a new kafka topic? Since the transformation in step 2 is so > simple it seems a waste to write it to disk. > > Is there any other way I should consider? > > Best regards, > Mads > > -- http://khangaonkar.blogspot.com/
Re: How to get the start and end of partition from kafka
Hi Not sure of adminClient but if you are programming is Java this should be possible by using KafkaConsumer class org.apache.kafka.clients.consumer.KafkaConsumer It has beginningOffsets and endOffSets methods , that can give you the information. regards On Thu, Dec 14, 2017 at 11:10 PM, 懒羊羊 <1227439...@qq.com> wrote: > Hello, I want to get the start and end of partition form kafka by API, > such as:AdminClient. How can I do? -- http://khangaonkar.blogspot.com/
Re: Installing and Running Kafka
Hi Did you download the binary download or are you trying to build the source code and then run ? With binary downloads, I have never had an issue. Another possibility is you have scala installed that is getting in the way. regards On Fri, Dec 15, 2017 at 1:54 PM, Karl Keller wrote: > Hello, > > I’ve been trying most of the afternoon to get Kafka installed and running > the basic quick start. > > I am running into the following errors related to firing up zookeeper. > From the kafka directory: > > Andreas-iMac:kafka_2.11-1.0.0 Andrea$ bin/zookeeper-server-start.sh > config/zookeeper.properties > usage: dirname path > Classpath is empty. Please build the project first e.g. by running > './gradlew jar -Pscala_version=2.11.11’ > > Then I went ahead and moved into the gradle directory, created the gradle > wrapper and then executed the suggested build command show below. I’m > stuck on the absence of the “jar” not being in the root project. > > Could you suggest a next step? Thanks - Karl > > Andreas-iMac:4.4 Andrea$ ./gradlew jar -Pscala_version=2.11.11 > > FAILURE: Build failed with an exception. > > * What went wrong: > Task 'jar' not found in root project '4.4'. > > * Try: > Run gradlew tasks to get a list of available tasks. Run with --stacktrace > option to get the stack trace. Run with --info or --debug option to get > more log output. Run with --scan to get full insights. > > * Get more help at https://help.gradle.org > > BUILD FAILED in 1s > -- http://khangaonkar.blogspot.com/
Re: scaling kafka brokers
see the link http://kafka.apache.org/documentation.html#basic_ops_cluster_expansion After you add the new broker , you have to run the partition assignment tool to re assign partitions regards On Thu, May 21, 2015 at 3:55 PM, Dillian Murphey wrote: > What's out there in terms of auto scaling kafka brokers? > > What is being done? Does anyone run an elastic kafka broker cluster? > > Along these lines, what is the procedure to replace a broker with a new > broker? This might be ensuring topics are replicated so I can afford > downtime. But let's say I kill a broker and spin up an entirely new one... > How do I distribute topics to this new broker?? > > Thanks for any sources of information > -- http://khangaonkar.blogspot.com/
Re: Optimal number of partitions for topic
With knowing the actual implementation details, I would get guess more partitions implies more parallelism, more concurrency, more threads, more files to write to - all of which will contribute to more CPU load. Partitions allow you to scale by partitioning the topic across multiple brokers. Partition is also a unit of replication ( 1 leader + replicas ). And for consumption of messages, the order is maintained within a partitions. But if you put 100 partitions per topic on 1 single broker, I wonder if it is going to be an overhead. On Wed, May 20, 2015 at 1:02 AM, Carles Sistare wrote: > Hi, > We are implementing a Kafka cluster with 9 brokers into EC2 instances, and > we are trying to find out the optimal number of partitions for our topics, > finding out the maximal number in order not to update the partition number > anymore. > What we understood is that the number of partitions shouldn’t affect the > CPU load of the brokers, but when we add 512 partitions instead of 128, for > instance, the CPU load exploses. > We have three topics with 10 messages/sec each, a replication factor > of 3 and two consumer groups for each partition. > > Could somebody explain, why the increase of the number of partitions has a > so dramatic impact to the CPU load? > > > Here under i paste the config file of kafka: > > broker.id=3 > > default.replication.factor=3 > > > # The port the socket server listens on > port=9092 > > # The number of threads handling network requests > num.network.threads=2 > > # The number of threads doing disk I/O > num.io.threads=8 > > # The send buffer (SO_SNDBUF) used by the socket server > socket.send.buffer.bytes=1048576 > > # The receive buffer (SO_RCVBUF) used by the socket server > socket.receive.buffer.bytes=1048576 > > # The maximum size of a request that the socket server will accept > (protection against OOM) > socket.request.max.bytes=104857600 > > > > # A comma seperated list of directories under which to store log files > log.dirs=/mnt/kafka-logs > > # The default number of log partitions per topic. More partitions allow > greater > # parallelism for consumption, but this will also result in more files > across > # the brokers. > num.partitions=16 > > # The minimum age of a log file to be eligible for deletion > log.retention.hours=1 > > # The maximum size of a log segment file. When this size is reached a new > log segment will be created. > log.segment.bytes=536870912 > > # The interval at which log segments are checked to see if they can be > deleted according > # to the retention policies > log.retention.check.interval.ms=6 > > # By default the log cleaner is disabled and the log retention policy will > default to just delete segments after their retention expires. > # If log.cleaner.enable=true is set the cleaner will be enabled and > individual logs can then be marked for log compaction. > log.cleaner.enable=false > > # Timeout in ms for connecting to zookeeper > zookeeper.connection.timeout.ms=100 > > auto.leader.rebalance.enable=true > controlled.shutdown.enable=true > > > Thanks in advance. > > > > Carles Sistare > > > -- http://khangaonkar.blogspot.com/
Re: Consumers in a different datacenters
You can use MirrorMaker to mirror the topic from cluster A to cluster B. This has the benefit of avoiding consumers in B connecting to A which could have some latency regards On Tue, May 19, 2015 at 11:46 AM, Bill Hastings wrote: > Hi All > > Has anyone tried this? We have two data centers A and B. We would like data > replicated between A and B. So I would like to have a kafka cluster set up > in A and B. When we need to replicate from A-->B I would like the app in A > publish a topic to the kafla cluster in data center A. The corresponding > consumers are in data center B. When they get the topic they apply locally. > Does this sound like a viable solution? > > -- > Cheers > Bill > -- http://khangaonkar.blogspot.com/
Re: Differences between new and legacy scala producer API
On Thu, May 7, 2015 at 10:01 PM, Rendy Bambang Junior < rendy.b.jun...@gmail.com> wrote: > Hi > > - Legacy scala api for producer is having keyed message along with topic, > key, partkey, and message. Meanwhile new api has no partkey. Whats the > difference between key and partkey? > In the new API, key is encapsulated in the ProducerRecord. regards -- http://khangaonkar.blogspot.com/
Re: Could you answer the following kafka stackoverflow question?
Use the Simple Consumer API if you want to control which offset in a partition the client should read messages from. Use the High level consumer, if you don'nt care about offsets. Offsets are per partition and stored on zookeeper. regards On Tue, Apr 28, 2015 at 4:38 AM, Gomathivinayagam Muthuvinayagam < sankarm...@gmail.com> wrote: > I have just posted the following question in stackoverflow. Could you > answer the following questions? > > I would like to use Kafka high level consumer API, and at the same time I > would like to disable auto commit of offsets. I tried to achieve this > through the following steps. > > 1) auto.commit.enable = false2) offsets.storage = kafka3) > dual.commit.enabled = false > > I created a offset manager, which periodically creates offsetcommit request > to kafka and commits the offset. > > Still I have the following questions > > 1) Does high level consumer API automatically fetches offset from kafka > storage and initializes itself with that offset? Or should I use simple > consumer API to achieve this? > > 2) Does kafka based storage for offsets is repicated across all brokers? Or > it is maintained on only one broker? > > > http://stackoverflow.com/questions/29909179/kafka-offsetcommit-request-with-high-level-consumer-api > Thanks & Regards, > -- http://khangaonkar.blogspot.com/
Re: Regarding key to b sent as part of producer message Please help
Hi Your key seems to be String. key.serializer.class might need to be set to StringEncoder. regards On Sat, Apr 25, 2015 at 10:43 AM, Gaurav Agarwal wrote: > Hello > > I am sending message from producer like this with DefaultEncoder. > > KeyedMessage keyedMessage = new KeyedMessage byte[]>("topic",Serializations.serialize("s"), > > Serializations.getSerialized(msg,rqst)); > > This is a compile time error at java level as it expects String > > > But if i use > > > KeyedMessage keyedMessage = new KeyedMessage byte[]>("topic","s",Serializations.getSerialized(msg,rqst)); > > > as part of sending message , it gives me class cast exception as i have > configured DefaultEncoder and sending byte Message. > -- http://khangaonkar.blogspot.com/
Re: Fetch API Offset
Hi, I have used code like FetchRequest req = new FetchRequestBuilder() .clientId(clientname) .addFetch(topic, partition, offset_in_partition, 10).build(); FetchResponse fetchResponse = consumer.fetch(req); And it returns the message that I was looking for. The offset_in_partion is a value I get by calling offset_in_partition = messageAndOffset.nextOffset(); // call this on a previous message and saving it somewhere for future use. The only caveat is that fetch returns a block of messages which means I might get messages with lower offset that I just need to ignore. regards On Tue, Apr 21, 2015 at 6:47 AM, Piotr Husiatyński wrote: > What do you mean that message has expired? > > If I will do fetch request with offset 4, I will receive first message > with offset 5. I think fetch is always excluding requested offset, > while documentation is saying it will be included in response. > > On Tue, Apr 21, 2015 at 3:35 PM, Manoj Khangaonkar > wrote: > > Hi, > > > > I suspect If some message from the given offset have expired, then they > > will not be returned. > > > > regards > > > > On Tue, Apr 21, 2015 at 5:14 AM, Piotr Husiatyński > wrote: > > > >> According to documentation, sending fetch request with offset value > >> result in messages starting with given offset (including) and greater. > >> > >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI > >> > >> > Logically one specifies the topics, partitions, and starting offset at > >> which to begin the fetch and gets back a chunk of messages. In general, > the > >> return messages will have offsets larger than or equal to the starting > >> offset > >> > >> I'm sending fetch request with offset 5 and first message I'm getting > >> is offset 6. Am I doing something wrong or documentation has to be > >> fixed? > >> > > > > > > > > -- > > http://khangaonkar.blogspot.com/ > -- http://khangaonkar.blogspot.com/
Re: Fetch API Offset
Hi, I suspect If some message from the given offset have expired, then they will not be returned. regards On Tue, Apr 21, 2015 at 5:14 AM, Piotr Husiatyński wrote: > According to documentation, sending fetch request with offset value > result in messages starting with given offset (including) and greater. > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI > > > Logically one specifies the topics, partitions, and starting offset at > which to begin the fetch and gets back a chunk of messages. In general, the > return messages will have offsets larger than or equal to the starting > offset > > I'm sending fetch request with offset 5 and first message I'm getting > is offset 6. Am I doing something wrong or documentation has to be > fixed? > -- http://khangaonkar.blogspot.com/
Re: SimpleConsumer.getOffsetsBefore problem
Hi, Earliest and Latest are like Enums that denote the first and last messages in the partition. (Or the offsets for those positions) My understanding is the you can only based on offsets . Not on timestamps. regards On Thu, Apr 16, 2015 at 7:35 AM, Alexey Borschenko < aborsche...@elance-odesk.com> wrote: > Hi all! > > I need to read offsets closing to specified timestamp. > As I can see this can be achieved by using SImpleConsumer API. > To test things I use SimpleConsumer example provided on site: > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example > > I use Kafka 8.2.1 > > When I pass -1 or -2 (latest or earliest) as time to getOffsetsBefore - it > worx fine: returns 1562 and 73794 accordingly > When I pass System.currentTimeMillis() as time: returns 1562 - same as > earliest > When I pass System.currentTimeMillis() - 10*60*1000 - it returns 1562 > When I pass System.currentTimeMillis() - 200*60*1000 - it returns 0 > > Here is code snippet: > > * > > public void run(long a_maxReads, String a_topic, int a_partition, > List a_seedBrokers, int a_port) throws Exception { > System.out.println("a_maxReads = [" + a_maxReads + "], a_topic = > [" + a_topic + "], a_partition = [" + a_partition + "], a_seedBrokers > = [" + a_seedBrokers + "], a_port = [" + a_port + "]"); > // find the meta data about the topic and partition we are interested > in > // > PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, > a_topic, a_partition); > if (metadata == null) { > System.out.println("Can't find metadata for Topic and > Partition. Exiting"); > return; > } > if (metadata.leader() == null) { > System.out.println("Can't find Leader for Topic and Partition. > Exiting"); > return; > } > String leadBroker = metadata.leader().host(); > String clientName = "Client_" + a_topic + "_" + a_partition; > > SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, > 10, 64 * 1024, clientName); > long readOffset = getLastOffset( > consumer, a_topic, a_partition, > System.currentTimeMillis() - 10 * 60 * 1000, > clientName); > System.out.println("readOffset = " + readOffset); > } > > public static long getLastOffset(SimpleConsumer consumer, String > topic, int partition, > long whichTime, String clientName) { > System.out.println("consumer = [" + consumer + "], topic = [" + > topic + "], partition = [" + partition + "], whichTime = [" + > whichTime + "], clientName = [" + clientName + "]"); > TopicAndPartition topicAndPartition = new TopicAndPartition(topic, > partition); > Map requestInfo = > new HashMap(); > requestInfo.put(topicAndPartition, new > PartitionOffsetRequestInfo(whichTime, 1)); > kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( > requestInfo, kafka.api.OffsetRequest.CurrentVersion(), > clientName); > OffsetResponse response = consumer.getOffsetsBefore(request); > > if (response.hasError()) { > System.out.println("Error fetching data Offset Data the > Broker. Reason: " + response.errorCode(topic, partition)); > return 0; > } > long[] offsets = response.offsets(topic, partition); > if (offsets.length == 0) { > return 0; > } > return offsets[0]; > } > > > > How can I get more or less accurate offset values close to specified > timestamp? > > Thanx! > -- http://khangaonkar.blogspot.com/
Re: Design questions related to kafka
I meant you can read messages multiple times if you want to. Yes, you would store offset and request reading from an offset with Simple Consumer API to implement once and only once delivery. regards On Wed, Apr 15, 2015 at 10:55 AM, Pete Wright wrote: > > > On 04/15/15 09:31, Manoj Khangaonkar wrote: > > # I looked the documents of kafka and I see that there is no way a > >> consume instance can > >>read specific messages from partition. > >> > > > > With Kafka you read messages from the beginning multiple times. Since you > > say later that > > you do not have many messages per topic, you can iterate over the message > > and read the ones > > that you need. Of course this might not the be the most efficient. > > > > couldn't you also store the offset of the last read message, then resume > reading messages after that offset to ensure your consumer does not > consume the same message twice? > > Cheers, > -pete > > -- > Pete Wright > Lead Systems Architect > Rubicon Project > pwri...@rubiconproject.com > 310.309.9298 > -- http://khangaonkar.blogspot.com/
Re: Design questions related to kafka
# I looked the documents of kafka and I see that there is no way a > consume instance can >read specific messages from partition. > With Kafka you read messages from the beginning multiple times. Since you say later that you do not have many messages per topic, you can iterate over the message and read the ones that you need. Of course this might not the be the most efficient. > # I have an use case where I need to spawn a topic(single partition) > for each user, >so there would be 10k online users at a time, there would be very > less data per topic. >My questions is, what are limitations of having multiple topics(with > 1 partition), I think >this situation would cause heavy memory consumption and are they any > other limitations?. >Basically the problem boils down to what are the scalability > limitations of having >multiple topics(hardware/software)? > Partitioning the topic helps scale the writes. If you compare kafka to other message brokers, the others might choke when writes happen above a certain rate. By partitioning you are distributing the writes across multiple nodes/broker. You don'nt mention replication but that is relevant as well. It provides redundancy in that , if the primary broker goes down, your messages are still available. regards -- http://khangaonkar.blogspot.com/
Re: Some queries about java api for kafka producer
Clarification. My answer applies to the new producer API in 0.8.2. regards On Sun, Apr 12, 2015 at 4:00 PM, Manoj Khangaonkar wrote: > Hi, > > For (1) from the java docs "The producer is *thread safe* and should > generally be shared among all threads for best performance" > > (2) (1) implies no pool is necessary. > > regards > > On Sun, Apr 12, 2015 at 12:38 AM, dhiraj prajapati > wrote: > >> Hi, >> I want to send data to apache kafka using the java api of the kafka >> producer. The data will be in high volume, of the order of more than 5 >> thousand messages per second. >> Please help me with the following queries: >> >> 1. Should I create only one producer object for the entire app and use the >> same object to send all the messages and then close the producer in the >> end? Or should I create a producer object for every message to be sent and >> close the producer connection after sending each message? >> >> 2. Does the producer api inherently use a pool of Producer objects? If >> yes, >> what is the default size of the pool and is it configurable? >> >> Thanks in advance, >> Dhiraj Prajapati >> > > > > -- > http://khangaonkar.blogspot.com/ > -- http://khangaonkar.blogspot.com/
Re: Some queries about java api for kafka producer
Hi, For (1) from the java docs "The producer is *thread safe* and should generally be shared among all threads for best performance" (2) (1) implies no pool is necessary. regards On Sun, Apr 12, 2015 at 12:38 AM, dhiraj prajapati wrote: > Hi, > I want to send data to apache kafka using the java api of the kafka > producer. The data will be in high volume, of the order of more than 5 > thousand messages per second. > Please help me with the following queries: > > 1. Should I create only one producer object for the entire app and use the > same object to send all the messages and then close the producer in the > end? Or should I create a producer object for every message to be sent and > close the producer connection after sending each message? > > 2. Does the producer api inherently use a pool of Producer objects? If yes, > what is the default size of the pool and is it configurable? > > Thanks in advance, > Dhiraj Prajapati > -- http://khangaonkar.blogspot.com/
Re: Message routing, Kafka-to-REST and HTTP API tools/frameworks for Kafka?
Hi, For (1) and perhaps even for (2) where distribution/filtering on scale is required, I would look at using Apache Storm with kafka. For (3) , it seems you just need REST services wrapping kafka consumers/producers. I would start with usual suspects like jersey. regards On Tue, Mar 24, 2015 at 12:06 PM, Valentin wrote: > > Hi guys, > > we have three Kafka use cases for which we have written our own PoC > implementations, > but where I am wondering whether there might be any fitting open source > solution/tool/framework out there. > Maybe someone of you has some ideas/pointers? :) > > 1) Message routing/distribution/filter tool > We need to copy messages from a set of input topics to a set of output > topics > based on their message key values. Each message in an input topic will go > to 0 to N output topics, > each output topic will receive messages from 0 to N input topics. > So basically the tool acts as a message routing component in our system. > Example configuration: > ::,, > ::, > ::, > ... > It would also be interesting to define distribution/filter rules based on > regular expressions on the message key or message body. > > 2) Kafka-to-REST Push service > We need to consume messages from a set of topics, translate them into REST > web service calls > and forward the data to existing, non-Kafka-aware systems with REST APIs > that way. > > 3) HTTP REST API for consumers and producers > We need to expose the simple consumer and the producer functionalities via > REST web service calls, > with authentication and per-topic-authorization on REST API level and TLS > for transport encryption. > Offset tracking is done by the connected systems, not by the > broker/zookeeper/REST API. > We expect a high message volume in the future here, so performance would > be a key concern. > > Greetings > Valentin > -- http://khangaonkar.blogspot.com/
Re: Storm Kafka spout
Thanks. I was concerned that the other one was dated. regards On Wed, Apr 23, 2014 at 4:41 PM, Joe Stein wrote: > Folks have been using this spout > https://github.com/wurstmeister/storm-kafka-0.8-plus which has now been > merged into the storm incubating project > https://github.com/apache/incubator-storm/tree/master/external/storm-kafka > > /*** > Joe Stein > Founder, Principal Consultant > Big Data Open Source Security LLC > http://www.stealth.ly > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> > ****/ > > > On Wed, Apr 23, 2014 at 7:09 PM, Manoj Khangaonkar >wrote: > > > Hi, > > > > What is the open source Kafka Spout for storm that people are using ? > > > > What is the experience with > > https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka ? > > > > regards > > > > > > -- > > > -- http://khangaonkar.blogspot.com/
Storm Kafka spout
Hi, What is the open source Kafka Spout for storm that people are using ? What is the experience with https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka ? regards --
Re: Unable to get off the ground following the "quick start" section
Hi I was able to get the quickstart instructions working recently but I used the (1) binary download (2) I did not use the zookeeper packaged with Kafka. I installed zookeeper using a download from the zookeeper projects. ( I did get a lot of exceptions with the packaged zookeeper) regards On Thu, Apr 17, 2014 at 6:45 AM, Stephen Boesch wrote: > Thanks I will look into that resource. Using the tgz is working it turns > out I was symlinked back to the src dir when encountering the issue. > > > 2014-04-17 6:37 GMT-07:00 Bert Corderman : > > > I cant speak to the quick start , however I found the following very > > helpful when I was getting started. > > > > > > > http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/ > > > > > > On Thu, Apr 17, 2014 at 9:22 AM, Stephen Boesch > wrote: > > > > > I have tried to use kafka both building form source via gradle as well > as > > > un tgz-ing the tarball. Either way, none of the scripts work in the way > > > described in the Quick Start section. > > > > > > Following is an example : > > > > > > 6:15:40/kafka:91 $bin/zookeeper-server-start.sh > > config/zookeeper.properties > > > Exception in thread "main" java.lang.NoClassDefFoundError: > > > org/apache/zookeeper/server/quorum/QuorumPeerMain > > > Caused by: java.lang.ClassNotFoundException: > > > org.apache.zookeeper.server.quorum.QuorumPeerMain > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:202) > > > at java.security.AccessController.doPrivileged(Native Method) > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:190) > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:306) > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:247) > > > > > > -- http://khangaonkar.blogspot.com/
Kafka API docs
Hi, The API description at http://kafka.apache.org/documentation.html#api is rather thin -- when you are used to the API docs of other apache projects like hadoop , cassandra , tomcat etc etc. Is there a comprehensive API description somewhere (like javadocs) ? Besides looking at the source code, what are users doing to get a handle on the full feature set of the Kafka API ? regards -- http://khangaonkar.blogspot.com/