Re: logging for Kafka new producer

2015-08-10 Thread Manikumar Reddy
The new producer uses SLF4J for logging, so you can plug in any SLF4J-backed
logging framework such as log4j, java.util.logging, or logback.
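
For example, to route it through log4j you put the slf4j-log4j12 binding on
the classpath and configure log4j as usual; a minimal log4j.properties sketch
(assuming the client loggers live under org.apache.kafka):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
# turn up producer logging when debugging
log4j.logger.org.apache.kafka=DEBUG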

On Tue, Aug 11, 2015 at 11:38 AM, Tao Feng  wrote:

> Hi,
>
> I am wondering what Kafka new producer uses for logging. Is it  log4j?
>
> Thanks,
> -Tao
>


logging in Kafka new producer

2015-08-10 Thread Tao Feng
Hi,

I am wondering what the new Kafka producer uses for logging. Is it log4j?

Thanks,
-Tao


logging for Kafka new producer

2015-08-10 Thread Tao Feng
Hi,

I am wondering what the new Kafka producer uses for logging. Is it log4j?

Thanks,
-Tao


Re: Recovering from Kafka NoReplicaOnlineException with one node

2015-08-10 Thread Mike Thomsen
Thanks, I'll give that a shot. I noticed that our configuration used the
default timeouts for session and sync, so I increased those ZooKeeper
configuration settings for Kafka as well.
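
For reference, the broker-side settings in server.properties look something
like this (the values below are only illustrative, not recommendations):

zookeeper.session.timeout.ms=10000
zookeeper.connection.timeout.ms=10000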

On Mon, Aug 10, 2015 at 4:37 PM, Gwen Shapira  wrote:

> Maybe it is not ZooKeeper itself, but the Broker connection to ZK timed-out
> and caused the controller to believe that the broker is dead and therefore
> attempted to elect a new leader (which doesn't exist, since you have just
> one node).
>
> Increasing the zookeeper session timeout value may help. Also, a common
> cause for those timeouts is garbage collection on the broker, changing GC
> policy can help.
>
> Here is the Java configuration used by LinkedIn:
>
> -Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC
> -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
>
>
> Gwen
>
>
> On Mon, Aug 10, 2015 at 11:12 AM, Mike Thomsen 
> wrote:
>
> > We have a really simple Kafka set up in our development lab. It's just
> one
> > node. Periodically, we run into this error:
> >
> > [2015-08-10 13:45:52,405] ERROR Controller 0 epoch 488 initiated state
> > change for partition [test-data,1] from OfflinePartition to
> > OnlinePartition failed (state.change.logger)
> > kafka.common.NoReplicaOnlineException: No replica for partition
> > [test-data,1] is alive. Live brokers are: [Set()], Assigned replicas
> > are: [List(0)]
> > at
> >
> kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
> > at
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
> > at
> >
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
> > at
> >
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
> > at
> >
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
> > at
> >
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
> >
> > Can anyone recommend a strategy for recovering from this? Is there such a
> > thing or do we need to build out another node or two and set up the
> > replication factor on our topics to cover all of the nodes that we put
> into
> > the cluster?
> >
> > We have 3 zookeeper nodes that respond very well for other applications
> > like Storm and HBase, so we're pretty confident that ZooKeeper isn't to
> > blame here. Any ideas?
> > Thanks,
> >
> > Mike
> >
>


Re: how to get single record from kafka topic+partition @ specified offset

2015-08-10 Thread Ewen Cheslack-Postava
Right now I think the only place the new API is documented is in the
javadocs. Here are the relevant sections for replacing the simple consumer.

Subscribing to specific partitions:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L204
Seeking to specific partitions:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L282

With the new API you'll just need to do something like this:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("topic", 1);
long offset = 100L;

KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(tp);     // subscribe to just this partition
consumer.seek(tp, offset);  // position the consumer at the desired offset
while (true) {
    ConsumerRecords records = consumer.poll(100); // poll takes a timeout in ms
    if (!records.isEmpty()) {
        // the first record returned is the message you wanted
        break;
    }
}



On Mon, Aug 10, 2015 at 3:52 PM, Joe Lawson <
jlaw...@opensourceconnections.com> wrote:

> Ewen,
>
> Do you have an example or link for the changes/plans that will bring the
> benefits you describe?
>
> Cheers,
>
> Joe Lawson
> On Aug 10, 2015 3:27 PM, "Ewen Cheslack-Postava" 
> wrote:
>
> > You can do this using the SimpleConsumer. See
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > for details with some code.
> >
> > When the new consumer is released in 0.8.3, this will get a *lot*
> simpler.
> >
> > -Ewen
> >
> > On Fri, Aug 7, 2015 at 9:26 AM, Padgett, Ben 
> > wrote:
> >
> > > Does anyone have an example of how to get a single record from a
> > > topic+partition given a specific offset?
> > >
> > > I am interested in this for some retry logic for failed messages.
> > >
> > > Thanks!
> > >
> > >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>



-- 
Thanks,
Ewen


Re: how to get single record from kafka topic+partition @ specified offset

2015-08-10 Thread Joe Lawson
Ewen,

Do you have an example or link for the changes/plans that will bring the
benefits you describe?

Cheers,

Joe Lawson
On Aug 10, 2015 3:27 PM, "Ewen Cheslack-Postava"  wrote:

> You can do this using the SimpleConsumer. See
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> for details with some code.
>
> When the new consumer is released in 0.8.3, this will get a *lot* simpler.
>
> -Ewen
>
> On Fri, Aug 7, 2015 at 9:26 AM, Padgett, Ben 
> wrote:
>
> > Does anyone have an example of how to get a single record from a
> > topic+partition given a specific offset?
> >
> > I am interested in this for some retry logic for failed messages.
> >
> > Thanks!
> >
> >
>
>
> --
> Thanks,
> Ewen
>


Re: OffsetOutOfRangeError with Kafka-Spark streaming

2015-08-10 Thread Cassa L
OK, the problem was resolved when I increased the retention policy for the
topic. But now I see that whenever I restart the Spark job, some old messages
are pulled in again by the Spark stream. With the new Spark streaming API, do
we need to keep track of offsets ourselves?

LCassa

On Thu, Aug 6, 2015 at 4:58 PM, Grant Henke  wrote:

> Looks like this is likely a case very similar to the case Parth mentioned
> storm users have seen, when processing falls behind the retention period.
>
> Perhaps Spark and Kafka can handle this scenario more gracefully. I would
> be happy to do some investigation/testing and report back with findings and
> potentially open a Jira to track any fix.
>
> On Thu, Aug 6, 2015 at 6:48 PM, Parth Brahmbhatt <
> pbrahmbh...@hortonworks.com> wrote:
>
> > retention.ms is actually in milliseconds; you want a value much larger than
> > 1440, which translates to 1.4 seconds.
> >
> >
> > On 8/6/15, 4:35 PM, "Cassa L"  wrote:
> >
> > >Hi Grant,
> > >Yes, I saw exception in Spark and Kafka. In Kafka server logs I get this
> > >exception:
> > >kafka.common.OffsetOutOfRangeException: Request for offset 2823 but we
> > >only
> > >have log segments in the range 2824 to 2824.
> > >at kafka.log.Log.read(Log.scala:380)
> > >at
> >
> >kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.sc
> > >ala:530)
> > >at
> >
> >kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.
> > >apply(KafkaApis.scala:476)
> > >at
> >
> >kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.
> > >apply(KafkaApis.scala:471)
> > >at
> >
> >scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scal
> > >a:206)
> > >at
> >
> >scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scal
> > >a:206)
> > >at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> > >at
> > >scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> > >at
> >
> >kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.s
> > >cala:471)
> > >at
> > >kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> > >at
> > >kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> > >at
> >
> >kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.sc
> > >a
> > >
> > >Similar kind of exception comes to Spark Job.
> > >
> > >Here are my versions :
> > >   Spark - 1.4.1
> > >Kafka - 0.8.1
> > >
> > >I changed retention on config using this command :
> > >./kafka-topics.sh --alter --zookeeper  XXX:2181  --topic MyTopic
> --config
> > >retention.ms=1440  (I believe this is in minutes)
> > >
> > >I am also noticing something in Kafka. When I run below command on
> broker:
> > >./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > >vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > >Earliest offset is being set to latest just in few seconds. Am I
> > >co-relating this issue correctly?
> > >
> > >Here is my example on a new Topic. Initial output of this command is
> > >./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > >vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > >SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > >SLF4J: Defaulting to no-operation (NOP) logger implementation
> > >SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further
> > >details.
> > >MyTopic:0:60
> > >
> > >I published 4 messages to Kafka. Immediately after few seconds, command
> > >output is:
> > >  MyTopic:0:64
> > >Isn't this supposed to stay at 60 for longer time based on retention
> > >policy?
> > >
> > >
> > >Thanks,
> > >Leena
> > >
> > >
> > >On Thu, Aug 6, 2015 at 12:09 PM, Grant Henke 
> wrote:
> > >
> > >> Does this Spark Jira match up with what you are seeing or sound
> related?
> > >> https://issues.apache.org/jira/browse/SPARK-8474
> > >>
> > >> What versions of Spark and Kafka are you using? Can you include more
> of
> > >>the
> > >> spark log? Any errors shown in the Kafka log?
> > >>
> > >> Thanks,
> > >> Grant
> > >>
> > >> On Thu, Aug 6, 2015 at 1:17 PM, Cassa L  wrote:
> > >>
> > >> > Hi,
> > >> >  Has anyone tried streaming API of Spark with Kafka? I am
> > >>experimenting
> > >> new
> > >> > Spark API to read from Kafka.
> > >> > KafkaUtils.createDirectStream(...)
> > >> >
> > >> > Every now and then, I get following error "spark
> > >> > kafka.common.OffsetOutOfRangeException" and my spark script stops
> > >> working.
> > >> > I have simple topic with just one partition.
> > >> >
> > >> > I would appreciate any clues on how to debug this issue.
> > >> >
> > >> > Thanks,
> > >> > LCassa
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Grant Henke
> > >> Software Engineer | Cloudera
> > >> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> > >>
> >
> >
>
>
> --
>

Re: abstracting ZooKeeper

2015-08-10 Thread Daniel Nelson
I’m definitely looking forward to progress on this front. We’re currently 
running ZK only for Kafka. If we could have Kafka use our existing Etcd 
cluster, it would be one less critical piece of infrastructure to worry about, 
which would be great.

-- 
Daniel Nelson

> On Aug 9, 2015, at 6:23 PM, Joe Stein  wrote:
> 
> I have started writing a KIP about this topic
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-30+-+Allow+for+brokers+to+have+plug-able+consensus+and+meta+data+storage+sub+systems
> with hopes to get more of it typed out and then circulate for discussion on
> the dev list over the next ~ week.
> 
> I think the plug-in that the project should support is the existing
> implementation which will give folks not looking for an alternative a
> stable upgrade path.
> 
> There are 5 ways I have heard folks wanting to do this. From the project
> perspective I think that 1 (max 2) should be supported and the rest
> available in contrib repo we can link to.
> 
> Other thoughts/comments happy to catch those and update the KIP.
> 
> Thanks!
> 
> ~ Joe Stein
> - - - - - - - - - - - - - - - - - - -
> [image: Logo-Black.jpg]
>  http://www.elodina.net
>http://www.stealth.ly
> - - - - - - - - - - - - - - - - - - -
> 
> On Sun, Aug 9, 2015 at 8:30 PM, Julio Castillo <
> jcasti...@financialengines.com> wrote:
> 
>> The only reason for this request is because I may want to use alternatives
>> like Consul.
>> 
>> ** julio
>> 
>> On 8/9/15, 3:40 PM, "Joe Lawson" 
>> wrote:
>> 
>>> Inline responses below.
>>> 
>>> Sincerely,
>>> 
>>> Joe Lawson
>>> 
>>> On Aug 9, 2015 1:52 PM, "Julio Castillo" 
>>> wrote:
 
 Thanks for the lead.
 Does that mean that Kafka is/will be using Curator?
>>> 
>>> I don't think so.
>>> 
 
 Also, this appears to simplify the interaction with ZooKeeper, but if I
 understand it correctly, it doesn't abstract the interface where one could
 plug in a different service.
>>> 
>>> You are right. I misunderstood what you meant and am unaware of any
>>> ZooKeeper abstraction api that could allow other k/v stores underneath.
>>> That is an interesting idea. Any reasons for the desire?
>>> 
 
 Thanks
 
 ** julio
 
 On 8/9/15, 10:21 AM, "Joe Lawson" 
 wrote:
 
> Netflix contributed Curator (http://curator.apache.org) to Apache which
> implements some generic zk recipes.
> On Aug 9, 2015 11:39 AM, "Julio Castillo"
 >>> 
> wrote:
> 
>> Has there been any thought about abstracting the interface to ZooKeeper?
>> 
>> The reason I'm asking is because I'm looking at Consul for service
>> discovery today, perhaps a different one tomorrow, but the point here
>>> is
>> the ability to plug in any type of service discovery, K/V store
>>> service.
>> 
>> Any thoughts?
>> 
>> ** julio
>> 



Re: Recovering from Kafka NoReplicaOnlineException with one node

2015-08-10 Thread Gwen Shapira
Maybe it is not ZooKeeper itself, but the Broker connection to ZK timed-out
and caused the controller to believe that the broker is dead and therefore
attempted to elect a new leader (which doesn't exist, since you have just
one node).

Increasing the zookeeper session timeout value may help. Also, a common
cause for those timeouts is garbage collection on the broker, changing GC
policy can help.

Here is the Java configuration used by LinkedIn:

-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35


Gwen


On Mon, Aug 10, 2015 at 11:12 AM, Mike Thomsen 
wrote:

> We have a really simple Kafka set up in our development lab. It's just one
> node. Periodically, we run into this error:
>
> [2015-08-10 13:45:52,405] ERROR Controller 0 epoch 488 initiated state
> change for partition [test-data,1] from OfflinePartition to
> OnlinePartition failed (state.change.logger)
> kafka.common.NoReplicaOnlineException: No replica for partition
> [test-data,1] is alive. Live brokers are: [Set()], Assigned replicas
> are: [List(0)]
> at
> kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
> at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
> at
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
> at
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
> at
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>
> Can anyone recommend a strategy for recovering from this? Is there such a
> thing or do we need to build out another node or two and set up the
> replication factor on our topics to cover all of the nodes that we put into
> the cluster?
>
> We have 3 zookeeper nodes that respond very well for other applications
> like Storm and HBase, so we're pretty confident that ZooKeeper isn't to
> blame here. Any ideas?
> Thanks,
>
> Mike
>


Re: problem about the offset

2015-08-10 Thread Ewen Cheslack-Postava
Kafka doesn't track per-message timestamps. The request you're using gets a
list of offsets for *log segments* with timestamps earlier than the one you
specify. If you start consuming from the offset returned, you should find
the timestamp you specified in the same log file.

-Ewen

On Mon, Aug 10, 2015 at 2:21 AM, jinhong lu  wrote:

> Hi, all
>
> I try to use SimpleConsumer follow the example at
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> <
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> >.
>
>I modify the offset in the code:
> long readOffset = getLastOffset(consumer,a_topic, a_partition,
> kafka.api.OffsetRequest.EarliestTime(), clientName);
>
>   It works well when I use kafka.api.OffsetRequest.EarliestTime() or
> kafka.api.OffsetRequest.LatestTime(). But when I set it to a UNIX
> TIMESTAMP, it return not the message at that moment.
>
>For example,
> long readOffset = getLastOffset(consumer,a_topic, a_partition,
> 143919600L, clientName);
>
>   I set the timestamp to  143919600L, it means 2015/8/10 16:40:0. But
> it return the message about one hour before that time.
>
> (1)Is it the right way to assign the time stamp? the time stamp should be
> 13bit, not 10bit, right?
> (2)I am in china, using Beijing time, is it has an effect?
> (3) Or any possbile that kafka has any parameter to set the time of the
> cluster?
>
> thanks a lot.
>
>
> BR//lujinhong
>
>


-- 
Thanks,
Ewen


Re: how to get single record from kafka topic+partition @ specified offset

2015-08-10 Thread Ewen Cheslack-Postava
You can do this using the SimpleConsumer. See
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
for details with some code.

When the new consumer is released in 0.8.3, this will get a *lot* simpler.

-Ewen

On Fri, Aug 7, 2015 at 9:26 AM, Padgett, Ben  wrote:

> Does anyone have an example of how to get a single record from a
> topic+partition given a specific offset?
>
> I am interested in this for some retry logic for failed messages.
>
> Thanks!
>
>


-- 
Thanks,
Ewen


Re: best way to call ReassignPartitionsCommand programmatically

2015-08-10 Thread Ewen Cheslack-Postava
It's not public API so it may not be stable between releases, but you could
try using the ReassignPartitionsCommand class directly. Or, you can see
that the code in that class is a very simple use of ZkUtils, so you could
just make the necessary calls to ZkUtils directly.
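
If you would rather not touch the Scala classes at all, a rough (untested)
sketch is to write the same JSON the CLI takes to the /admin/reassign_partitions
znode with a plain ZooKeeper client; the topic, replica list, and connect
string below are placeholders:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ReassignSketch {
    public static void main(String[] args) throws Exception {
        // Same JSON format the CLI expects in --reassignment-json-file
        String json = "{\"version\":1,\"partitions\":["
            + "{\"topic\":\"my-topic\",\"partition\":0,\"replicas\":[1,2]}]}";

        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, null);
        // The controller watches this path and kicks off the reassignment
        zk.create("/admin/reassign_partitions", json.getBytes("UTF-8"),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.close();
    }
}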

In the future, when KIP-4 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations)
is implemented, we'll have public, supported APIs for these types of
commands.


On Wed, Aug 5, 2015 at 7:41 PM, tao xiao  wrote:

> Hi,
>
> I have a requirement that needs to call partition reassignment inside Java
> code. At the current implementation of ReassignPartitionsCommand it expects
> a json file to be passed in. Other than generating a json file and save it
> somewhere in my code what are other options that I can invoke the command
> like passing a json string directly?
>



-- 
Thanks,
Ewen


Re: How to read messages from Kafka by specific time?

2015-08-10 Thread Ewen Cheslack-Postava
You can use SimpleConsumer.getOffsetsBefore to get a list of offsets before
a Unix timestamp. However, this isn't per-message. The offsets returned are
for the log segments stored on the broker, so the granularity will depend
on your log rolling settings.
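
For reference, a sketch of that lookup, following the 0.8 SimpleConsumer
example on the wiki (the host, topic, partition, and client id below are
placeholders):

import java.util.Collections;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

SimpleConsumer consumer =
    new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "time-lookup");
TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
long timestamp = 1438992000000L; // Unix time in milliseconds
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
    Collections.singletonMap(tp, new PartitionOffsetRequestInfo(timestamp, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "time-lookup");
OffsetResponse response = consumer.getOffsetsBefore(request);
// offsets of log segments from before that timestamp (segment granularity)
long[] offsets = response.offsets("my-topic", 0);
consumer.close();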

-Ewen

On Wed, Aug 5, 2015 at 2:11 AM, shahab  wrote:

> Hi,
>
> Probably this question has been already asked before, but I couldn't find
> it,
>
> I would like to fetch data from Kafka by timestamp, and according to the Kafka
> FAQ (
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest
> ?)
> "Kafka allows querying offsets of messages by time", I tried to use
> UnixTimeStamp instead  in the offset request, but every time I got an empty
> array, simply it didn't work.
>
> Based on my google search this is not possible, but Kafka FAQ states that
> this is possible!
> Does any one know how to do this? I do appreciate it.
>
> best,
> /Shahab
>



-- 
Thanks,
Ewen


Recovering from Kafka NoReplicaOnlineException with one node

2015-08-10 Thread Mike Thomsen
We have a really simple Kafka set up in our development lab. It's just one
node. Periodically, we run into this error:

[2015-08-10 13:45:52,405] ERROR Controller 0 epoch 488 initiated state
change for partition [test-data,1] from OfflinePartition to
OnlinePartition failed (state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition
[test-data,1] is alive. Live brokers are: [Set()], Assigned replicas
are: [List(0)]
at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
at 
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
at 
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)

Can anyone recommend a strategy for recovering from this? Is there such a
thing or do we need to build out another node or two and set up the
replication factor on our topics to cover all of the nodes that we put into
the cluster?

We have 3 zookeeper nodes that respond very well for other applications
like Storm and HBase, so we're pretty confident that ZooKeeper isn't to
blame here. Any ideas?
Thanks,

Mike


Re: Recovery of Kafka cluster takes very long time

2015-08-10 Thread Alexey Sverdelov
Hi Todd,

It is a good idea, thanks. There is no "recovery.threads.per.data.dir"
entry in our server.properties (so we run our cluster with the default value
of 1). I will set it to 8 and try again.

Alexey

On Mon, Aug 10, 2015 at 6:13 PM, Todd Palino  wrote:

> It looks like you did an unclean shutdown of the cluster, in which case
> each open log segment in each partition needs to be checked upon startup.
> It doesn't really have anything to do with RF=3 specifically, but it does
> mean that each of your brokers has 6000 partitions to check.
>
> What is the setting of recovery.threads.per.data.dir in your broker
> configuration? The default is 1, which means that upon startup and
> shutdown, the broker only uses 1 thread for checking/closing log segments.
> If you increase this, it will parallelize both the startup and shutdown
> process. This is particularly helpful for recovering from unclean shutdown.
> We generally set it to the number of CPUs in the system, because we want a
> fast recovery.
>
> -Todd
>
>
> On Mon, Aug 10, 2015 at 8:57 AM, Alexey Sverdelov <
> alexey.sverde...@googlemail.com> wrote:
>
> > Hi all,
> >
> > I have a 3 node Kafka cluster. There are ten topics, every topic has 600
> > partitions with RF3.
> >
> > So, after cluster restart I can see the following log message like "INFO
> > Recovering unflushed segment 0 in log..." and the complete recovery of 3
> > nodes takes about 2+ hours.
> >
> > I don't know why it takes so long? Is it because of RF=3?
> >
> > Have a nice day,
> > Alexey
> >
>


Re: Recovery of Kafka cluster takes very long time

2015-08-10 Thread Todd Palino
It looks like you did an unclean shutdown of the cluster, in which case
each open log segment in each partition needs to be checked upon startup.
It doesn't really have anything to do with RF=3 specifically, but it does
mean that each of your brokers has 6000 partitions to check.

What is the setting of recovery.threads.per.data.dir in your broker
configuration? The default is 1, which means that upon startup and
shutdown, the broker only uses 1 thread for checking/closing log segments.
If you increase this, it will parallelize both the startup and shutdown
process. This is particularly helpful for recovering from unclean shutdown.
We generally set it to the number of CPUs in the system, because we want a
fast recovery.
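
For reference, the corresponding key in server.properties is
num.recovery.threads.per.data.dir; for example, on an 8-core box:

num.recovery.threads.per.data.dir=8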

-Todd


On Mon, Aug 10, 2015 at 8:57 AM, Alexey Sverdelov <
alexey.sverde...@googlemail.com> wrote:

> Hi all,
>
> I have a 3 node Kafka cluster. There are ten topics, every topic has 600
> partitions with RF3.
>
> So, after cluster restart I can see the following log message like "INFO
> Recovering unflushed segment 0 in log..." and the complete recovery of 3
> nodes takes about 2+ hours.
>
> I don't know why it takes so long? Is it because of RF=3?
>
> Have a nice day,
> Alexey
>


Recovery of Kafka cluster takes very long time

2015-08-10 Thread Alexey Sverdelov
Hi all,

I have a 3-node Kafka cluster. There are ten topics, and every topic has 600
partitions with RF=3.

After a cluster restart I see log messages like "INFO
Recovering unflushed segment 0 in log..." and the complete recovery of the 3
nodes takes about 2+ hours.

I don't know why it takes so long. Is it because of RF=3?

Have a nice day,
Alexey


Re: Kafka metadata

2015-08-10 Thread Andrew Otto
Note that broker metadata is not necessarily kept in sync with zookeeper on all 
brokers at all times:
https://issues.apache.org/jira/browse/KAFKA-1367

This looks like it is fixed in the upcoming 0.8.3


> On Aug 8, 2015, at 01:08, Abdoulaye Diallo  wrote:
> 
> @Rahul
> 
>> If this is true, why does the producer API makes it necessary to supply a
>> value for metadata.broker.list?
> 
> 
> I believe this is because of the removal of the ZK dependency (load balancing is 
> no longer achieved with the use of ZK). For that purpose, 0.8 producer relies 
> on the new cluster metadata api… which requires metadata.broker.list.
>> https://issues.apache.org/jira/browse/KAFKA-369 
>> 
> Abdoulaye
>> On Aug 7, 2015, at 9:10 PM, Rahul Jain  wrote:
>> 
>>> 
>>> Alternatively you can get the same metadata from Zookeeper
>> 
>> If this is true, why does the producer API makes it necessary to supply a
>> value for metadata.broker.list?
>> 
>> I noticed that this wasn't the case in 0.7.
>> 
>> 
>> 
>> On 8 Aug 2015 04:06, "Lukas Steiblys"  wrote:
>> 
>>> Hi Qi,
>>> 
>>> Yes, the metadata request will return information about all the brokers in
>>> the cluster. Alternatively you can get the same metadata from Zookeeper.
>>> 
>>> Lukas
>>> 
>>> -Original Message- From: Qi Xu
>>> Sent: Friday, August 7, 2015 2:30 PM
>>> To: users@kafka.apache.org
>>> Subject: Kafka metadata
>>> 
>>> Hi Everyone,
>>> I have a question that hopes to get some clarification.
>>> In a Kafka cluster, does every broker have the complete view of the
>>> metadata information?
>>> What's the best practice for a producer to send a metadata request? Is it
>>> recommended to send it to all brokers or just one broker?
>>> 
>>> In our scenario, we want to avoid the situation that each producer needs to
>>> talk to every broker because we have hundreds of thousands of producers and
>>> the scalability of connection number will be a concern if every cluster
>>> node is connected by any producer.
>>> 
>>> Thank you for your help.
>>> 
>>> Qi
>>> 
> 



problem about the offset

2015-08-10 Thread jinhong lu
Hi, all

I try to use SimpleConsumer follow the example at 
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 
.

   I modified the offset in the code:
long readOffset = getLastOffset(consumer,a_topic, a_partition, 
kafka.api.OffsetRequest.EarliestTime(), clientName);

  It works well when I use kafka.api.OffsetRequest.EarliestTime() or
kafka.api.OffsetRequest.LatestTime(). But when I set it to a Unix timestamp, it
does not return the message from that time.

   For example, 
long readOffset = getLastOffset(consumer,a_topic, a_partition, 
143919600L, clientName);
 
  I set the timestamp to 143919600L, which means 2015/8/10 16:40:00, but it
returns messages from about one hour before that time.

(1) Is this the right way to specify the timestamp? The timestamp should be
13 digits (milliseconds), not 10 digits (seconds), right?
(2) I am in China, using Beijing time. Does that have an effect?
(3) Or is it possible that Kafka has a parameter to set the time of the cluster?

thanks a lot.


BR//lujinhong



Re: Partition and consumer configuration

2015-08-10 Thread Shashidhar Rao
Thanks Kumar, for explaining in depth.

On Mon, Aug 10, 2015 at 1:37 PM, Manikumar Reddy 
wrote:

> Hi,
>
>
>
> > 1. Will Kafka distribute the 100 serialized files randomly  say 20 files
> go
> > to   Partition 1, 25 to Partition 2 etc or do I have an option to
> configure
> > how many files go to which partition .
> >
>
>Assuming you are using new producer,
>
>All keyed messages will be distributed based on the hash of the key (or)
>All non-keyed message will be distributed in round-robin fashion.
>(or) You can specify partition number for each message using
> "ProducerRecord" object.
>
>
> > 2.  How to configure each consumer to read from a particular partition
> > only. Say consumer 1 to read from partition 4 only likewise.
> >
> >
>   You can use SimpleConsumer  to consumer from a particular partition.
>https://kafka.apache.org/documentation.html#simpleconsumerapi
>
>
> > 2. If I have not set the max size what will happen to that 1MB file, I
> read
> > in the documentation that default size is 10KB.
>
>
> Default  "max.request.size" on new producer  and "message.max.bytes" on
> broker is 1MB.
> Anything above these limits, you will get exception.
>
>
>  Kumar
>


Re: Partition and consumer configuration

2015-08-10 Thread Manikumar Reddy
Hi,



> 1. Will Kafka distribute the 100 serialized files randomly  say 20 files go
> to   Partition 1, 25 to Partition 2 etc or do I have an option to configure
> how many files go to which partition .
>

   Assuming you are using the new producer:

   All keyed messages will be distributed based on the hash of the key, and
   all non-keyed messages will be distributed in round-robin fashion.
   Alternatively, you can specify the partition number for each message using the
"ProducerRecord" object.


> 2.  How to configure each consumer to read from a particular partition
> only. Say consumer 1 to read from partition 4 only likewise.
>
>
  You can use SimpleConsumer to consume from a particular partition.
   https://kafka.apache.org/documentation.html#simpleconsumerapi


> 2. If I have not set the max size what will happen to that 1MB file, I read
> in the documentation that default size is 10KB.


The default "max.request.size" on the new producer and "message.max.bytes" on the
broker are both 1MB.
Anything above these limits will result in an exception.
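
For reference, the two settings look roughly like this (shown at 1 MB; if you
need larger messages, raise both together, along with the consumer's
fetch.message.max.bytes):

# producer config
max.request.size=1048576
# broker config (server.properties)
message.max.bytes=1048576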


 Kumar


Partition and consumer configuration

2015-08-10 Thread Shashidhar Rao
Hi,

Could somebody tell me whether my understanding is correct, as I am very new
to Kafka.

1. Topic name: ProdCategory, with 4 partitions. All the messages are XML
files, and there are also 4 consumers. Multi-broker setup with 4 brokers.
2. XML files vary in size from 10 KB to 1 MB.

3. Say there are 100 serialized XML files.

Questions:

1. Will Kafka distribute the 100 serialized files randomly, say 20 files go
to Partition 1, 25 to Partition 2, etc., or do I have an option to configure
how many files go to which partition?
2. How do I configure each consumer to read from a particular partition
only, say consumer 1 reads from partition 4 only, and likewise for the others?

2. If I have not set the max size, what will happen to the 1 MB files? I read
in the documentation that the default size is 10 KB.

Thanks