Tools/recommendations to debug performance issues?

2015-09-14 Thread noah
We're using 0.8.2.1 processing maybe 1 million messages per hour. Each
message includes tracking information with a timestamp for when it was
produced, and a timestamp for when it was consumed, to give us roughly the
amount of time it spent in Kafka.  On average this number is in the seconds
and our upper percentiles are in the minutes.

What metrics and settings can we look at to figure out why we might be
spending so much time in Kafka?


Re: What can be reason for fetcher thread for slow response.

2015-09-14 Thread Helleren, Erik
Madhukar,
To me, the broker config looks good.  The issue I see is that there is a
large number of synchronous producers spamming the kafka brokers with a
lot of single-message appends.  I think the suggested approach now is to
use the new producer API:
http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

The new producer operates only in asynchronous mode and is fully thread
safe, so you would only need a single producer per instance.  It batches
messages together and sends them to kafka very efficiently.  This avoids
nasty timeout exceptions and hanging (unless the buffer is full), and
means far less network overhead for kafka thanks to larger batches.  If
there is a worry about a message getting lost in the ether, you can use
the future returned by KafkaProducer.send(message) to do some sanity
checks.  Just try to avoid waiting on it in the connection thread if the
requirements allow.
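
For illustration, a minimal sketch of that pattern against the 0.8.2 new
producer API (the broker addresses and topic name below are placeholders):

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSendExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // send() is asynchronous and returns immediately with a Future
            Future<RecordMetadata> future = producer.send(
                    new ProducerRecord<String, String>("UserEvents", "key", "value"));
            // Optional sanity check: block on the future outside any
            // latency-sensitive thread; get() throws if the send failed
            RecordMetadata meta = future.get();
            System.out.printf("wrote to %s-%d at offset %d%n",
                    meta.topic(), meta.partition(), meta.offset());
        } finally {
            producer.close();
        }
    }
}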

Also, how many partitions is the topic set up to use, and what is the
replication factor?  If you only have one partition, or all the messages
happen to get routed to the same partition (all having the same key),
making better use of partitions to distribute the load across the kafka
cluster can result in faster response times and more throughput.  It
would also explain why broker 2 is consistently the problem: it is the
only one receiving messages.

As for the numbers you listed, I am not sure what fetch time refers to.
Help from someone that knows more than I about request-logs would be
required.
-Erik

On 9/14/15, 12:25 AM, "Madhukar Bharti"  wrote:

>Hi Erik & Prabhjot
>
>We are using Kafka-0.8.2.1 and old producer API with below config:
>
>request.required.acks=1
>request.timeout.ms=2000
>producer.type=sync
>
>On Kafka broker we are having:
>
>num.network.threads=8
>num.io.threads=10
>num.replica.fetchers=4
>replica.fetch.max.bytes=2097154
>replica.fetch.wait.max.ms=500
>replica.socket.timeout.ms=6
>replica.socket.receive.buffer.bytes=65536
>replica.lag.time.max.ms=1
>replica.high.watermark.checkpoint.interval.ms=5000
>replica.lag.max.messages=100
>
>If you are asking about a singleton in terms of the producer: we have
>created a pool of producers, with as many producers as the number of
>connections that Tomcat can make.
>
>
>Thanks and Regards,
>Madhukar
>
>On Fri, Sep 11, 2015 at 8:27 PM, Prabhjot Bharaj 
>wrote:
>
>> Hi,
>>
>> In addition to the parameters asked by Erik, it would be great if you
>> could share your broker's server.properties as well
>>
>> Regards,
>> Prabhjot
>>
>> On Fri, Sep 11, 2015 at 8:10 PM, Helleren, Erik <
>> erik.helle...@cmegroup.com>
>> wrote:
>>
>> > Hi Madhukar,
>> > Some questions that can help understand what's going on: Which kafka
>> > version is used?  Which Producer API is being used
>> > (http://kafka.apache.org/documentation.html#producerapi)?  And what are
>> > the configs for this producer?
>> >
>> > Also, because I know little about tomcat, is there a semantic for a
>> > singleton, or a server singleton?
>> > -Erik
>> >
>> > On 9/11/15, 8:48 AM, "Madhukar Bharti" 
>>wrote:
>> >
>> > >Hi,
>> > >
>> > >
>> > >We have 3 brokers in a cluster. Producer requests are failing for
>> > >broker 2. We are frequently getting the below exception:
>> > >
>> > >15/09/09 22:09:06 WARN async.DefaultEventHandler: Failed to send
>> > >producer request with *correlation id 1455 to broker 2* with data for
>> > >partitions [UserEvents,0]
>> > >> java.net.SocketTimeoutException
>> > >>  at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>> > >>  at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>> > >>  at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>> > >>  at kafka.utils.Utils$.read(Utils.scala:375)
>> > >>  at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> > >>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> > >>  at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> > >>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> > >>
>> > >After looking into the request logs on all machines, I found that there
>> > >is some slowness on broker 2. I am listing the top 20 request processing
>> > >times from all the brokers.
>> > >
>> > >[Table garbled in the archive: per-broker columns of the top-20 request
>> > >processing times (Broker 1: Producer & Fetcher / Producer; Broker 2:
>> > >Producer + Fetcher / Producer; Broker 3: Producer + Fetcher / Producer);
>> > >only the values 493-501 survive, and the message is truncated here.]

Re: Tools/recommendations to debug performance issues?

2015-09-14 Thread Rahul Jain
Have you checked the consumer lag? You can use the offset checker tool to
see if there is a lag.
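
For reference, a sketch of one way to run it against an 0.8.x cluster (the
group name and ZooKeeper address are placeholders, and the exact flags can
vary by version):

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zookeeper localhost:2181 \
  --group my-consumer-group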
On 14 Sep 2015 18:36, "noah"  wrote:

> We're using 0.8.2.1 processing maybe 1 million messages per hour. Each
> message includes tracking information with a timestamp for when it was
> produced, and a timestamp for when it was consumed, to give us roughly the
> amount of time it spent in Kafka.  On average this number is in the seconds
> and our upper percentiles are in the minutes.
>
> What metrics and settings can we look at to figure out why we might be
> spending so much time in Kafka?
>


Re: Auto preferred leader elections cause data loss?

2015-09-14 Thread Zhao Weinan
Hi group,

I think I've hit KAFKA-1561 and KAFKA-1211: a follower's HW is
always <= the leader's HW, so when leadership transitions, data loss can happen...

So all we can do is try everything we can to avoid leader transitions,
right?

2015-09-14 11:59 GMT+08:00 Zhao Weinan :

> Hi group,
>
> Sorry, I'm confused about *min.insync.replicas: When a producer sets
> request.required.acks to -1, min.insync.replicas specifies the minimum
> number of replicas that must acknowledge a write for the write to be
> considered successful.* When talking about the number of replicas, does it
> include the leader? If it does, then setting min.insync.replicas=2,
> unclean.leader.election.enable=false, replicationFactor=3 and acks=all
> could still cause data loss: the one replica that has not caught up with
> the leader can still be in the ISR, and can then be "cleanly" elected as
> the new leader when the current leader goes away, so the lag between this
> new leader and the old, gone leader is lost, right?
>
> Thanks!
>
> 2015-09-12 18:15 GMT+08:00 Zhao Weinan :
>
>> Hi group,
>>
>> I've been through some data loss on a kafka cluster; one case was maybe
>> caused by the auto preferred leader election.
>>
>> Here is the situation: 3 brokers = {0,1,2}, 1 partition with 3 replicas
>> on 0/1/2, all in sync, while 0 is the leader and 1 is the controller; the
>> current offset is 100.
>>
>> And here is my hypothesis:
>> a. Leader 0 is temporarily gone due to an unstable connection with ZK
>> b. Controller 1 finds that 0 has gone, then runs an election which makes
>> 1 (in the ISR) the leader
>> c. A producer sends 1 message to new leader 1, so the offset is now 101
>> d. Old leader 0 comes back to the cluster (*STILL IN THE ISR*, because
>> the lag did not exceed *replica.lag.time.max.ms* or
>> *replica.lag.max.messages*)
>> e. Coincidentally, controller 1 starts
>> checkAndTriggerPartitionRebalance, decides 0 is more preferred, and makes
>> 0 the leader again
>> f. Broker 1 becomes a follower, finds the HW to be 100, and truncates to
>> 100, which loses the newest message.
>>
>> In this situation, even the most reliable settings (broker side:
>> unclean.leader.election.enable=false, min.insync.replicas=2; producer side:
>> acks=all) are useless. Am I correct, or just paranoid?
>>
>> Below are some real logs from production.
>> *In controller.log:*
>>
>>> *// broker 6 temporarily gone*
>>>
>> [2015-09-09 15:24:42,206] INFO [BrokerChangeListener on Controller 3]:
>>> Newly added brokers: , deleted brokers: 6, all live brokers: 0,5,1,2,7,3,4
>>> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
>>> [2015-09-09 15:24:42,461] INFO [Controller 3]: Broker failure callback
>>> for 6 (kafka.controller.KafkaController)
>>> [2015-09-09 15:24:42,464] INFO [Controller 3]: Removed ArrayBuffer()
>>> from list of shutting down brokers. (kafka.controller.KafkaController)
>>> [2015-09-09 15:24:42,466] INFO [Partition state machine on Controller
>>> 3]: Invoking state change to OfflinePartition for partitions
>>> [SOME_TOPIC_NAME,1] (kafka.controller.PartitionStateMachine)
>>>
>>> *// elect 3 which in ISR to be leader*
>>> [2015-09-09 15:24:43,182] DEBUG [OfflinePartitionLeaderSelector]: Some
>>> broker in ISR is alive for [SOME_TOPIC_NAME,1]. Select 3 from ISR 3,4 to be
>>> the leader. (kafka.controller.OfflinePartitionLeaderSelector)
>>> [2015-09-09 15:24:43,182] INFO [OfflinePartitionLeaderSelector]:
>>> Selected new leader and ISR {"leader":3,"leader_epoch":45,"isr":[3,4]} for
>>> offline partition [SOME_TOPIC_NAME,1]
>>> (kafka.controller.OfflinePartitionLeaderSelector)
>>> [2015-09-09 15:24:43,928] DEBUG [Controller 3]: Removing replica 6 from
>>> ISR 3,4 for partition [SOME_TOPIC_NAME,1].
>>> (kafka.controller.KafkaController)
>>> [2015-09-09 15:24:43,929] WARN [Controller 3]: Cannot remove replica 6
>>> from ISR of partition [SOME_TOPIC_NAME,1] since it is not in the ISR.
>>> Leader = 3 ; ISR = List(3, 4) (kafka.controller.KafkaController)
>>>
>>> *// broker 6 back*
>>>
>> [2015-09-09 15:24:44,575] INFO [BrokerChangeListener on Controller 3]:
>>> Newly added brokers: 6, deleted brokers: , all live brokers:
>>> 0,5,1,6,2,7,3,4 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
>>>
>>> *// broker 6 is elected as leader by auto preferred leader election*
>>>
>> [2015-09-09 15:24:50,939] INFO [Controller 3]: Starting preferred replica
>>> leader election for partitions [SOME_TOPIC_NAME,1]
>>> (kafka.controller.KafkaController)
>>> [2015-09-09 15:24:50,945] INFO
>>> [PreferredReplicaPartitionLeaderSelector]: Current leader 3 for partition
>>> [SOME_TOPIC_NAME,1] is not the preferred replica. Trigerring preferred
>>> replica leader election
>>> (kafka.controller.PreferredReplicaPartitionLeaderSelector)
>>>
>>>
>> *And in server.log:*
>>
>>> *// broker 3 truncating, losing data*
>>
>> 2015-09-09 15:24:50,964] INFO Truncating log SOME_TOPIC_NAME-1 to offset
>>> 420549. (kafka.log.Log)
>>>
>>
>


Re: Auto preferred leader elections cause data loss?

2015-09-14 Thread Gwen Shapira
acks=all should prevent this scenario:

If broker 0 is still in the ISR, the produce request for 101 will not be
"acked" (because 0 is in the ISR and not available for acking), and the
producer will retry it until the entire ISR acks.

If broker 0 dropped out of the ISR, it will not be able to rejoin until it
has all the latest messages, including 101.

So if you use the safe settings, you should be safe in this scenario.

Gwen
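
For reference, a minimal sketch of the "safe settings" discussed in this
thread, with the values used above (the broker lines go in
server.properties; the producer line is spelled for the old producer, where
acks=all is request.required.acks=-1):

# broker side (server.properties)
unclean.leader.election.enable=false
min.insync.replicas=2
default.replication.factor=3

# producer side (old producer; the new producer uses acks=all)
request.required.acks=-1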


On Sat, Sep 12, 2015 at 3:15 AM, Zhao Weinan  wrote:

> Hi group,
>
> I've been through some data loss on a kafka cluster; one case was maybe
> caused by the auto preferred leader election.
>
> Here is the situation: 3 brokers = {0,1,2}, 1 partition with 3 replicas on
> 0/1/2, all in sync, while 0 is the leader and 1 is the controller; the
> current offset is 100.
>
> And here is my hypothesis:
> a. Leader 0 is temporarily gone due to an unstable connection with ZK
> b. Controller 1 finds that 0 has gone, then runs an election which makes 1
> (in the ISR) the leader
> c. A producer sends 1 message to new leader 1, so the offset is now 101
> d. Old leader 0 comes back to the cluster (*STILL IN THE ISR*, because the
> lag did not exceed *replica.lag.time.max.ms* or *replica.lag.max.messages*)
> e. Coincidentally, controller 1 starts checkAndTriggerPartitionRebalance,
> decides 0 is more preferred, and makes 0 the leader again
> f. Broker 1 becomes a follower, finds the HW to be 100, and truncates to
> 100, which loses the newest message.
>
> In this situation, even the most reliable settings (broker side:
> unclean.leader.election.enable=false, min.insync.replicas=2; producer side:
> acks=all) are useless. Am I correct, or just paranoid?
>
> Below are some real logs from production.
> *In controller.log:*
>
> > *// broker 6 temporarily gone*
> >
> [2015-09-09 15:24:42,206] INFO [BrokerChangeListener on Controller 3]:
> > Newly added brokers: , deleted brokers: 6, all live brokers:
> 0,5,1,2,7,3,4
> > (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> > [2015-09-09 15:24:42,461] INFO [Controller 3]: Broker failure callback
> for
> > 6 (kafka.controller.KafkaController)
> > [2015-09-09 15:24:42,464] INFO [Controller 3]: Removed ArrayBuffer() from
> > list of shutting down brokers. (kafka.controller.KafkaController)
> > [2015-09-09 15:24:42,466] INFO [Partition state machine on Controller 3]:
> > Invoking state change to OfflinePartition for partitions
> > [SOME_TOPIC_NAME,1] (kafka.controller.PartitionStateMachine)
> >
> > *// elect 3 which in ISR to be leader*
> > [2015-09-09 15:24:43,182] DEBUG [OfflinePartitionLeaderSelector]: Some
> > broker in ISR is alive for [SOME_TOPIC_NAME,1]. Select 3 from ISR 3,4 to
> be
> > the leader. (kafka.controller.OfflinePartitionLeaderSelector)
> > [2015-09-09 15:24:43,182] INFO [OfflinePartitionLeaderSelector]: Selected
> > new leader and ISR {"leader":3,"leader_epoch":45,"isr":[3,4]} for offline
> > partition [SOME_TOPIC_NAME,1]
> > (kafka.controller.OfflinePartitionLeaderSelector)
> > [2015-09-09 15:24:43,928] DEBUG [Controller 3]: Removing replica 6 from
> > ISR 3,4 for partition [SOME_TOPIC_NAME,1].
> > (kafka.controller.KafkaController)
> > [2015-09-09 15:24:43,929] WARN [Controller 3]: Cannot remove replica 6
> > from ISR of partition [SOME_TOPIC_NAME,1] since it is not in the ISR.
> > Leader = 3 ; ISR = List(3, 4) (kafka.controller.KafkaController)
> >
> > *// broker 6 back*
> >
> [2015-09-09 15:24:44,575] INFO [BrokerChangeListener on Controller 3]:
> > Newly added brokers: 6, deleted brokers: , all live brokers:
> > 0,5,1,6,2,7,3,4
> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> >
> > *// broker 6 is elected as leader by auto preferred leader election*
> >
> [2015-09-09 15:24:50,939] INFO [Controller 3]: Starting preferred replica
> > leader election for partitions [SOME_TOPIC_NAME,1]
> > (kafka.controller.KafkaController)
> > [2015-09-09 15:24:50,945] INFO [PreferredReplicaPartitionLeaderSelector]:
> > Current leader 3 for partition [SOME_TOPIC_NAME,1] is not the preferred
> > replica. Trigerring preferred replica leader election
> > (kafka.controller.PreferredReplicaPartitionLeaderSelector)
> >
> >
> *And in server.log:*
>
> > *// broker 3 truncating, losing data*
>
> 2015-09-09 15:24:50,964] INFO Truncating log SOME_TOPIC_NAME-1 to offset
> > 420549. (kafka.log.Log)
> >
>


Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Gwen Shapira
We decided to rename 0.8.3 to 0.9.0 since it contains a few large changes
(security, the new consumer, and quotas).



On Sun, Sep 13, 2015 at 11:56 PM, Jason Rosenberg  wrote:

> Hi Jun,
>
> Can you clarify: will there not be a 0.8.3.0 (and will we instead move
> straight to 0.9.0.0)?
>
> Also, can you outline the main new features/updates for 0.9.0.0?
>
> Thanks,
>
> Jason
>
> On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao  wrote:
>
> > The following is a candidate list of jiras that we want to complete in the
> > upcoming release (0.9.0.0). Our goal is to finish at least all the blockers
> > and as many of the non-blockers as possible in that list.
> >
> >
> > https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
> >
> > Should anything be added or removed from this list?
> >
> > We are shooting to cut an 0.9.0.0 release branch in early October.
> >
> > Thanks,
> >
> > Jun
> >
>


Re: Tools/recommendations to debug performance issues?

2015-09-14 Thread Gwen Shapira
Kafka also collects very useful metrics on request times and their
breakdown.
They are under kafka.network.
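
For example, a sketch of sampling one of those MBeans with the bundled
JmxTool; the JMX URL and the exact object name here are assumptions (the
request-metric naming changed between 0.8.x and later releases), so check
jconsole for the form your broker exposes:

bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi \
  --object-name 'kafka.network:type=RequestMetrics,name=Produce-TotalTimeMs'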



On Mon, Sep 14, 2015 at 6:59 AM, Rahul Jain  wrote:

> Have you checked the consumer lag? You can use the offset checker tool to
> see if there is a lag.
> On 14 Sep 2015 18:36, "noah"  wrote:
>
> > We're using 0.8.2.1 processing maybe 1 million messages per hour. Each
> > message includes tracking information with a timestamp for when it was
> > produced, and a timestamp for when it was consumed, to give us roughly
> the
> > amount of time it spent in Kafka.  On average this number is in the
> seconds
> > and our upper percentiles are in the minutes.
> >
> > What metrics and settings can we look at to figure out why we might be
> > spending so much time in Kafka?
> >
>


Re: Broker restart, new producer and snappy in 0.8.2.1

2015-09-14 Thread Vidhya Arvind
Ok, thanks Joe. Will try the 0.8.2.2 producer.

Vidhya

On Sun, Sep 13, 2015 at 7:12 AM, Joe Stein  wrote:

> Hi, the 0.8.2.2 release (the vote for which just passed; it should be
> announced soon) has a patch that may be related:
> https://issues.apache.org/jira/browse/KAFKA-2308 (not sure).
>
> Here are the 0.8.2.2 artifacts:
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/ I don't see
> them yet in Maven Central, so you will need to download them into a local
> mvn repo and use the 0.8.2.2 producer. I don't know if it will fix your
> problem, but if you can reproduce it, try it and see; if it does not,
> posting to a JIRA would be great.
>
> Thanks!
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - - - -
>   http://www.elodina.net
> http://www.stealth.ly
> - - - - - - - - - - - - - - - - - - -
>
> On Sat, Sep 12, 2015 at 3:32 PM, Vidhya Arvind 
> wrote:
>
> > There have been multiple instances of this incident. When I restart the
> > broker for config changes, it brings down the consumers and mirror maker,
> > and I see CRC corruption in mirror maker and the following error in the
> > broker. I have reset the offsets in zookeeper for certain
> > topic/partitions, but I still see this issue popping up for other
> > topics/partitions. Please let me know how I can resolve this. This has
> > happened twice in the production system.
> >
> > Please let me know if there is anything I can try to fix the issue before
> > the next broker restart.
> >
> > Vidhya
> >
> >
> > [2015-09-12 19:04:03,409] ERROR [KafkaApi-30001] Error processing
> > ProducerRequest with correlation id 3480 from client producer-1 on
> > partition [events_prod_oncue.ws.client.ui_UIEvent,29]
> > (kafka.server.KafkaApis)
> > kafka.common.KafkaException: Error in validating messages while
> > appending to log 'events_prod_oncue.ws.client.ui_UIEvent-29'
> >         at kafka.log.Log.liftedTree1$1(Log.scala:277)
> >         at kafka.log.Log.append(Log.scala:274)
> >         at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
> >         at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> >         at kafka.utils.Utils$.inReadLock(Utils.scala:541)
> >         at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
> >         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
> >         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
> >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >         at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
> >         at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
> >         at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
> >         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> >         at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> >         at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> >         at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> >         at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
> >         at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
> >         at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:362)
> >         at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
> >         at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
> >         at java.io.InputStream.read(InputStream.java:101)
> >         at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(ByteBufferMessageSet.scala:67)
> >         at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
> >         at kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(ByteBufferMessageSet.scala:67)
> >         at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> >         at ...

Re: Unclean leader election docs outdated

2015-09-14 Thread Guozhang Wang
Yes you are right. Could you file a JIRA to edit the documents?

Guozhang

On Fri, Sep 11, 2015 at 4:41 PM, Stevo Slavić  wrote:

> That sentence is in both
> https://svn.apache.org/repos/asf/kafka/site/083/design.html and
> https://svn.apache.org/repos/asf/kafka/site/082/design.html, near the end
> of the "Unclean leader election: What if they all die?" section. The next
> section, "Availability and Durability Guarantees", mentions the ability to
> disable unclean leader election, so likely just this one reference needs
> to be updated.
>
> On Sat, Sep 12, 2015 at 1:05 AM, Guozhang Wang  wrote:
>
> > Hi Stevo,
> >
> > Could you point me to the link of the docs?
> >
> > Guozhang
> >
> > On Fri, Sep 11, 2015 at 5:47 AM, Stevo Slavić  wrote:
> >
> > > Hello Apache Kafka community,
> > >
> > > Current unclean leader election docs state:
> > > "In the future, we would like to make this configurable to better
> support
> > > use cases where downtime is preferable to inconsistency. "
> > >
> > > If I'm not mistaken, since 0.8.2, unclean leader election strategy
> > (whether
> > > to allow it or not) is already configurable via
> > > unclean.leader.election.enable broker config property.
> > >
> > > Kind regards,
> > > Stevo Slavic.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: producer api

2015-09-14 Thread Yuheng Du
Thank you, Erik. But in my setup, only one node in my broker cluster has a
public IP provided, so I can only use one bootstrap broker for now.


On Mon, Sep 14, 2015 at 3:50 PM, Helleren, Erik 
wrote:

> You only need one of the brokers to connect to for publishing.  Kafka will
> tell the client about all the other brokers.  But best practice is to
> include all of them.
> -Erik
>
> On 9/14/15, 2:46 PM, "Yuheng Du"  wrote:
>
> >I am writing a kafka producer application in Java. I want the producer to
> >publish data to a cluster of 6 brokers. Is there a way to specify only the
> >load balancing node rather than the whole broker list?
> >
> >For example, like in the benchmarking kafka commands:
> >
> >bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> >test 5000 100 -1 acks=-1 bootstrap.servers=
> >esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
> batch.size=64000
> >
> >it only specifies the bootstrap server node, not the whole broker list,
> >like:
> >
> >Properties props = new Properties();
> >
> >props.put("metadata.broker.list", "broker1:9092,broker2:9092");
> >
> >
> >
> >Thanks for replying.
>
>


Thread safety when committing from multiple threads

2015-09-14 Thread Ashish Shenoy
Hi,

I have a multi-threaded Kafka consumer. I have auto commit disabled and
from each thread, I periodically call commitOffsets() to commit the offsets
to Kafka.

Is this thread safe ? What is the effect of calling commitOffsets()
concurrently from multiple threads ? Should the consumer writer ensure that
the call to commitOffsets() is synchronized across multiple threads ?

Thanks,
Ashish
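
A conservative sketch of one way to handle this, assuming (since the 0.8.x
javadoc makes no thread-safety promise for commitOffsets()) that you want
to serialize the calls yourself; the wrapper class and its names are
hypothetical:

import kafka.javaapi.consumer.ConsumerConnector;

public class SynchronizedCommitter {
    private final ConsumerConnector connector;
    private final Object commitLock = new Object();

    public SynchronizedCommitter(ConsumerConnector connector) {
        this.connector = connector;
    }

    // Worker threads call this instead of connector.commitOffsets().
    // commitOffsets() commits the latest consumed offsets for all streams
    // owned by the connector, so serializing the calls also avoids
    // redundant concurrent commits.
    public void commit() {
        synchronized (commitLock) {
            connector.commitOffsets();
        }
    }
}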


Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Edward Ribeiro
Is KAFKA-1811 worth
considering for 0.9.0?

Thanks!

Eddie

On Mon, Sep 14, 2015 at 4:34 PM, Jiangjie Qin 
wrote:

> HI Jun,
>
> Can we also include KAFKA-2448 in 0.9 as well? We have seen this issue a
> few times before, and it causes replica fetcher threads to not start up.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Sep 12, 2015 at 9:40 AM, Jun Rao  wrote:
>
> > The following is a candidate list of jiras that we want to complete in the
> > upcoming release (0.9.0.0). Our goal is to finish at least all the blockers
> > and as many of the non-blockers as possible in that list.
> >
> >
> > https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
> >
> > Should anything be added or removed from this list?
> >
> > We are shooting to cut an 0.9.0.0 release branch in early October.
> >
> > Thanks,
> >
> > Jun
> >
>


Re: port already in use error when trying to add topic

2015-09-14 Thread allen chan
After completely disabling the JMX settings, I was able to create topics. It
seems like there is an issue with using JMX with the product. Should I file a bug?

On Sun, Sep 13, 2015 at 9:07 PM, allen chan 
wrote:

> Changing the port to 9998 did not help. The same error still occurred.
>
> On Sat, Sep 12, 2015 at 12:27 AM, Foo Lim  wrote:
>
>> Try throwing
>>
>> JMX_PORT=9998
>>
>> In front of the command. Anything other than 9994
>>
>> Foo
>>
>> On Friday, September 11, 2015, allen chan 
>> wrote:
>>
>> > Hi all,
>> >
>> > First time testing kafka with a brand new cluster.
>> >
>> > Running into an issue that I do not understand.
>> >
>> > The server started up fine, but I get an error when trying to create a topic.
>> >
>> > *[achan@server1 ~]$ ps -ef | grep -i kafka*
>> > *root  6507 1  0 15:42 ?00:00:00 sudo
>> > /opt/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh
>> > /opt/kafka_2.10-0.8.2.1/config/server.properties*
>> > *root  6508  6507  0 15:42 ?00:00:36 java -Xmx1G -Xms1G
>> -server
>> > -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
>> > -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
>> > -Djava.awt.headless=true
>> > -Xloggc:/opt/kafka_2.10-0.8.2.1/bin/../logs/kafkaServer-gc.log
>> -verbose:gc
>> > -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
>> > -Dcom.sun.management.jmxremote -Djava.rmi.server.hostname=server1
>> > -Dcom.sun.management.jmxremote.authenticate=false
>> > -Dcom.sun.management.jmxremote.ssl=false
>> > -Dcom.sun.management.jmxremote.port=9994
>> > -Dkafka.logs.dir=/opt/kafka_2.10-0.8.2.1/bin/../logs
>> >
>> >
>> -Dlog4j.configuration=file:/opt/kafka_2.10-0.8.2.1/bin/../config/log4j.properties
>> > -cp
>> >
>> >
>> :/opt/kafka_2.10-0.8.2.1/bin/../core/build/dependant-libs-2.10.4*/*.jar:/opt/kafka_2.10-0.8.2.1/bin/../examples/build/libs//kafka-examples*.jar:/opt/kafka_2.10-0.8.2.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/kafka_2.10-0.8.2.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/kafka_2.10-0.8.2.1/bin/../clients/build/libs/kafka-clients*.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/jopt-simple-3.2.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-javadoc.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-scaladoc.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-sources.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-test.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka-clients-0.8.2.1.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/log4j-1.2.16.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/lz4-1.2.0.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/scala-library-2.10.4.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/slf4j-api-1.7.6.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/slf4j-log4j12-1.6.1.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/snappy-java-1.1.1.6.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/zkclient-0.3.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/zookeeper-3.4.6.jar:/opt/kafka_2.10-0.8.2.1/bin/../core/build/libs/kafka_2.10*.jar
>> > kafka.Kafka /opt/kafka_2.10-0.8.2.1/config/server.properties*
>> >
>> >
>> > *[achan@server1 ~]$ sudo /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh
>> > --create --zookeeper server1:2181 --partition 3 --replica 0 --topic
>> test.logs*
>> > *[sudo] password for achan:*
>> > *Error: Exception thrown by the agent : java.rmi.server.ExportException:
>> > Port already in use: 9994; nested exception is:*
>> > * java.net.BindException: Address already in use*
>> >
>> > I have pretty much the vanilla kafka-run-class.sh, except I added the
>> > following:
>> > *export JMX_PORT=9994* (near the top of the file)
>> > and
>> > *-Djava.rmi.server.hostname=server1* (to the KAFKA_JMX_OPTS)
>> >
>> > JMX monitoring is working perfectly on 9994.
>> >
>> > Has anyone else run into this issue?
>> >
>> >
>> > Thanks
>> > --
>> > Allen Michael Chan
>> >
>>
>
>
>
> --
> Allen Michael Chan
>



-- 
Allen Michael Chan


Re: port already in use error when trying to add topic

2015-09-14 Thread Lance Laursen
This is not a bug. The java process spawned by kafka-topics.sh is trying to
bind to 9998 upon start. The java process spawned by kafka-server-start.sh
already owns that port. It's doing this because both of these scripts use
kafka-run-class.sh and that is where you defined your 'export JMX_PORT'.

Put your export statement into kafka-server-start.sh instead and run
kafka-topics.sh using a separate terminal or user account. Also, google
search "linux environment variables." You could also just run
kafka-topics.sh from a separate host, such as your workstation, so long as
it can see zookeeper:2181.
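
A sketch of that suggestion, using the paths from this thread (note that
kafka-topics.sh spells the flags --partitions and --replication-factor, and
the factor must be at least 1):

# in /opt/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh: only the broker gets JMX
export JMX_PORT=9994

# then, from a shell that does NOT have JMX_PORT set:
/opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper server1:2181 \
  --partitions 3 --replication-factor 1 --topic test.logs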

On Mon, Sep 14, 2015 at 3:52 PM, allen chan 
wrote:

> After completely disabling the JMX settings, I was able to create topics.
> It seems like there is an issue with using JMX with the product. Should I
> file a bug?
>
> On Sun, Sep 13, 2015 at 9:07 PM, allen chan 
> wrote:
>
> > Changing the port to 9998 did not help. The same error still occurred.
> >
> > On Sat, Sep 12, 2015 at 12:27 AM, Foo Lim  wrote:
> >
> >> Try throwing
> >>
> >> JMX_PORT=9998
> >>
> >> In front of the command. Anything other than 9994
> >>
> >> Foo
> >>
> >> On Friday, September 11, 2015, allen chan  >
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > First time testing kafka with a brand new cluster.
> >> >
> >> > Running into an issue that I do not understand.
> >> >
> >> > The server started up fine, but I get an error when trying to create a topic.
> >> >
> >> > *[achan@server1 ~]$ ps -ef | grep -i kafka*
> >> > *root  6507 1  0 15:42 ?00:00:00 sudo
> >> > /opt/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh
> >> > /opt/kafka_2.10-0.8.2.1/config/server.properties*
> >> > *root  6508  6507  0 15:42 ?00:00:36 java -Xmx1G -Xms1G
> >> -server
> >> > -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> >> > -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
> >> > -Djava.awt.headless=true
> >> > -Xloggc:/opt/kafka_2.10-0.8.2.1/bin/../logs/kafkaServer-gc.log
> >> -verbose:gc
> >> > -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
> >> > -Dcom.sun.management.jmxremote -Djava.rmi.server.hostname=server1
> >> > -Dcom.sun.management.jmxremote.authenticate=false
> >> > -Dcom.sun.management.jmxremote.ssl=false
> >> > -Dcom.sun.management.jmxremote.port=9994
> >> > -Dkafka.logs.dir=/opt/kafka_2.10-0.8.2.1/bin/../logs
> >> >
> >> >
> >>
> -Dlog4j.configuration=file:/opt/kafka_2.10-0.8.2.1/bin/../config/log4j.properties
> >> > -cp
> >> >
> >> >
> >>
> :/opt/kafka_2.10-0.8.2.1/bin/../core/build/dependant-libs-2.10.4*/*.jar:/opt/kafka_2.10-0.8.2.1/bin/../examples/build/libs//kafka-examples*.jar:/opt/kafka_2.10-0.8.2.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/opt/kafka_2.10-0.8.2.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/opt/kafka_2.10-0.8.2.1/bin/../clients/build/libs/kafka-clients*.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/jopt-simple-3.2.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-javadoc.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-scaladoc.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-sources.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka_2.10-0.8.2.1-test.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/kafka-clients-0.8.2.1.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/log4j-1.2.16.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/lz4-1.2.0.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/scala-library-2.10.4.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/slf4j-api-1.7.6.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/slf4j-log4j12-1.6.1.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/snappy-java-1.1.1.6.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/zkclient-0.3.jar:/opt/kafka_2.10-0.8.2.1/bin/../libs/zookeeper-3.4.6.jar:/opt/kafka_2.10-0.8.2.1/bin/../core/build/libs/kafka_2.10*.jar
> >> > kafka.Kafka /opt/kafka_2.10-0.8.2.1/config/server.properties*
> >> >
> >> >
> >> > *[achan@server1 ~]$ sudo /opt/kafka_2.10-0.8.2.1/bin/kafka-topics.sh
> >> > --create --zookeeper server1:2181 --partition 3 --replica 0 --topic
> >> test.logs*
> >> > *[sudo] password for achan:*
> >> > *Error: Exception thrown by the agent :
> java.rmi.server.ExportException:
> >> > Port already in use: 9994; nested exception is:*
> >> > * java.net.BindException: Address already in use*
> >> >
> >> > I have pretty much the vanilla kafka-run-class.sh, except I added the
> >> > following:
> >> > *export JMX_PORT=9994* (near the top of the file)
> >> > and
> >> > *-Djava.rmi.server.hostname=server1* (to the KAFKA_JMX_OPTS)
> >> >
> >> > JMX monitoring is working perfectly on 9994.
> >> >
> >> > Has anyone else run into this issue?
> >> >
> >> >
> >> > Thanks
> >> > --
> >> > Allen Michael Chan
> >> >
> >>
> >
> >
> >
> > --
> > Allen Michael Chan
> >
>
>
>
> --
> Allen Michael Chan
>


Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Jason Rosenberg
Hi Jun,

Can you clarify: will there not be a 0.8.3.0 (and will we instead move
straight to 0.9.0.0)?

Also, can you outline the main new features/updates for 0.9.0.0?

Thanks,

Jason

On Sat, Sep 12, 2015 at 12:40 PM, Jun Rao  wrote:

> The following is a candidate list of jiras that we want to complete in the
> upcoming release (0.9.0.0). Our goal is to finish at least all the blockers
> and as many of the non-blockers as possible in that list.
>
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
>
> Should anything be added or removed from this list?
>
> We are shooting to cut an 0.9.0.0 release branch in early October.
>
> Thanks,
>
> Jun
>


Unreasonably high CPU from Kafka (0.8.2.1)

2015-09-14 Thread Jaikiran Pai
We have been using Kafka for a while now in one of our dev projects.
Currently we have just 1 broker and 1 zookeeper instance. Almost every
day, Kafka "stalls" and we end up cleaning up the data/log folders of
Kafka and zookeeper and bringing them up afresh. We haven't been able to
narrow down the issue yet.


However, keeping that part aside for a while: we have been noticing that
even when the system/application is completely idle, the Kafka process
seems to take up unreasonably high CPU (10-15%, constantly, as shown in the
top command). We have taken multiple thread dumps, and each of them has this:


"kafka-socket-acceptor" #24 prio=5 os_prio=0 tid=0x7f62685d9000 
nid=0x2d47 runnable [0x7f6231464000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77a458> (a sun.nio.ch.Util$2)
- locked <0xca77a440> (a java.util.Collections$UnmodifiableSet)
- locked <0xca774550> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Acceptor.run(SocketServer.scala:215)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-2" #23 prio=5 os_prio=0 
tid=0x7f62685d6800 nid=0x2d46 runnable [0x7f6231565000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77d050> (a sun.nio.ch.Util$2)
- locked <0xca77d038> (a java.util.Collections$UnmodifiableSet)
- locked <0xca7745e0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-1" #22 prio=5 os_prio=0 
tid=0x7f62685c7800 nid=0x2d45 runnable [0x7f6231666000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77e590> (a sun.nio.ch.Util$2)
- locked <0xca77e578> (a java.util.Collections$UnmodifiableSet)
- locked <0xca7746b8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-0" #21 prio=5 os_prio=0 
tid=0x7f62685b9000 nid=0x2d44 runnable [0x7f6231767000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77fbd0> (a sun.nio.ch.Util$2)
- locked <0xca77fbb8> (a java.util.Collections$UnmodifiableSet)
- locked <0xca774790> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)




Looking at the code of 0.8.2.1, the relevant piece of code looks like this
(https://github.com/apache/kafka/blob/0.8.2.1/core/src/main/scala/kafka/network/SocketServer.scala#L314):


while(isRunning) {
...
val ready = selector.select(300)
...
if(ready > 0) {
...
}
...
}

This looks like an always-"busy" while loop when selector.select
returns 0. Could a sleep for a few milliseconds help in this case?
Similar code is present in the Acceptor in that same file, which does
this exact thing. Would adding a small sleep in there help with
reducing the CPU usage when things are idle?
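
A sketch of the change being proposed (hypothetical, not the actual Kafka
code; note that select(300) already blocks for up to 300 ms, so this would
only add a floor when the selector returns early with nothing ready):

while (isRunning) {
  val ready = selector.select(300) // blocks up to 300 ms waiting for channels
  if (ready > 0) {
    // ... handle the selected keys, as SocketServer.scala does ...
  } else {
    // hypothetical backoff: yield the CPU briefly on empty wakeups
    Thread.sleep(5)
  }
}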


-Jaikiran




Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Jiangjie Qin
HI Jun,

Can we also include KAFKA-2448 in 0.9 as well? We have seen this issue a
few times before, and it causes replica fetcher threads to not start up.

Thanks,

Jiangjie (Becket) Qin

On Sat, Sep 12, 2015 at 9:40 AM, Jun Rao  wrote:

> The following is a candidate list of jiras that we want to complete in the
> upcoming release (0.9.0.0). Our goal is to finish at least all the blockers
> and as many of the non-blockers as possible in that list.
>
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
>
> Should anything be added or removed from this list?
>
> We are shooting to cut an 0.9.0.0 release branch in early October.
>
> Thanks,
>
> Jun
>


producer api

2015-09-14 Thread Yuheng Du
I am writing a kafka producer application in Java. I want the producer to
publish data to a cluster of 6 brokers. Is there a way to specify only the
load balancing node rather than the whole broker list?

For example, like in the benchmarking kafka commands:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test 5000 100 -1 acks=-1 bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000

it only specifies the bootstrap server node, not the whole broker list,
like:

Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092");



Thanks for replying.


Re: producer api

2015-09-14 Thread Helleren, Erik
You only need one of the brokers to connect to for publishing.  Kafka will
tell the client about all the other brokers.  But best practice is to
include all of them.
-Erik
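
A minimal sketch of that with the new producer, where bootstrap.servers
replaces the old metadata.broker.list (the host name is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class SingleBootstrapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // One reachable broker is enough to bootstrap; the client discovers
        // the rest of the cluster from the metadata that broker returns.
        props.put("bootstrap.servers", "public-broker:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}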

On 9/14/15, 2:46 PM, "Yuheng Du"  wrote:

>I am writing a kafka producer application in Java. I want the producer to
>publish data to a cluster of 6 brokers. Is there a way to specify only the
>load balancing node rather than the whole broker list?
>
>For example, like in the benchmarking kafka commands:
>
>bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
>test 5000 100 -1 acks=-1 bootstrap.servers=
>esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000
>
>it only specifies the bootstrap server node, not the whole broker list,
>like:
>
>Properties props = new Properties();
>
>props.put("metadata.broker.list", "broker1:9092,broker2:9092");
>
>
>
>Thanks for replying.



How to model topics and partitions for Kafka when used to store all business events?

2015-09-14 Thread Johan Haleby
Hi,

We're considering using Kafka as a way to store all our business events
(effectively) forever. The purpose is to be able to spin up new
"microservices" that we haven't yet thought of, which will be able to
leverage all previous events to build up their projections/state.
Another use case might be an existing service where we'd like to "replay"
all events that are of interest to this service to recreate its state.

Note that we're not planning to use Kafka as an "event store" in the sense
that events will be projected/loaded into an aggregate on every request.

Also (as far as I can tell) we don't know how consumers will consume the
events. A new microservice might need all sorts of different events in
order to create its internal projection/state.

   1. Would Kafka be suitable for this?
   2. If so, what's a good way to model this (topics/partitions)? For
   example would it be alright to just use a single topic for all events?
   3. We're currently using RabbitMQ for messaging (business events are
   sent to RabbitMQ). It would be great if we could migrate away from RabbitMQ
   in the future and move entirely to Kafka. I assume that this could change
   the way topics and partitions are modelled, since by then we would have a
   better understanding of how consumers will consume the events. Would this
   be compatible with the other use case (infinite retention and replay)?
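
On point 1, note that retention is configurable per topic; a sketch of
effectively unlimited retention via topic-level overrides (the topic name is
a placeholder, and the flag spelling and the semantics of -1 should be
verified against your broker version):

bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic business-events \
  --config retention.ms=-1 --config retention.bytes=-1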

Regards,
/Johan


problems about setting up environment

2015-09-14 Thread 한민석
Hi,

I'm trying to learn Kafka, so I read the wiki and followed it
(https://cwiki.apache.org/confluence/display/KAFKA/Developer+Setup).

I finally imported the projects into an Eclipse workspace, but there are
some compile errors:

1. In the 'examples' project:

   1) kafka.examples.Consumer.java

KafkaStream stream = consumerMap.get(topic).get(0);
for (MessageAndMetadata messageAndMetadata : stream)

Type mismatch: cannot convert from element type Object to
MessageAndMetadata
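
The first error is the classic raw-type symptom: iterating a raw KafkaStream
yields Object. A sketch of the parameterized form, assuming the byte[] keys
and values that the shipped example uses:

KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
    // ...
}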

   2) build path error

The 'core' project is already referenced by the 'examples' project, but I
get the error message below:

The project was not built since its build path is incomplete. Cannot find
the class file for kafka.javaapi.consumer.ConsumerConnector.


2. In the 'core' project:

Project 'core' is missing required library:
'C:\gitRepo\kafka\clients\build\classes\test'
Project 'core' is missing required library:
'C:\gitRepo\kafka\clients\build\resources\test'

Thanks