Re: 0.9.0 release branch

2015-11-02 Thread Cliff Rhyne
Thanks, Jun.  We will switch clients shortly after the release.

Cliff

> On Nov 2, 2015, at 7:26 PM, Jun Rao  wrote:
> 
> Cliff,
> 
> We try not to patch the old consumer too much since we are adding the new
> java consumer in 0.9. The new consumer supports callbacks during rebalances
> and can address the problem in KAFKA-2725 better.
> 
> Thanks,
> 
> Jun
> 
>> On Mon, Nov 2, 2015 at 11:16 AM, Cliff Rhyne  wrote:
>> 
>> Hi Jun,
>> 
>> I opened KAFKA-2725 based on my experience with duplicate message
>> processing with auto-commit off.  I think it's a fairly small change,
>> especially for someone familiar with the kafka code-base but it makes a big
>> impact for clients not using auto-commit.  Can this be included in 0.9.0?
>> 
>> Thanks,
>> Cliff
>> 
>> On Mon, Nov 2, 2015 at 12:57 PM, Jason Gustafson 
>> wrote:
>> 
>>> I added KAFKA-2691 as well, which improves client handling of
>> authorization
>>> errors.
>>> 
>>> -Jason
>>> 
>>> On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin 
>>> wrote:
>>> 
>>>> Hi Jun,
>>>> 
>>>> I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
>>>> scalability issue we saw.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
>>>> 
>>>>> Hi, everyone,
>>>>> 
>>>>> We are getting close to the 0.9.0 release. The current plan is to have the
>>>>> following remaining 0.9.0 blocker issues resolved this week, cut the 0.9.0
>>>>> release branch by Nov. 6 (Friday) and start the RC on Nov. 9 (Monday).
>>>>> 
>>>>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun


Re: Problems getting offsets

2015-11-02 Thread Mayuresh Gharat
Hi David,

My bad, I did not understand your question correctly. Thanks, Stevo, for the
detailed explanation.

Just in case you want to check out Kafka-based offset management, this is a
very good presentation:
http://www.slideshare.net/jjkoshy/offset-management-in-kafka

Thanks,

Mayuresh

On Mon, Nov 2, 2015 at 12:51 AM, Stevo Slavić  wrote:

> Here is a longer and more detailed, though not necessarily easier to
> follow, explanation.
>
> When using Kafka for offsets storage, consumer offsets get stored in
> (compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every
> partition of consumer offsets topic could store offsets from multiple
> consumer groups, but offsets of a single consumer group will always be
> stored in (get mapped to) same partition of consumer offsets topic (e.g.
> consumer group x and y offsets could be both stored in partition 0, while
> consumer group z offsets could be stored in partition 49 of consumer
> offsets topic; even if consumer group x is consuming two different topics,
> offsets would be stored in same partition of consumer offsets topic). Like
> any other Kafka topic partition, one can read/write (consumer offsets)
> topic partitions only from their lead brokers, and every partition has only
> one lead broker. Lead broker of a partition where offsets are stored for
> given consumer group is called consumer coordinator broker for that
> consumer group. To fetch/commit offsets for consumer group, you first need
> to resolve consumer coordinator broker for that consumer group, and then
> send fetch/commit offsets to that broker (btw, it's dynamic, can change
> even after just being resolved which broker is coordinator broker for given
> consumer group, e.g. when initial lead broker is lost and a replica becomes
> a new lead). Until there is a leader assigned for a partition and broker
> who is leader is not yet aware of that assignment, topic partition cannot
> be read/written, it's offline - read/write requests made to that broker
> will return error code in response. Consumer offsets topic is not created
> automatically on broker/cluster startup - instead it gets created on first
> (direct or indirect) request for consumer offsets topic metadata... That
> lookup triggers creation of consumer offsets topic, parts of the topic
> creation process happen async to request for topic creation, and it can
> take time for topic creation to actually fully finish. Especially if you
> leave the consumer offsets topic at its default settings (replication factor
> of 3, 50 partitions), consumer offsets topic creation can take time - this is
> a nice default for a production cluster, but not for one used in integration
> tests with a single broker.
>
> When you try to fetch (or commit) offset for the first time, broker
> internally retrieves metadata about consumer offsets topic, which initiates
> consumer offsets topic creation when it doesn't exist, but that first
> request to fetch offset (usually) fails, since chosen consumer coordinator
> broker is not yet aware that it is coordinator for that group, and sends
> back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. Docs
> (see
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> ) state that "The broker returns this error code if it receives an offset
> fetch or commit request for a consumer group that it is not a coordinator
> for."
>
> You could modify your code to retry fetching offsets, but not infinitely,
> or you could trigger consumer offsets topic init before fetching offsets.
> For the init option you have (at least) two alternatives.
>
> Modify your test, before reading/committing offsets but not before creating
> your topic to init (trigger creation of) consumer offsets topic by looking
> up consumer offsets topic metadata, and add some delay to let consumer
> offsets topic be fully created (all of its default 50 partitions get leader
> assigned, broker aware, etc.).
>
> You could do this consumer offsets topic initialization before creating
> your topic, but then make sure that in the broker configuration the
> replication factor for the consumer offsets topic is not higher than the
> number of brokers (1 in the scenario you described) - otherwise consumer
> offsets topic creation will fail. It fails because a fresh broker in a
> single-broker cluster, if it receives a request for consumer offsets topic
> metadata, will fail to create the consumer offsets topic no matter how long
> the delay after the metadata lookup; it is not aware of how many live brokers
> there are in the cluster and will just try to create the consumer offsets
> topic with the default replication factor of 3, which fails because initial
> partition replica assignment happens during topic creation and the
> replication factor has to be <= the number of live brokers when the topic is
> created. Consumer offsets topic creation does not fail (although it takes
> time to finish) if it is done after some other topic creation has been
> requested, because that
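To illustrate the bounded-retry option Stevo mentions above, a rough sketch follows.
Here fetchOffsets() is only a stand-in for whatever offset-fetch call your consumer
code already makes, and the attempt count and back-off are arbitrary:

// Sketch only: retry the offset fetch a bounded number of times while the
// coordinator / __consumer_offsets topic is still being set up.
public class OffsetFetchRetry {

    // Stand-in for the real offset-fetch call, which throws while the chosen
    // broker is not yet the coordinator for the consumer group.
    static long fetchOffsets() {
        throw new RuntimeException("NotCoordinatorForConsumer");
    }

    static long fetchOffsetsWithRetry(int maxAttempts) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetchOffsets();
            } catch (RuntimeException notCoordinatorYet) {
                Thread.sleep(500); // back off while the offsets topic is created
            }
        }
        throw new IllegalStateException("offset fetch failed after " + maxAttempts + " attempts");
    }
}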

Re: 0.9.0 release branch

2015-11-02 Thread Jun Rao
Cliff,

We try not to patch the old consumer too much since we are adding the new
java consumer in 0.9. The new consumer supports callbacks during rebalances
and can address the problem in KAFKA-2725 better.

Thanks,

Jun

On Mon, Nov 2, 2015 at 11:16 AM, Cliff Rhyne  wrote:

> Hi Jun,
>
> I opened KAFKA-2725 based on my experience with duplicate message
> processing with auto-commit off.  I think it's a fairly small change,
> especially for someone familiar with the kafka code-base but it makes a big
> impact for clients not using auto-commit.  Can this be included in 0.9.0?
>
> Thanks,
> Cliff
>
> On Mon, Nov 2, 2015 at 12:57 PM, Jason Gustafson 
> wrote:
>
> > I added KAFKA-2691 as well, which improves client handling of
> authorization
> > errors.
> >
> > -Jason
> >
> > On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin 
> wrote:
> >
> > > Hi Jun,
> > >
> > > I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
> > > scalability issue we saw.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
> > >
> > > > Hi, everyone,
> > > >
> > > > We are getting close to the 0.9.0 release. The current plan is to
> have
> > > the
> > > > following remaining 0.9.0 blocker issues resolved this week, cut the
> > > 0.9.0
> > > > release branch by Nov. 6 (Friday) and start the RC on Nov. 9
> (Monday).
> > > >
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > >
> >
>
>
>
> --
> Cliff Rhyne
> Software Engineering Lead
> m: 760-917-7823
> e: crh...@signal.co
> signal.co
> 
>
> Cut Through the Noise
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized use of this email is strictly prohibited.
> ©2015 Signal. All rights reserved.
>


Re: 0.9.0 release branch

2015-11-02 Thread Onur Karaman
I added KAFKA-2698  as an
0.9.0.0 blocker. It adds an API to query the currently paused partitions.
Here's the PR: https://github.com/apache/kafka/pull/403

On Mon, Nov 2, 2015 at 11:16 AM, Cliff Rhyne  wrote:

> Hi Jun,
>
> I opened KAFKA-2725 based on my experience with duplicate message
> processing with auto-commit off.  I think it's a fairly small change,
> especially for someone familiar with the kafka code-base but it makes a big
> impact for clients not using auto-commit.  Can this be included in 0.9.0?
>
> Thanks,
> Cliff
>
> On Mon, Nov 2, 2015 at 12:57 PM, Jason Gustafson 
> wrote:
>
> > I added KAFKA-2691 as well, which improves client handling of
> authorization
> > errors.
> >
> > -Jason
> >
> > On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin 
> wrote:
> >
> > > Hi Jun,
> > >
> > > I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
> > > scalability issue we saw.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
> > >
> > > > Hi, everyone,
> > > >
> > > > We are getting close to the 0.9.0 release. The current plan is to
> have
> > > the
> > > > following remaining 0.9.0 blocker issues resolved this week, cut the
> > > 0.9.0
> > > > release branch by Nov. 6 (Friday) and start the RC on Nov. 9
> (Monday).
> > > >
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > >
> >
>
>
>
> --
> Cliff Rhyne
> Software Engineering Lead
> m: 760-917-7823
> e: crh...@signal.co
> signal.co
> 
>
> Cut Through the Noise
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. Any unauthorized use of this email is strictly prohibited.
> ©2015 Signal. All rights reserved.
>


Re: Controller sometimes lose ISR

2015-11-02 Thread Mayuresh Gharat
Broker 61 somehow falls behind in fetching from the leader brokers and
hence falls out of the ISR.

[2015-10-30 23:02:34,012] ERROR Controller 61 epoch 2233 initiated state
change of replica 61 for partition [test-res-met.server_logs.conv,18] from
OnlineReplica to OfflineReplica...
means that the current controller underwent a soft failure and came back up,
but another controller was elected in the meantime. The old controller will
eventually resign.
Is this log from after you bounced broker 61?


Thanks,

Mayuresh

On Sat, Oct 31, 2015 at 5:09 AM, Gleb Zhukov  wrote:

> Hi, Everybody!
>
> Every week on Friday night I lose the ISR for some partitions in my kafka
> cluster:
>
> Topic: test-res-met.server_logs.conv  Partition: 18  Leader: 45
> Replicas: 45,61  Isr: 45
> Current controller: 45
> Partitions with leader #61 are available; I lose broker #61 only as an ISR
> member for partitions with another leader.
>
> State logs on broker 61:
>
> [2015-10-30 23:02:34,012] ERROR Controller 61 epoch 2233 initiated state
> change of replica 61 for partition [test-res-met.server_logs.conv,18] from
> OnlineReplica to OfflineReplica failed (state.change.logger)
> kafka.common.StateChangeFailedException: Leader and isr path written by
> another controller. This probably means the current controller with epoch
> 2233 went through a soft failure
> and another controller was elected with epoch 2234. Aborting state change
> by this controller
> at
>
> kafka.controller.KafkaController.removeReplicaFromIsr(KafkaController.scala:1002)
> at
>
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:250)
> at
>
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at
>
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
>
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:451)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
> at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
> Restarting the bad broker (#61) helps.
> We have 7-day retention for our logs (log.retention.hours=168). I also
> checked ZK and cron. Could anyone explain this issue? Kafka 0.8.2.1.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: 0.9.0 release branch

2015-11-02 Thread Cliff Rhyne
Hi Jun,

I opened KAFKA-2725 based on my experience with duplicate message
processing with auto-commit off.  I think it's a fairly small change,
especially for someone familiar with the Kafka code base, but it makes a big
impact for clients not using auto-commit.  Can this be included in 0.9.0?

Thanks,
Cliff

On Mon, Nov 2, 2015 at 12:57 PM, Jason Gustafson  wrote:

> I added KAFKA-2691 as well, which improves client handling of authorization
> errors.
>
> -Jason
>
> On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin  wrote:
>
> > Hi Jun,
> >
> > I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
> > scalability issue we saw.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
> >
> > > Hi, everyone,
> > >
> > > We are getting close to the 0.9.0 release. The current plan is to have
> > the
> > > following remaining 0.9.0 blocker issues resolved this week, cut the
> > 0.9.0
> > > release branch by Nov. 6 (Friday) and start the RC on Nov. 9 (Monday).
> > >
> > >
> > >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> >
>



-- 
Cliff Rhyne
Software Engineering Lead
m: 760-917-7823
e: crh...@signal.co
signal.co


Cut Through the Noise

This e-mail and any files transmitted with it are for the sole use of the
intended recipient(s) and may contain confidential and privileged
information. Any unauthorized use of this email is strictly prohibited.
©2015 Signal. All rights reserved.


Re: Migrating both Zookeeper ensemble and Kafka cluster please confirm steps

2015-11-02 Thread John Yost
Hi Tulio,

Thanks a bunch for responding to my question! We basically need to migrate
data from brokers hosted on racks being decommissioned, but the new and old
brokers are actually within the same cluster. The new brokers are
configured to use the new Zookeeper ensemble, but, again, they are intended
to be brokers within the same cluster. Once the data is moved to the new
brokers, the old brokers will be deleted, or at least that's what I am
intending to do.

Please confirm if my approach makes sense or if there is a problem and/or a
better way to do it.

Thanks

--John

On Mon, Nov 2, 2015 at 2:20 PM, Tulio Ballari 
wrote:

> I'm not really sure, but I think kafka-reassign-partitions.sh only works to
> move partitions between instances of the same cluster.
>
> If you have two separate clusters, it is not going to work. Are you using
> two different zookeeper clusters, or simply adding instances to the existing
> one?
>
>
>
> On Sun, Nov 1, 2015 at 1:18 PM, John Yost  wrote:
>
> > Hi Everyone,
> >
> > I need to migrate my organization's Kafka cluster along with the
> underlying
> > Zookeeper ensemble (cluster) from one set of racks to another within our
> > data center. I am pretty sure I have the steps correct, but I need to
> > confirm just to ensure I am not missing anything.
> >
> > Here's what I think I need to do in the following order:
> >
> > 1. Configure new Zookeeper ensemble
> > 2. Configure brokers for new Kafka cluster, with zookeeper.connect list
> of
> > new Zookeeper ensemble nodes
> > 3. Startup new Zookeeper ensemble
> > 4. Startup new Kafka cluster
> > 5. Shutdown data feeds to old Kafka cluster and reconfigure for new
> cluster
> > 6. Migrate topics from old Kafka brokers to new brokers via
> > kafka-reassign-partitions.sh script and the instructions here:
> >
> http://kafka.apache.org/082/documentation.html#basic_ops_cluster_expansion
> > 7. Export consumer offsets from old Zookeeper ensemble and import into
> new
> > Zookeeper ensemble
> > 8. Shutdown old Kafka cluster
> > 9. Restart data feeds into new Kafka cluster
> >
> > The Kafka documentation is great and I've tested out the topic
> reassignment
> > and consumer offset import and export, but, again, just want to ensure I
> am
> > not missing anything.
> >
> > Thanks
> >
> > --John
> >
>


Re: Migrating both Zookeeper ensemble and Kafka cluster please confirm steps

2015-11-02 Thread Tulio Ballari
I'm not really sure, but I think kafka-reassign-partitions.sh only works to
move partitions between instances of the same cluster.

If you have two separate clusters, it is not going to work. Are you using
two different zookeeper clusters, or simply adding instances to the existing
one?



On Sun, Nov 1, 2015 at 1:18 PM, John Yost  wrote:

> Hi Everyone,
>
> I need to migrate my organization's Kafka cluster along with the underlying
> Zookeeper ensemble (cluster) from one set of racks to another within our
> data center. I am pretty sure I have the steps correct, but I need to
> confirm just to ensure I am not missing anything.
>
> Here's what I think I need to do in the following order:
>
> 1. Configure new Zookeeper ensemble
> 2. Configure brokers for new Kafka cluster, with zookeeper.connect list of
> new Zookeeper ensemble nodes
> 3. Startup new Zookeeper ensemble
> 4. Startup new Kafka cluster
> 5. Shutdown data feeds to old Kafka cluster and reconfigure for new cluster
> 6. Migrate topics from old Kafka brokers to new brokers via
> kafka-reassign-partitions.sh script and the instructions here:
> http://kafka.apache.org/082/documentation.html#basic_ops_cluster_expansion
> 7. Export consumer offsets from old Zookeeper ensemble and import into new
> Zookeeper ensemble
> 8. Shutdown old Kafka cluster
> 9. Restart data feeds into new Kafka cluster
>
> The Kafka documentation is great and I've tested out the topic reassignment
> and consumer offset import and export, but, again, just want to ensure I am
> not missing anything.
>
> Thanks
>
> --John
>


Re: 0.9.0 release branch

2015-11-02 Thread Jason Gustafson
I added KAFKA-2691 as well, which improves client handling of authorization
errors.

-Jason

On Mon, Nov 2, 2015 at 10:25 AM, Becket Qin  wrote:

> Hi Jun,
>
> I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
> scalability issue we saw.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:
>
> > Hi, everyone,
> >
> > We are getting close to the 0.9.0 release. The current plan is to have
> the
> > following remaining 0.9.0 blocker issues resolved this week, cut the
> 0.9.0
> > release branch by Nov. 6 (Friday) and start the RC on Nov. 9 (Monday).
> >
> >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
> >
> > Thanks,
> >
> > Jun
> >
>


Re: High delay during controlled shutdown and acks=-1

2015-11-02 Thread Becket Qin
Hi Federico,

What is your replica.lag.time.max.ms?

When acks=-1, the ProducerResponse won't return until all the brokers in the
ISR get the message. During controlled shutdown, the shutting-down broker is
doing a lot of leader migration and can slow down on fetching data. The
broker won't be kicked out of the ISR until at least replica.lag.time.max.ms
has elapsed. Reducing this configuration will let the shutting-down broker be
kicked out of the ISR more quickly if it cannot catch up. But if you set it
too small, there could be more ISR expansion/shrinking.
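For reference, this is the broker-side setting in server.properties (the value shown
is only illustrative; a lower value drops a lagging replica from the ISR sooner, at
the cost of more ISR churn):

# server.properties (illustrative value - tune for your cluster)
# Maximum time a follower may lag behind the leader before it is dropped from the ISR.
replica.lag.time.max.ms=10000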

That said, currently controlled shutdown is not very efficient. We might
improve it and hopefully later on it won't slow down the replication on the
shutting down broker.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 2, 2015 at 5:52 AM, Federico Giraud 
wrote:

> Hi,
>
> I have a few Java async producers sending data to a 4-node Kafka cluster
> version 0.8.2, containing a few thousand topics, all with replication factor
> 2. When I use acks=1 and trigger a controlled shutdown + restart on one
> broker, the producers will send data to the new leader, reporting a very
> low additional delay during the transition (as expected). However, if I use
> acks=-1, the producers will report a ~15 second delay between the send and
> the future.get. Is this behavior expected? Is there a way to make it
> faster? Or maybe it is a problem with my configuration?
>
> Broker configs:
> broker.id=0
> log.dirs=/var/kafka/md1/kafka-logs
> zookeeper.connect=10.40.27.107,10.40.27.108,10.40.27.109
> auto.create.topics.enable=true
> default.replication.factor=2
> delete.topic.enable=true
> log.retention.hours=24
> num.io.threads=5
>
> Producer configs:
> acks = -1
> retries = 3
> timeout.ms = 3000
> batch.size = 1048576
> linger.ms= 100
> metadata.fetch.timeout.ms = 5000
> metadata.max.age.ms = 6
>
> I tried different configurations, but I wasn't able to reduce the delay
> during broker restart. The logs in the broker indicate that the controlled
> shutdown was successful.
>
> Thank you
>
> regards,
> Federico
>


Re: 0.9.0 release branch

2015-11-02 Thread Becket Qin
Hi Jun,

I added KAFKA-2722 as a blocker for 0.9. It fixes the ISR propagation
scalability issue we saw.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 2, 2015 at 9:16 AM, Jun Rao  wrote:

> Hi, everyone,
>
> We are getting close to the 0.9.0 release. The current plan is to have the
> following remaining 0.9.0 blocker issues resolved this week, cut the 0.9.0
> release branch by Nov. 6 (Friday) and start the RC on Nov. 9 (Monday).
>
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
>
> Thanks,
>
> Jun
>


Re: Quick kafka-reassign-partitions.sh script question

2015-11-02 Thread John Yost
Nice! Thanks Gwen!

--John

On Mon, Nov 2, 2015 at 1:03 PM, Gwen Shapira  wrote:

> Actually, no. You can move partitions online.
>
> The way it works is that:
> 1. A new replica is created for the partition in the new broker
> 2. It starts replicating from the leader until it catches up - if you
> continue producing at this time, it will take longer to catch up.
> 3. Once the new replica caught up, the old replica will get deleted.
>
> Hope this clarifies.
>
> On Mon, Nov 2, 2015 at 5:43 AM, John Yost  wrote:
>
> > Hi Everyone,
> >
> > Perhaps a silly question...does one need to shut down incoming data feeds
> > to Kafka prior to moving partititions via the
> kafka-reassign-partitions.sh
> > script? My thought is yes, but just want to be sure.
> >
> > Thanks
> >
> > --John
> >
>


Re: Quick kafka-reassign-partitions.sh script question

2015-11-02 Thread Gwen Shapira
Actually, no. You can move partitions online.

The way it works is that:
1. A new replica is created for the partition on the new broker
2. It starts replicating from the leader until it catches up - if you
continue producing at this time, it will take longer to catch up.
3. Once the new replica has caught up, the old replica will get deleted.
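
For reference, a run of the tool typically looks something like this (topic,
partition, and broker ids are made up; the JSON can also be produced with the
script's --generate option):

reassignment.json (illustrative - move partition 0 of "my-topic" onto brokers 3 and 4):
{"version":1,
 "partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4]}]}

bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --execute

# Later, check that the new replicas caught up and the old ones were removed:
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --verify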

Hope this clarifies.

On Mon, Nov 2, 2015 at 5:43 AM, John Yost  wrote:

> Hi Everyone,
>
> Perhaps a silly question...does one need to shut down incoming data feeds
> to Kafka prior to moving partitions via the kafka-reassign-partitions.sh
> script? My thought is yes, but just want to be sure.
>
> Thanks
>
> --John
>


Re: Kafka and Spark Issue

2015-11-02 Thread Gwen Shapira
Since the error is from the HBase client and completely unrelated to Kafka,
you will have better luck in the HBase mailing list.

On Mon, Nov 2, 2015 at 9:16 AM, Nikhil Gs  wrote:

> Hello Team,
>
> My scenario is to load data from a producer topic into HBase by using the
> Spark API. Our cluster is Kerberos-authenticated, and when we run the
> KafkaToHbase.java code below, the error I am facing is also below. Let me
> know if anyone has any idea what can be done.
>
> package com.spark.example;
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.MasterNotRunningException;
> import org.apache.hadoop.hbase.client.HBaseAdmin;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.security.UserGroupInformation;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat;
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
>
> import scala.Tuple2;
>
> public class KafkaToHbase {
>
> private final static String rawTableName = "kafkatest1";
>
> //final static String user = "a_rakesh.samin...@suddenlink.cequel3.com
> ";
> //sfinal static String keyPath =
> "/home/a_rakesh.samineni/a_rakesh.samineni.keytab";
>
> @SuppressWarnings({ "serial", "deprecation" })
> public static void main(String[] args) throws Exception {
> // // Authenticating Kerberos principal
> //System.out.println("Principal Authentication: ");
> //try {
> //
> //UserGroupInformation.loginUserFromKeytab(user, keyPath);
> //}
> //catch(Exception e){
> // e.printStackTrace();
> //}
>
>
> //1. Create the spark streaming context with a 10 second batch size
> SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingTest");
> sparkConf.set("spark.driver.allowMultipleContexts", "true");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(10));
>
> int numThreads = Integer.parseInt(args[3]);
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> String[] topics = args[2].split(",");
> for (String topic: topics) {
> topicMap.put(topic, numThreads);
> }
>
> //System.out.println("creating stream..");
> JavaPairReceiverInputDStream<String, String> messages =
> KafkaUtils.createStream(ssc, args[0], args[1], topicMap,
> StorageLevels.MEMORY_AND_DISK_SER);
> //messages.print();
>  //System.out.println("messages"+messages);
>
> //3. create connection with HBase
>Configuration config = null;
> try {
> System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
> //System.out.println("in hbase configuraiton");
> //UserGroupInformation.loginUserFromKeytab(user, keyPath);
> config = HBaseConfiguration.create();
> config.set("hadoop.security.authentication","kerberos");
> config.set("hbase.rpc.protection", "privacy");
> config.addResource(new
> org.apache.hadoop.fs.Path("/etc/hbase/conf/core-site.xml"));
>config.addResource(new
> org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
> config.set("hbase.zookeeper.quorum", "sdldalplhdm03, sdldalplhdm02,
> sdldalplhdm01");
> config.set("hbase.zookeeper.property.clientPort","2181");
> UserGroupInformation.setConfiguration(config);
> //System.out.println("after hbase configuration");
> UserGroupInformation.loginUserFromKeytab("
> a_nikhil.gopishe...@suddenlink.cequel3.com",
> "/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab");
> //config.set("hbase.master", "10.48.210.248:60010");
> HBaseAdmin.checkHBaseAvailable(config);
> //System.out.println("HBase is running!");
> }
> catch (MasterNotRunningException e) {
> System.out.println("HBase is not running!");
> System.exit(1);
> }catch (Exception ce){
> ce.printStackTrace();
> }
>
> //config.set(TableInputFormat.INPUT_TABLE, rawTableName);
>
> //4. new Hadoop API configuration
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
>
> });
> JavaPairDStream<ImmutableBytesWritable, Put> hbasePuts = lines.mapToPair(
> new PairFunction<String, ImmutableBytesWritable, Put>(){
>
> @SuppressWarnings("deprecation")
> public Tuple2<ImmutableBytesWritable, Put> call(String line) {
> Put put = new Put(Bytes.toBytes(Math.random()));
> put.add(Bytes.toBytes("cf12"), Bytes.toBytes("first

Kafka and Spark Issue

2015-11-02 Thread Nikhil Gs
Hello Team,

My scenario is to load data from a producer topic into HBase by using the
Spark API. Our cluster is Kerberos-authenticated, and when we run the
KafkaToHbase.java code below, the error I am facing is also below. Let me
know if anyone has any idea what can be done.

package com.spark.example;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaToHbase {

private final static String rawTableName = "kafkatest1";

//final static String user = "a_rakesh.samin...@suddenlink.cequel3.com";
//sfinal static String keyPath =
"/home/a_rakesh.samineni/a_rakesh.samineni.keytab";

@SuppressWarnings({ "serial", "deprecation" })
public static void main(String[] args) throws Exception {
// // Authenticating Kerberos principal
//System.out.println("Principal Authentication: ");
//try {
//
//UserGroupInformation.loginUserFromKeytab(user, keyPath);
//}
//catch(Exception e){
// e.printStackTrace();
//}


//1. Create the spark streaming context with a 10 second batch size
SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingTest");
sparkConf.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
Durations.seconds(10));

int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}

//System.out.println("creating stream..");
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(ssc, args[0], args[1], topicMap,
StorageLevels.MEMORY_AND_DISK_SER);
//messages.print();
 //System.out.println("messages"+messages);

//3. create connection with HBase
   Configuration config = null;
try {
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
//System.out.println("in hbase configuraiton");
//UserGroupInformation.loginUserFromKeytab(user, keyPath);
config = HBaseConfiguration.create();
config.set("hadoop.security.authentication","kerberos");
config.set("hbase.rpc.protection", "privacy");
config.addResource(new
org.apache.hadoop.fs.Path("/etc/hbase/conf/core-site.xml"));
   config.addResource(new
org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
config.set("hbase.zookeeper.quorum", "sdldalplhdm03, sdldalplhdm02,
sdldalplhdm01");
config.set("hbase.zookeeper.property.clientPort","2181");
UserGroupInformation.setConfiguration(config);
//System.out.println("after hbase configuration");
UserGroupInformation.loginUserFromKeytab("
a_nikhil.gopishe...@suddenlink.cequel3.com",
"/home/a_nikhil.gopishetti/a_nikhil.gopishetti.keytab");
//config.set("hbase.master", "10.48.210.248:60010");
HBaseAdmin.checkHBaseAvailable(config);
//System.out.println("HBase is running!");
}
catch (MasterNotRunningException e) {
System.out.println("HBase is not running!");
System.exit(1);
}catch (Exception ce){
ce.printStackTrace();
}

//config.set(TableInputFormat.INPUT_TABLE, rawTableName);

//4. new Hadoop API configuration
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}

});
JavaPairDStream<ImmutableBytesWritable, Put> hbasePuts = lines.mapToPair(
new PairFunction<String, ImmutableBytesWritable, Put>(){

@SuppressWarnings("deprecation")
public Tuple2<ImmutableBytesWritable, Put> call(String line) {
Put put = new Put(Bytes.toBytes(Math.random()));
put.add(Bytes.toBytes("cf12"), Bytes.toBytes("firstColumn"),
Bytes.toBytes(line));
return new Tuple2<ImmutableBytesWritable, Put>(new
ImmutableBytesWritable(), put);
}
});

final Job newAPIJobConfiguration1 = Job.getInstance(config);
hbasePuts.foreachRDD(new Function<JavaPairRDD<ImmutableBytesWritable, Put>,
Void>() {
public Void call(JavaPairRDD<ImmutableBytesWritable, Put>
hbasePutJavaPairRDD) throws Exception {

newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,
rawTableName);
newAPIJobConfiguration1.setOutputFormatClass(org.apache.hadoop.

0.9.0 release branch

2015-11-02 Thread Jun Rao
Hi, everyone,

We are getting close to the 0.9.0 release. The current plan is to have the
following remaining 0.9.0 blocker issues resolved this week, cut the 0.9.0
release branch by Nov. 6 (Friday) and start the RC on Nov. 9 (Monday).

https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC

Thanks,

Jun


Re: Topic per entity

2015-11-02 Thread Grant Henke
Hi Alex & Andrew,

There was a discussion with some pointers on this mailing list a bit ago
titled "mapping events to topics". I suggest taking a look at that thread:
http://search-hadoop.com/m/uyzND1vJsUuYtGD91/mapping+events+to+topics&subj=mapping+events+to+topics

If you still have questions, don't hesitate to ask.

Thanks,
Grant



On Sat, Oct 31, 2015 at 3:19 AM, Andrew Stevenson 
wrote:

> I too would be interested in any responses to this question.
>
> I'm using Kafka for event notification and, once it is secured, will put real
> payloads in it and take advantage of the durable commit log. I want to let
> users describe a DAG in OrientDB and have the Kafka client processor load
> and execute it. Each processor would then attach its lineage and
> provenance back to the OrientDB graph store.
>
> This way I can let users replay stress scenarios, calculate VaR, etc., with
> one source of replayable truth. Compliance and regulatory authorities like
> this.
>
> Regards
>
> Andrew
> 
> From: Alex Buchanan
> Sent: 31/10/2015 05:30
> To: users@kafka.apache.org
> Subject: Topic per entity
>
> Hey Kafka community.
>
> I'm researching possible architecture for a distributed data processing
> system. In this system, there's a close relationship between a specific
> dataset and the processing code. The user might upload a few datasets and
> write code to run analysis on that data. In other words, frequently the
> analysis code pulls data from a specific entity.
>
> Kafka is attractive for lots of reasons:
> - I'll need messaging anyway
> - I want a model for immutability of data (mutable state and potential job
> failure don't mix)
> - cross-language clients
> - the change stream concept could have some nice uses (such as updating
> visualizations without rebuilding)
> - Samza's model of state management is a simple way to think of external
> data without worrying too much about network-based RPC
> - as a source of truth data store, it's really simple. No mutability,
> complex queries, etc. Just a log. To me, that helps prevent abuse and
> mistakes.
> - it fits well with the concept of pipes, frequently found in data analysis
>
> But most of the Kafka examples are about processing a large stream of a
> specific _type_, not so much about processing specific entities. And I
> understand there are limits to topics (file/node limits on the filesystem
> and in zookeeper) and it's discouraged to model topics based on
> characteristics of data. In this system, it feels more natural to have a
> topic per entity so the processing code can connect directly to the data it
> wants.
>
> So I need a little guidance from smart people. Am I lost in the rabbit
> hole? Maybe I'm trying to force Kafka into this territory it's not suited
> for. Have I been reading too many (awesome) articles about the role of the
> log and streaming in distributed computing? Or am I on the right track and
> I just need to put in some work to jump the hurdles (such as topic storage
> and coordination)?
>
> It sounds like Cassandra might be another good option, but I don't know
> much about it yet.
>
> Thanks guys!
>



-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Error Producer

2015-11-02 Thread Eduardo Alfaia
Hi guys,

How could I solve this problem? 

%Failed to produce message: Local: Queue full

Thanks


High delay during controlled shutdown and acks=-1

2015-11-02 Thread Federico Giraud
Hi,

I have a few Java async producers sending data to a 4-node Kafka cluster
version 0.8.2, containing a few thousand topics, all with replication factor
2. When I use acks=1 and trigger a controlled shutdown + restart on one
broker, the producers will send data to the new leader, reporting a very
low additional delay during the transition (as expected). However, if I use
acks=-1, the producers will report a ~15 second delay between the send and
the future.get. Is this behavior expected? Is there a way to make it
faster? Or maybe it is a problem with my configuration?

Broker configs:
broker.id=0
log.dirs=/var/kafka/md1/kafka-logs
zookeeper.connect=10.40.27.107,10.40.27.108,10.40.27.109
auto.create.topics.enable=true
default.replication.factor=2
delete.topic.enable=true
log.retention.hours=24
num.io.threads=5

Producer configs:
acks = -1
retries = 3
timeout.ms = 3000
batch.size = 1048576
linger.ms= 100
metadata.fetch.timeout.ms = 5000
metadata.max.age.ms = 6

I tried different configurations, but I wasn't able to reduce the delay
during broker restart. The logs in the broker indicate that the controlled
shutdown was successful.

Thank you

regards,
Federico


Quick kafka-reassign-partitions.sh script question

2015-11-02 Thread John Yost
Hi Everyone,

Perhaps a silly question...does one need to shut down incoming data feeds
to Kafka prior to moving partitions via the kafka-reassign-partitions.sh
script? My thought is yes, but just want to be sure.

Thanks

--John


Re: Problems getting offsets

2015-11-02 Thread Stevo Slavić
Here is a longer and more detailed, though not necessarily easier to
follow, explanation.

When using Kafka for offsets storage, consumer offsets get stored in
(compacted) consumer offsets Kafka topic ("__consumer_offsets"). Every
partition of consumer offsets topic could store offsets from multiple
consumer groups, but offsets of a single consumer group will always be
stored in (get mapped to) same partition of consumer offsets topic (e.g.
consumer group x and y offsets could be both stored in partition 0, while
consumer group z offsets could be stored in partition 49 of consumer
offsets topic; even if consumer group x is consuming two different topics,
offsets would be stored in same partition of consumer offsets topic). Like
any other Kafka topic partition, one can read/write (consumer offsets)
topic partitions only from their lead brokers, and every partition has only
one lead broker. Lead broker of a partition where offsets are stored for
given consumer group is called consumer coordinator broker for that
consumer group. To fetch/commit offsets for consumer group, you first need
to resolve consumer coordinator broker for that consumer group, and then
send fetch/commit offsets to that broker (btw, it's dynamic, can change
even after just being resolved which broker is coordinator broker for given
consumer group, e.g. when initial lead broker is lost and a replica becomes
a new lead). Until there is a leader assigned for a partition and broker
who is leader is not yet aware of that assignment, topic partition cannot
be read/written, it's offline - read/write requests made to that broker
will return error code in response. Consumer offsets topic is not created
automatically on broker/cluster startup - instead it gets created on first
(direct or indirect) request for consumer offsets topic metadata... That
lookup triggers creation of consumer offsets topic, parts of the topic
creation process happen async to request for topic creation, and it can
take time for topic creation to actually fully finish. Especially if you
leave the consumer offsets topic at its default settings (replication factor
of 3, 50 partitions), consumer offsets topic creation can take time - this is
a nice default for a production cluster, but not for one used in integration
tests with a single broker.

When you try to fetch (or commit) offset for the first time, broker
internally retrieves metadata about consumer offsets topic, which initiates
consumer offsets topic creation when it doesn't exist, but that first
request to fetch offset (usually) fails, since chosen consumer coordinator
broker is not yet aware that it is coordinator for that group, and sends
back NotCoordinatorForConsumerCode/NotCoordinatorForConsumerException. Docs
(see
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
) state that "The broker returns this error code if it receives an offset
fetch or commit request for a consumer group that it is not a coordinator
for."

You could modify your code to retry fetching offsets, but not infinitely,
or you could trigger consumer offsets topic init before fetching offsets.
For the init option you have (at least) two alternatives.

Modify your test, before reading/committing offsets but not before creating
your topic to init (trigger creation of) consumer offsets topic by looking
up consumer offsets topic metadata, and add some delay to let consumer
offsets topic be fully created (all of its default 50 partitions get leader
assigned, broker aware, etc.).
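
As an illustration, triggering that metadata lookup from a test could look roughly
like the sketch below, written against the 0.8.x kafka.javaapi classes. The host,
port, client id, and the fixed sleep are placeholders; a real test would poll the
metadata until every partition of the topic has a leader:

import java.util.Collections;

import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetsTopicInit {
    public static void main(String[] args) throws InterruptedException {
        // Asking a broker for metadata of "__consumer_offsets" triggers its creation.
        SimpleConsumer consumer =
            new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offsets-topic-init");
        try {
            TopicMetadataRequest request =
                new TopicMetadataRequest(Collections.singletonList("__consumer_offsets"));
            TopicMetadataResponse response = consumer.send(request);
            System.out.println("topics in metadata response: " + response.topicsMetadata().size());
            // Crude wait for the topic's partitions to get leaders assigned.
            Thread.sleep(5000);
        } finally {
            consumer.close();
        }
    }
}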

You could do this consumer offsets topic initialization before creating
your topic, but then make sure that in the broker configuration the replication
factor for the consumer offsets topic is not higher than the number of brokers
(1 in the scenario you described) - otherwise consumer offsets topic creation
will fail. It fails because a fresh broker in a single-broker cluster, if it
receives a request for consumer offsets topic metadata, will fail to create
the consumer offsets topic no matter how long the delay after the metadata
lookup; it is not aware of how many live brokers there are in the cluster and
will just try to create the consumer offsets topic with the default
replication factor of 3, which fails because initial partition replica
assignment happens during topic creation and the replication factor has to be
<= the number of live brokers when the topic is created. Consumer offsets
topic creation does not fail (although it takes time to finish) if it is done
after some other topic creation has been requested, because that other topic
creation request makes the broker aware (updates its cache) that it is the
sole live broker in the cluster, and then consumer offsets topic creation will
ignore the requested/configured replication factor of 3 and (silently) fall
back to a replication factor of 1 (= the number of live brokers in the
cluster).

Maybe things would be cleaner if topic creation allowed non-live brokers to
be used in replica assignment. Then not only would (consumer offsets) topic
creation not f