Re: kafka.tools.ConsumerOffsetChecker fails for one topic

2016-05-30 Thread Gerard Klijs
It might be that there never was, or currently isn't, a consumer with the group
jopgroup consuming from the twitter topic. I have only used the tool with the new
consumer (offsets stored in the broker), and in that case the group needs to be 'active' in
order to get the offsets.
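
For what it's worth, on 0.9+ the kafka-consumer-groups.sh tool can describe offsets for
either kind of group (ZooKeeper-based or broker-based); the broker port below is an
assumption:

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group jopgroup
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group jopgroup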

On Mon, May 30, 2016 at 2:37 PM Diego Woitasen  wrote:

> Hi,
>   I have these topic in Kafka:
>
> # /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
> __consumer_offsets
> facebook
> twitter
>
> # bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group jopgroup
> --zookeeper localhost:2181 --topic twitter
> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for /consumers/jopgroup/offsets/twitter/0.
>
> Same command for "facebook" topic works fine.
>
> More info:
>
> # /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> --topic twitter
> Topic:twitter   PartitionCount:1ReplicationFactor:1 Configs:
> Topic: twitter  Partition: 0Leader: 1   Replicas: 1
> Isr: 1
>
> Thanks in advance!
> --
>
> Diego Woitasen
> http://flugel.it
> Infrastructure Developers
>


Re: Kafka Consumer rebalancing frequently

2016-05-30 Thread sahitya agrawal
Hi,

I still face the same issue sometimes. My Kafka consumer throws this
exception after failing to claim any partition.

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


Regards,
Sahitya Agrawal

On Tue, May 17, 2016 at 12:22 AM, Jason Gustafson 
wrote:

> To pile on a little bit, the API is designed to ensure consumer liveness so
> that partitions cannot be held indefinitely by a defunct process. Since
> heartbeating and message processing are done in the same thread, the
> consumer needs to demonstrate "progress" by calling poll() often enough not
> to get kicked out. What "often enough" means is dictated by the session
> timeout, which is 30s by default. If you fail to call poll() before the
> session timer expires, then the broker will assume that the member is dead
> and begin a rebalance. If you need more time to handle messages, increase
> session.timeout.ms in your configuration. The only downside to a higher
> timeout in general is that it will take longer to detect other kinds of
> failures (such as process crashes or network partitions).
>
> This was the initial design, but it hasn't worked out quite as well as we
> would have liked, at least not in all situations. The first problem in 0.9
> is that you don't have a direct way to control the amount of data that can
> be returned in a call to poll(), which makes it difficult to estimate the
> session timeout. You can set max.partition.fetch.bytes and, based on an
> estimate for the total number of partitions that you need to read, try to
> come up with a guess, but this is kind of hard in practice. So in 0.10
> we've introduced a new setting max.poll.records, which lets you set an
> explicit bound on the number of messages that need to be handled on each
> poll iteration. The idea is hopefully that you can set this to a reasonably
> low value so that you're never risking a session timeout.
>
> It's also worthwhile understanding a little bit about how the rebalance
> mechanism works. After a consumer group is created, each consumer begins
> sending heartbeat messages to a special broker known as the coordinator.
> When a new consumer joins the group (or when the session timeout of an
> existing member expires), the other members find out about it through the
> error code in the heartbeat response. The group coordination protocol
> basically implements a synchronization barrier. When a rebalance begins,
> all members of the group have to join the barrier for it to complete. So if
> you want to reduce the impact from rebalancing, then you need to ensure
> that all members can join the barrier as soon as possible after it begins.
> For this, we expose heartbeat.interval.ms, but note that we can't actually
> send heartbeats any faster than the poll() interval itself because
> everything is done from the same thread. So if you want to always have fast
> rebalances, then the target for setting the processing bound should be the
> heartbeat interval instead of the session timeout.
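
To make the knobs concrete, here is a minimal sketch with the 0.10 Java consumer; the
broker address, group, topic, and the values themselves are illustrative assumptions,
not recommendations.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedPollConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "my-group");                   // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Session timeout: how long the coordinator waits before declaring the member dead.
        props.put("session.timeout.ms", "30000");
        // Heartbeat interval: the target to aim for if you want rebalances to complete quickly.
        props.put("heartbeat.interval.ms", "3000");
        // 0.10+ only: explicit bound on how many records a single poll() returns.
        props.put("max.poll.records", "100");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Handling the whole batch must finish before the session timeout expires,
                    // otherwise the coordinator kicks this member out and triggers a rebalance.
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }
}

The point is simply that the work done per loop iteration (bounded by max.poll.records)
has to fit comfortably inside the session timeout, and ideally inside the heartbeat
interval.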
>
> We've made some other small improvements to make unexpected rebalancing
> less of a problem in practice. For example, we modified the protocol
> behavior to allow offset commits to serve as effective heartbeats, which
> wasn't the case in 0.9. However, we're still encountering situations where
> there's really no clear way to estimate the session timeout other than
> somewhat exhaustive testing. Even max.poll.records doesn't help when the
> impact of a single message can vary disproportionately (as is sometimes the
> case in Kafka Streams which uses the consumer internally). You could set a
> ridiculously large session timeout in these cases, but that also guarantees
> a long time to recover from hard failures. I think this basically means
> that these use cases need a separate notion of liveness, which they have a
> bit more control over. For example, we can expose a method in the consumer
> which applications can call from any thread to know that they're still
> around. I'm working on a KIP right now to address this problem, so look for
> it in the next few weeks.
>
> Thanks,
> Jason
>
> On Sat, May 14, 2016 at 8:05 AM, sahitya agrawal 
> wrote:
>
> > Thanks Cees and Abhinav, will give this trick a try and update if it
> helped
> > for my case.
> >
> > Regards,
> > Sahitya Agrawal
> >
> > On Fri, May 13, 2016 at 9:36 PM, Cees de Gr

about Kafka Client rebalance

2016-05-30 Thread fuyou
hello

  I was reading the Kafka client source code. The client performs the rebalance when it
receives a JoinGroupResponse and it is the group leader. If it is not the leader, does it
not rebalance?

When there are many Kafka clients with the same groupId, how is the rebalance done?

Thanks all.


org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler#handle

the code as below :

if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future); //do rebalance
} else {
onJoinFollower().chain(future);//no rebalance
}






-- 
   =

 fuyou001
Best Regards


Re: Ping kafka broker for healthcheck

2016-05-30 Thread Fredo Lee
You can use Kafka's Java metrics (exposed via JMX) to monitor events related to Kafka health.
As for the Kafka listener port, you can simply check that it is reachable.
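
As a client-side sketch of that idea: the new Java producer (and consumer) exposes its
metrics programmatically via metrics(); the specific metric name used below
("connection-count") is an assumption to verify against your client version.

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;

public class ProducerHealthCheck {
    // Returns true if the producer currently reports at least one open broker connection.
    // "connection-count" is assumed to exist; dump producer.metrics() to confirm for your version.
    public static boolean hasOpenConnections(Producer<?, ?> producer) {
        for (Metric metric : producer.metrics().values()) {
            if ("connection-count".equals(metric.metricName().name())) {
                return metric.value() > 0;
            }
        }
        return false;
    }
}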

2016-05-30 13:04 GMT+08:00 Joe San :

> Is there an API on the consumer or the producer that I can use to check the
> underlying connection to the Kafka brokers from my producer? I need to ping the
> Kafka broker every minute and check whether a connection is available. Any
> suggestions?
>


Re: soft failure for kafka 0.8.2.2

2016-05-30 Thread Fredo Lee
Thanks for your reply.

Yes, there was more than one controller. The "soft failure" message was
reported by the old controller.

2016-05-31 11:42 GMT+08:00 Muqtafi Akhmad :

> hello Fredo,
>
> My guess is that there was a partition leader election that might have been
> triggered by detection of an offline partition in the Kafka cluster. Somehow the
> broker tried to trigger a leader election while the previous election had already
> completed, hence this log:
>
> > [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 initiated state
> > change for partition [consup-43,78] from OnlinePartition to
> > OnlinePartition failed (state.change.logger)
> > kafka.common.StateChangeFailedException: encountered error while electing
> > leader for partition [consup-43,78] due to: aborted leader
> election
> > for partition [consup-43,78]
> > since the LeaderAndIsr path was already written by another controller.
> This
> > probably means that the current controller 1008 went through a soft
> failure
> > and another controller was elec
> > ted with epoch 14..
>
> The question is,
> - was there any offline partition?
> - was there more than one active controller?
>
> CMIIW
>
>
> On Mon, May 30, 2016 at 2:41 PM, Fredo Lee 
> wrote:
>
> > my server.log 
> >
> > lots of  error msg:
> >
> > [2016-05-28 12:12:31,132] WARN [ReplicaFetcherThread-0-1007], Replica
> 1008
> > for partition [consup-03,16] reset its fetch offset from 13985537
> > to current leader 1007's latest offset 13985537
> > (kafka.server.ReplicaFetcherThread)
> > [2016-05-28 12:12:31,132] ERROR [ReplicaFetcherThread-0-1007], Current
> > offset 13987676 for partition [consup-03,16] out of range; reset
> > offset to 13985537 (kafka.server.ReplicaFetcherThread)
> >
> >
> > the other error msg:
> >
> > [2016-05-28 12:12:31,708] ERROR [Replica Manager on Broker 1008]: Error
> > when processing fetch request for partition [consup-03,35] offset
> > 13848954 from consumer with correlation id 0. Possible cause: Request for
> > offset 13848954 but we only have log segments in the range 12356946 to
> > 13847167. (kafka.server.ReplicaManager)
> >
> >
> >
> > 2016-05-30 15:37 GMT+08:00 Fredo Lee :
> >
> > > My state-change.log>>>
> > >
> > >
> > > [2016-05-28 12:33:31,510] ERROR Controller 1008 epoch 13 aborted leader
> > > election for partition [consup-43,78] since the LeaderAndIsr
> path
> > > was already written by another controll
> > > er. This probably means that the current controller 1008 went through a
> > > soft failure and another controller was elected with epoch 14.
> > > (state.change.logger)
> > > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(1020,-1) for a request sent to broker
> > id:1009,host:
> > > 22.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(1066,-1) for a request sent to broker
> > id:1001,host:
> > > consup-kafka20.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(777,-1) for a request sent to broker
> id:1006,host:
> > > consup-kafka11.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(742,-1) for a request sent to broker
> id:1018,host:
> > > consup-kafka10.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(1021,-1) for a request sent to broker
> > id:1009,host:
> > > consup-kafka22.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(1067,-1) for a request sent to broker
> > id:1001,host:
> > > consup-kafka20.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(778,-1) for a request sent to broker
> id:1006,host:
> > > consup-kafka11.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> > response
> > > UpdateMetadataResponse(1022,-1) for a request sent to broker
> > id:1009,host:
> > > consup-kafka22.com,port:9092 (state.change.logger)
> > > [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 encountered
> > error
> > > while electing leader for partition [consup-43,78] due to:
> > aborted
> > > leader election for partition [consup
> > > -43,78] since the LeaderAndIsr path was already written by
> > another
> > > controller. This probably means that the current controller 1008 went
> > > through a soft failure and another con
> > > troller was elected with epoch 14.. (state.change.logger)
> > > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
>

Re: soft failure for kafka 0.8.2.2

2016-05-30 Thread Muqtafi Akhmad
hello Fredo,

My guess is that there was a partition leader election that might have been
triggered by detection of an offline partition in the Kafka cluster. Somehow the
broker tried to trigger a leader election while the previous election had already
completed, hence this log:

> [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 initiated state
> change for partition [consup-43,78] from OnlinePartition to
> OnlinePartition failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [consup-43,78] due to: aborted leader election
> for partition [consup-43,78]
> since the LeaderAndIsr path was already written by another controller. This
> probably means that the current controller 1008 went through a soft failure
> and another controller was elec
> ted with epoch 14..

The questions are:
- was there any offline partition?
- was there more than one active controller?

CMIIW


On Mon, May 30, 2016 at 2:41 PM, Fredo Lee  wrote:

> my server.log 
>
> lots of  error msg:
>
> [2016-05-28 12:12:31,132] WARN [ReplicaFetcherThread-0-1007], Replica 1008
> for partition [consup-03,16] reset its fetch offset from 13985537
> to current leader 1007's latest offset 13985537
> (kafka.server.ReplicaFetcherThread)
> [2016-05-28 12:12:31,132] ERROR [ReplicaFetcherThread-0-1007], Current
> offset 13987676 for partition [consup-03,16] out of range; reset
> offset to 13985537 (kafka.server.ReplicaFetcherThread)
>
>
> the other error msg:
>
> [2016-05-28 12:12:31,708] ERROR [Replica Manager on Broker 1008]: Error
> when processing fetch request for partition [consup-03,35] offset
> 13848954 from consumer with correlation id 0. Possible cause: Request for
> offset 13848954 but we only have log segments in the range 12356946 to
> 13847167. (kafka.server.ReplicaManager)
>
>
>
> 2016-05-30 15:37 GMT+08:00 Fredo Lee :
>
> > My state-change.log>>>
> >
> >
> > [2016-05-28 12:33:31,510] ERROR Controller 1008 epoch 13 aborted leader
> > election for partition [consup-43,78] since the LeaderAndIsr path
> > was already written by another controll
> > er. This probably means that the current controller 1008 went through a
> > soft failure and another controller was elected with epoch 14.
> > (state.change.logger)
> > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(1020,-1) for a request sent to broker
> id:1009,host:
> > 22.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(1066,-1) for a request sent to broker
> id:1001,host:
> > consup-kafka20.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(777,-1) for a request sent to broker id:1006,host:
> > consup-kafka11.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(742,-1) for a request sent to broker id:1018,host:
> > consup-kafka10.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(1021,-1) for a request sent to broker
> id:1009,host:
> > consup-kafka22.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(1067,-1) for a request sent to broker
> id:1001,host:
> > consup-kafka20.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(778,-1) for a request sent to broker id:1006,host:
> > consup-kafka11.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(1022,-1) for a request sent to broker
> id:1009,host:
> > consup-kafka22.com,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 encountered
> error
> > while electing leader for partition [consup-43,78] due to:
> aborted
> > leader election for partition [consup
> > -43,78] since the LeaderAndIsr path was already written by
> another
> > controller. This probably means that the current controller 1008 went
> > through a soft failure and another con
> > troller was elected with epoch 14.. (state.change.logger)
> > [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received
> response
> > UpdateMetadataResponse(743,-1) for a request sent to broker id:1018,host:
> > consup-kafka10.co
> > m,port:9092 (state.change.logger)
> > [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 initiated state
> > change for partition [consup-43,78] from OnlinePartition to
> > OnlinePartition failed (state.change.logger)
> > kafka.common.StateChangeFailedException: encountere

Re: Kafka encryption

2016-05-30 Thread eugene miretsky
If you are looking to do encryption in the serializer, you could use this
as a reference -
https://github.com/rogers/change-data-capture/tree/master/kafka

FS encryption is easier, but if you cannot do that, this is a good
alternative. It also offloads the encryption work from the broker.
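
As a rough illustration of the serializer approach (not the linked project's actual code),
a wrapper that encrypts the serialized bytes might look like the sketch below; key
management is intentionally left out, and the hard-coded key is a placeholder.

import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import org.apache.kafka.common.serialization.Serializer;

public class EncryptingByteSerializer implements Serializer<byte[]> {
    private SecretKeySpec key;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // In practice the key would come from a KMS or keystore, not be hard-coded.
        key = new SecretKeySpec("0123456789abcdef".getBytes(), "AES");
    }

    @Override
    public byte[] serialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            return cipher.doFinal(data); // encrypt the already-serialized payload
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed", e);
        }
    }

    @Override
    public void close() {}
}

A matching Deserializer would do the reverse on the consumer side; both are plugged in
via the usual key/value serializer and deserializer configs.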
On May 26, 2016 4:48 AM, "Jens Rantil"  wrote:

> > How can we do file system encryption?
>
> Google "LUKS" and you should be able to find some reasources on disk-level
> encryption.
>
> Cheers,
> Jens
>
> On Wed, May 25, 2016 at 11:59 AM Tom Crayford 
> wrote:
>
> > If you're using EBS then it's a single flag to use encrypted drives at
> the
> > provision time of the volume. I don't know about the other storage
> options,
> > I'd recommend looking at the AWS documentation.
> >
> > Thanks
> >
> > Tom Crayford
> > Heroku Kafka
> >
> > On Wednesday, 25 May 2016, Snehalata Nagaje <
> > snehalata.nag...@harbingergroup.com> wrote:
> >
> > >
> > >
> > > Thanks,
> > >
> > > How can we do file system encryption?
> > >
> > > we are using aws environment.
> > >
> > > Thanks,
> > > Snehalata
> > >
> > > - Original Message -
> > > From: "Gerard Klijs" >
> > > To: "Users" >
> > > Sent: Tuesday, May 24, 2016 7:26:27 PM
> > > Subject: Re: Kafka encryption
> > >
> > > For both old and new consumers/producers you can make your own
> > > (de)serializer to do some encryption, maybe that could be an option?
> > >
> > > On Tue, May 24, 2016 at 2:40 PM Tom Crayford  > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > There's no encryption at rest. It's recommended to use filesystem
> > > > encryption, or encryption of each individual message before producing
> > it
> > > > for this.
> > > >
> > > > Only the new producer and consumers have SSL support.
> > > >
> > > > Thanks
> > > >
> > > > Tom Crayford
> > > > Heroku Kafka
> > > >
> > > > On Tue, May 24, 2016 at 11:33 AM, Snehalata Nagaje <
> > > > snehalata.nag...@harbingergroup.com > wrote:
> > > >
> > > > >
> > > > >
> > > > > Thanks for quick reply.
> > > > >
> > > > > Do you mean If I see messages in kafka, those will not be readable?
> > > > >
> > > > > And also, we are using new producer but old consumer , does old
> > > consumer
> > > > > have ssl support?
> > > > >
> > > > > As mentioned in document, its not there.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Snehalata
> > > > >
> > > > > - Original Message -
> > > > > From: "Mudit Kumar" >
> > > > > To: users@kafka.apache.org 
> > > > > Sent: Tuesday, May 24, 2016 3:53:26 PM
> > > > > Subject: Re: Kafka encryption
> > > > >
> > > > > Yes,it does that.What specifically you are looking for?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On 5/24/16, 3:52 PM, "Snehalata Nagaje" <
> > > > > snehalata.nag...@harbingergroup.com > wrote:
> > > > >
> > > > > >Hi All,
> > > > > >
> > > > > >
> > > > > >We have requirement of encryption in kafka.
> > > > > >
> > > > > >As per docs, we can configure kafka with ssl, for secured
> > > communication.
> > > > > >
> > > > > >But does kafka also stores data in encrypted format?
> > > > > >
> > > > > >
> > > > > >Thanks,
> > > > > >Snehalata
> > > > >
> > > >
> > >
> >
> --
>
> Jens Rantil
> Backend Developer @ Tink
>
> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
> For urgent matters you can reach me at +46-708-84 18 32.
>


Re: Avro deserialization

2016-05-30 Thread Rick Mangi
That was exactly the problem. I found the example here to be very helpful -
https://github.com/confluentinc/examples/blob/master/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
 


IMHO it’s confusing having the same class names in different packages when most 
people probably rely on an IDE to manage their imports.

Thanks!

Rick


> On May 30, 2016, at 5:44 AM, Michael Noll  wrote:
> 
> Rick,
> 
> 
> Is your code really importing the correct ConsumerConfig objects?
> 
> It should be:
> 
>import kafka.consumer.ConsumerConfig;
> 
> If you are using your IDE's auto-import feature, you might however end up
> with the following import, which will give you the "ConsumerConfig is not a
> public class" compile error:
> 
>import org.apache.kafka.clients.consumer.ConsumerConfig;
> 
> Lastly, it looks as if you need to update the following line as well:
> 
>// Note that this constructs from props (j.u.Properties), not vProps
> (VerifiableProperties)
>ConsumerConnector consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(new
> ConsumerConfig(props));
> 
> Let us know if this solves your error.  The CP 3.0.0 docs might need a fix
> then (to change vProps to props).
> 
> Best,
> Michael
> 
> 
> 
> On Sun, May 29, 2016 at 2:49 PM, Rick Mangi  wrote:
> 
>> Hello all,
>> 
>> I’m trying to use the new schema registry to read avro encoded messages I
>> created with kafka connect as described here:
>> http://docs.confluent.io/3.0.0/schema-registry/docs/serializer-formatter.html
>> 
>> The example code is obviously not correct, but beyond the obvious, I can’t
>> seem to figure out how to register KafkaAvroDecoder with a consumer. The
>> example given
>> 
>> ConsumerConnector consumer =
>> kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(
>> vProps));
>> 
>> Is illegal, ConsumerConfig is a private class and can’t be instantiated.
>> It also seems that KafkaAvroDecoder does not implement Deserializer, and
>> thus can’t be used in the normal way deserializers are registered.
>> 
>> Has anyone gotten this stuff to work?
>> 
>> Thanks,
>> 
>> Rick
>> 
>> 
> 
> 
> --
> Best regards,
> Michael Noll
> 
> 
> 
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
> Apache Kafka and Confluent Platform: www.confluent.io/download
> *





Re: No more message scenario question

2016-05-30 Thread Tauzell, Dave
Also check out writing a custom partitioner. You could figure out the number of 
partitions and then use a custom partitioner to write a stop message to each. 

I would consider looking into not needing this functionality at all if 
possible. 

Dave 
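
A related sketch that skips the custom partitioner and instead addresses each partition
directly through the producer API (topic name and marker payload are illustrative):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class EndOfStreamMarker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Look up every partition of the topic and send one marker to each,
            // so every consumer sees it no matter which partitions it owns.
            List<PartitionInfo> partitions = producer.partitionsFor("events");
            for (PartitionInfo p : partitions) {
                producer.send(new ProducerRecord<>("events", p.partition(), null, "NoMoreMessage"));
            }
            producer.flush();
        }
    }
}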

> On May 29, 2016, at 15:18, Tauzell, Dave  wrote:
> 
> If you use keys it might be possible to write a "no more messages" message to 
> each partition.
> 
> Dave.
> 
>> On May 29, 2016, at 01:51, David Olsen  wrote:
>> 
>> I understand that the reception of Kafka messages may be out of
>> order[1][2]. But I have a multiple producers/ consumers scenario where I
>> would need to know when the last record is processed at the producer side,
>> so once consumers process e.g. NoMoreMessage, the entire system can react
>> such as stopping the consumers, proceeding to the next step, and so on. But
>> due to the nature of distributed system, there is no guaranteed that
>> NoMoreMessage issued at the producer side would be the last message
>> processed by consumers. So I need pieces of advice if any strategies I can
>> exploit in this scenario with Kafka?
>> 
>> I appreciate any suggestions.
>> 
>> [1].
>> https://www.quora.com/Is-it-possible-to-consume-kafka-message-offset-based-on-timestamp
>> [2].
>> http://stackoverflow.com/questions/35525356/kafka-multiple-producers-writing-to-same-topic-and-order-of-message-is-importa?rq=1


Re: WARN Property [sasl-stuff] is not valid

2016-05-30 Thread Ismael Juma
Hi Stefano,

For the console consumer, you need to pass --new-consumer for it to work.

For the producer, it seems like you are using configs for Kafka 0.10 with a
0.9 producer, but the warning is benign. Are you sure the producer is not
working?

Ismael
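
For reference, the consumer invocation along those lines would look roughly like this
(broker address taken from the producer command below):

$KAFKA_HOME/bin/kafka-console-consumer.sh --new-consumer \
  --bootstrap-server kafka-broker:6667 --topic flink-kafka-source \
  --consumer.config /home/user/kafka-test/kafka.properties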
On 30 May 2016 11:50, "Stefano Baghino" 
wrote:

Hello everybody,

I'm trying to run a simple Flink job that both reads from and writes to Kafka;
however, at the moment I'm having a hard time running the console consumer and
producer to test my setup.

It is extremely important that I run these tests in a kerberized
environment, and right now I'm using Kafka 0.9.0.1. In the past I've used the
Hortonworks version without any problems, but right now I can't get the
"vanilla" open source version to work.

I've followed the documentation to make Kafka work in a secure environment,
created a couple of topics (flink-kafka-source and flink-kafka-sink) and
assigned ACLs to them (all users have "All" permissions on the two topics I
created).

When I run

> $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper
zk1:2181,zk2:2181,zk3:2181 --topic flink-kafka-source \
--consumer.config /home/user/kafka-test/kafka.properties

I get this output:

[2016-05-30 12:26:51,094] WARN Property sasl.kerberos.service.name is not
valid (kafka.utils.VerifiableProperties)
[2016-05-30 12:26:51,094] WARN Property sasl.mechanism is not valid
(kafka.utils.VerifiableProperties)
[2016-05-30 12:26:51,094] WARN Property security.protocol is not valid
(kafka.utils.VerifiableProperties)
[2016-05-30 12:26:51,383] WARN
[console-consumer-77104_ip-172-31-18-184.eu-central-1.compute.internal-1464604011134-faf0daec],
no brokers found when trying to rebalance.
(kafka.consumer.ZookeeperConsumerConnector)
And the output doesn't seem to be consumed.

I can't seem to make the producer work either. When I run

$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic flink-kafka-source
--broker-list kafka-broker:6667 \
--producer.config /home/user/kafka-test/kafka.properties

I get this output

[2016-05-30 12:47:21,718] WARN The configuration sasl.mechanism = GSSAPI
was supplied but isn't a known config.
(org.apache.kafka.clients.producer.ProducerConfig)
And apparently stdin is not written on Kafka.

The kafka.properties file contains the following lines, as provided by the
official documentation:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
Does anybody have a hint on what I'm doing wrong? Thanks in advance.

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Compatibility between different versions of Kafka server and kafka-clients library

2016-05-30 Thread Ismael Juma
Hi Mikael,

This is expected. Older clients work with newer brokers, but newer clients
don't work with older brokers.

Ismael
On 30 May 2016 17:29, "Mikael Ståldal"  wrote:

> I am experiencing compatibility issues between different versions of Kafka
> server and kafka-clients library.
>
> kafka-client 0.8.2.2 to Kafka server 0.9.0.1 works fine,
> but when using kafka-client 0.9.0.1 to Kafka server 0.8.2.2 I get timeout
> when waiting for the future returned by Producer.send(). The message is
> properly sent without delay though.
>
> Code is here:
>
> https://github.com/apache/logging-log4j2/blob/master/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java
>
> Problem:
> java.util.concurrent.TimeoutException: Timeout after waiting for 3 ms.
> at
>
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:50)
> at
>
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> at
>
> org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:81)
>
> --
> [image: MagineTV]
>
> *Mikael Ståldal*
> Senior software developer
>
> *Magine TV*
> mikael.stal...@magine.com
> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>
>


Compatibility between different versions of Kafka server and kafka-clients library

2016-05-30 Thread Mikael Ståldal
I am experiencing compatibility issues between different versions of Kafka
server and kafka-clients library.

kafka-client 0.8.2.2 to Kafka server 0.9.0.1 works fine,
but when using kafka-client 0.9.0.1 to Kafka server 0.8.2.2 I get timeout
when waiting for the future returned by Producer.send(). The message is
properly sent without delay though.

Code is here:
https://github.com/apache/logging-log4j2/blob/master/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java

Problem:
java.util.concurrent.TimeoutException: Timeout after waiting for 3 ms.
at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:50)
at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
at
org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager.send(KafkaManager.java:81)

-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com



Re: Kafka Streams / Processor

2016-05-30 Thread Matthias J. Sax
2) I am not sure if I understand correctly
   * punctuation is independent of committing (i.e., you cannot use it
to flush)
   * if you need to align writes with commits, you can either use a
KStream/KTable or register a state store (see StateStore.java)

5) The application goes down -- neither process() nor punctuate() should
throw!


-Matthias
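
For reference, a rough sketch of the pattern discussed here: accumulate into a registered
state store in process(), flush in punctuate(), then request a commit. The store name
"contexts" and the schedule interval are assumptions, and the store itself must be added
to the topology separately (TopologyBuilder#addStateStore) so it is backed by a changelog.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregatingProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, String>) context.getStateStore("contexts");
        context.schedule(60000); // ask for punctuate() roughly every minute (stream time)
    }

    @Override
    public void process(String key, String value) {
        store.put(key, value); // build up the per-key aggregation context
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, String> it = store.all();
        while (it.hasNext()) {
            KeyValue<String, String> entry = it.next();
            // saveToExternalSystem(entry.key, entry.value);  // hypothetical sink call
        }
        it.close();
        // Request a commit; it happens at the next opportunity, not synchronously.
        context.commit();
    }

    @Override
    public void close() {}
}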

On 05/27/2016 03:52 AM, Tobias Adamson wrote:
> Thank you. 
> Some more follow-up questions
> 
> 1) great, will do some tests
> 
> 2) if auto commit is used how do we prevent a commit happening when an error 
> happens in processing. Basically our scenario is that we build up aggregation 
> contexts for specific keys (these are a bit special so most probably can't 
> use KTables) and we would then on each punctuate call want to save these 
> contexts to our external systems. Once saved we would commit our offset. 
> However if the progress is committed before punctuate and we have an error 
> saving we could end up with the offset being ahead of our saved progress. 
> 
> 3) great
> 
> 4) great
> 
> 5) What happens if a process/punctuate call throws a RuntimeException?
> 
> Regards
> Toby
> 
> 
> 
>> On 27 May 2016, at 1:32 AM, Matthias J. Sax  wrote:
>>
>> Hi Toby,
>>
>> 1) I am not an expert for RocksDB, but I don't see a problem with larger
>> objects.
>>
>> 2) I assume, by "guaranteed" you mean that the commit is performed when
>> the call return. In this case, no. It only sets a flag to commit at the
>> next earliest point in time possible. Ie, you can trigger commits in
>> between the regular commit interval that is configured via
>> "commit.interval.ms"
>>
>> 3) Yes.
>>
>> 4) Yes.
>>
>> -Matthias
>>
>>
>> On 05/26/2016 02:36 PM, Tobias Adamson wrote:
>>> Hi
>>> We have a scenario where we could benefit from the new API’s instead of our 
>>> in house ones.
>>> However we have a couple of questions
>>>
>>> 1. Is it feasible to save 2-3MB size values in the RocksDBStorage?
>>> 2. When is the offset committed as processed when using a custom Processor, 
>>> is it when you call commit on the context and is the commit guaranteed?
>>> 3. Is it ok to put values in to the KVStore in the punctuate method?
>>> 4. Is the punctuate method run by the same thread as the process method?
>>>
>>>
>>> Regards
>>> Toby
>>>
>>
> 





Re: Topics, partitions and keys

2016-05-30 Thread Igor Kravzov
Thank you Todd for great response.

On Sun, May 29, 2016 at 8:52 PM, Todd Palino  wrote:

> Answers are in-line below.
>
> -Todd
>
> On Sun, May 29, 2016 at 3:00 PM, Igor Kravzov 
> wrote:
>
> > Please help me with the subject.
> > In Kafka documentations I found the following:
> >
> > *Kafka only provides a total order over messages within a partition, not
> > between different partitions in a topic. Per-partition ordering combined
> > with the ability to partition data by key is sufficient for most
> > applications. However, if you require a total order over messages this
> can
> > be achieved with a topic that has only one partition, though this will
> mean
> > only one consumer process per consumer group.*
> >
> > So here are my questions:
> > 1. Does it mean if i want to have more than 1 consumer (from the same
> > group) reading from the same topic I need to have more than 1 partition?
> >
>
> Yes
>
>
> > 2. Does it mean that I need the same amount of partitions as amount of
> > consumers for the same group?
> >
>
> If you want all your consumers to be actively consuming, then you need at
> least as many partitions as you have consumers. You can have more
> partitions than you do consumers, and you can be assured that all
> partitions will be consumed. Note that you don’t have to have as many
> partitions as you do consumers - you could have some “warm spares” in place
> to pick up if others drop out.
>
>
> > 3. How many consumers can read from one partition?
> >
>
> Only one consumer in a given consumer group can read from a single
> partition. You can have as many consumer groups reading the same topic as
> your system is able to handle (i.e. how much network bandwidth do you have,
> for example).
>
>
> > Also have some questions regarding relationship between keys and
> partitions
> > with regard to API. I only looked at .net APIs (especially one from MS)
> >  but looks like the mimic Java API.
> > Whhen using a producer to send a message to a topic there is a key
> > parameter. But when consumer reads from a topic there is a partition
> > number.
> >
> > 1. How are partitions numbered? Starting from 0 or 1?
> >
>
> Starting from zero. A topic with eight partitions will have them numbered
> zero through seven.
>
>
> > 2. What exactly relationship between a key and partition?
> > As I understand some function on key will determine a partition. is that
> > correct?
> >
>
> When using keys, the default partitioner will hash the key and write it to
> a partition based on the hash. All messages produced with that same key
> will be written to the same partition. This is useful, for example, if you
> want to make sure all messages that deal with a particular user are in the
> same partition to optimize processing on your consumers.
>
> Note that the hashing and partition assignment will be consistent as long
> as the number of partitions remains the same. If you change the number of
> partitions for the topic, the assignments of keys to partitions will
> change.
>
>
> > 3. If I have 2 partitions in a topic and want some particular messages go
> > to one partition and other messages go to another I should use a specific
> > key for one specific partition, and the rest for another?
> >
>
> For 2 partitions, it is likely, but not guaranteed, that if you produce
> messages with 2 different keys they will end up in different partitions. If
> you truly want to assure partitioning like that, you will need to use a
> custom partitioner. This will allow you to specify, in the producer,
> exactly what partition each message gets produced to.
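
A sketch of what such a custom partitioner could look like with the Java producer; the
key convention ("priority-" prefix) and partition layout are illustrative assumptions.
It is enabled by setting partitioner.class in the producer config.

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class PinnedKeyPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (numPartitions <= 1) {
            return 0;
        }
        if (key instanceof String && ((String) key).startsWith("priority-")) {
            return 0; // pin these messages to partition 0
        }
        // Spread everything else over partitions 1..N-1
        int hash = keyBytes == null ? 0 : Arrays.hashCode(keyBytes);
        return 1 + (hash & 0x7fffffff) % (numPartitions - 1);
    }

    @Override
    public void close() {}
}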
>
>
> > 4. What if I have 3 partitions and one type of messages to go to one
> > particular partition and the rest to other two?
> >
>
> Again, this is a case where you need to use a custom partitioner if you
> want to get fancy.
>
>
> > 5. How in general I send messages to a particular partition in order to
> > know  for a consumer from where to read?
> > Or I better off with multiple topics?
> >
>
> Either way is OK. I usually use the guideline that if the messages are of
> different types (e.g. one is a page view event, and one is a search event),
> they should probably be in different topics named appropriately. This
> allows your consumers to know exactly what they are dealing with, and you
> never know when you’ll have a consumer that will care about one and not the
> other. You want to minimize a consumer reading messages and throwing them
> away because they’re not what it wants to be reading.
>
> Doing something like having a topic per-user can be very problematic as the
> scale starts to increase. Yes, you can certainly use a wildcard consumer,
> but if you’re not doing that you have to maintain some mapping of consumers
> to topics. And if you are using a wildcard consumer, you’re going to run
> into issues with the number of topics any given group is consuming at some
> point. Your system may work fine for 5 topics, but what about when it grows
> to 100? 1000? A m

kafka.tools.ConsumerOffsetChecker fails for one topic

2016-05-30 Thread Diego Woitasen
Hi,
  I have these topic in Kafka:

# /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
facebook
twitter

# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group jopgroup
--zookeeper localhost:2181 --topic twitter
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
KeeperErrorCode = NoNode for /consumers/jopgroup/offsets/twitter/0.

Same command for "facebook" topic works fine.

More info:

# /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe
--topic twitter
Topic:twitter   PartitionCount:1ReplicationFactor:1 Configs:
Topic: twitter  Partition: 0Leader: 1   Replicas: 1
Isr: 1

Thanks in advance!
-- 

Diego Woitasen
http://flugel.it
Infrastructure Developers


WARN Property [sasl-stuff] is not valid

2016-05-30 Thread Stefano Baghino
Hello everybody,

I'm trying to run a simple Flink job that both reads from and writes to Kafka;
however, at the moment I'm having a hard time running the console consumer and
producer to test my setup.

It is extremely important that I run these tests in a kerberized
environment, and right now I'm using Kafka 0.9.0.1. In the past I've used the
Hortonworks version without any problems, but right now I can't get the
"vanilla" open source version to work.

I've followed the documentation to make Kafka work in a secure environment,
created a couple of topics (flink-kafka-source and flink-kafka-sink) and
assigned ACLs to them (all users have "All" permissions on the two topics I
created).

When I run

> $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper
zk1:2181,zk2:2181,zk3:2181 --topic flink-kafka-source \
--consumer.config /home/user/kafka-test/kafka.properties

I get this output:

[2016-05-30 12:26:51,094] WARN Property sasl.kerberos.service.name is not
valid (kafka.utils.VerifiableProperties)
[2016-05-30 12:26:51,094] WARN Property sasl.mechanism is not valid
(kafka.utils.VerifiableProperties)
[2016-05-30 12:26:51,094] WARN Property security.protocol is not valid
(kafka.utils.VerifiableProperties)
[2016-05-30 12:26:51,383] WARN
[console-consumer-77104_ip-172-31-18-184.eu-central-1.compute.internal-1464604011134-faf0daec],
no brokers found when trying to rebalance.
(kafka.consumer.ZookeeperConsumerConnector)
And the output doesn't seem to be consumed.

I can't seem to make the producer work either. When I run

$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic flink-kafka-source
--broker-list kafka-broker:6667 \
--producer.config /home/user/kafka-test/kafka.properties

I get this output

[2016-05-30 12:47:21,718] WARN The configuration sasl.mechanism = GSSAPI
was supplied but isn't a known config.
(org.apache.kafka.clients.producer.ProducerConfig)
And apparently stdin is not written on Kafka.

The kafka.properties file contains the following lines, as provided by the
official documentation:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
Does anybody have a hint on what I'm doing wrong? Thanks in advance.

-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Avro deserialization

2016-05-30 Thread Michael Noll
Rick,


Is your code really importing the correct ConsumerConfig objects?

It should be:

import kafka.consumer.ConsumerConfig;

If you are using your IDE's auto-import feature, you might however end up
with the following import, which will give you the "ConsumerConfig is not a
public class" compile error:

import org.apache.kafka.clients.consumer.ConsumerConfig;

Lastly, it looks as if you need to update the following line as well:

// Note that this constructs from props (j.u.Properties), not vProps
(VerifiableProperties)
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(new
ConsumerConfig(props));

Let us know if this solves your error.  The CP 3.0.0 docs might need a fix
then (to change vProps to props).

Best,
Michael
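
For reference, a rough sketch of the full wiring with the old consumer and the Confluent
KafkaAvroDecoder, assuming the Confluent serializer artifacts are on the classpath;
addresses, group id, and topic name are placeholders. Since the decoder does not
implement the new Deserializer interface, it is passed directly to createMessageStreams()
rather than set via a config property.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.VerifiableProperties;

public class AvroConsoleDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");          // assumed
        props.put("group.id", "avro-test");                        // assumed
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address

        ConsumerConnector consumer =
            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // The same props drive the decoder (it needs schema.registry.url).
        KafkaAvroDecoder decoder = new KafkaAvroDecoder(new VerifiableProperties(props));

        Map<String, Integer> topicCount = Collections.singletonMap("my-avro-topic", 1);
        Map<String, List<KafkaStream<Object, Object>>> streams =
            consumer.createMessageStreams(topicCount, decoder, decoder);

        for (MessageAndMetadata<Object, Object> record : streams.get("my-avro-topic").get(0)) {
            System.out.println(record.message()); // typically a GenericRecord from the registry
        }
    }
}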



On Sun, May 29, 2016 at 2:49 PM, Rick Mangi  wrote:

> Hello all,
>
> I’m trying to use the new schema registry to read avro encoded messages I
> created with kafka connect as described here:
> http://docs.confluent.io/3.0.0/schema-registry/docs/serializer-formatter.html
>
> The example code is obviously not correct, but beyond the obvious, I can’t
> seem to figure out how to register KafkaAvroDecoder with a consumer. The
> example given
>
> ConsumerConnector consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(
> vProps));
>
> Is illegal, ConsumerConfig is a private class and can’t be instantiated.
> It also seems that KafkaAvroDecoder does not implement Deserializer, and
> thus can’t be used in the normal way deserializers are registered.
>
> Has anyone gotten this stuff to work?
>
> Thanks,
>
> Rick
>
>


-- 
Best regards,
Michael Noll



*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
Apache Kafka and Confluent Platform: www.confluent.io/download
*


Re: Question about applicability of Kafka Streams

2016-05-30 Thread Michael Noll
Kim,

yes, Kafka Streams is very suitable for this kind of application.

The code example that Tobias linked to should be a good starting point for
you (thanks, Tobias!).

Best,
Michael
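
A rough sketch of what such a transformation could look like with the 0.10 Streams API,
keeping values as JSON strings and using Jackson (an assumed dependency) for the
parse/build steps; topic names and the field mapping are illustrative, not the actual
alarm format.

import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class CollectdToAlarm {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "collectd-alarm");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ObjectMapper mapper = new ObjectMapper();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source =
            builder.stream(Serdes.String(), Serdes.String(), "collectd-raw");

        source.mapValues(json -> {
            try {
                JsonNode in = mapper.readTree(json).get(0);   // collectd sends a JSON array
                ObjectNode out = mapper.createObjectNode();
                out.put("alarmMsgType", "threshold");
                out.put("host", in.path("host").asText());
                out.put("category", in.path("plugin").asText().toUpperCase());
                out.put("value", in.path("values").path(0).asText());
                return out.toString();
            } catch (Exception e) {
                return "{\"error\":\"unparsable\"}"; // error handling left out of the sketch
            }
        }).to(Serdes.String(), Serdes.String(), "alarms");

        new KafkaStreams(builder, props).start();
    }
}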



On Fri, May 27, 2016 at 4:35 AM, BYEONG-GI KIM  wrote:

> Thank you very much for the information!
> I'll look into it.
>
> Best regards
>
> KIM
>
> 2016-05-27 11:31 GMT+09:00 Tobias Adamson :
>
> > Hi Kim
> > Would maybe this example work for you?
> >
> >
> https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
> > <
> >
> https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
> > >
> >
> > It included JSON -> POJO -> JSON steps and could probably be adapted for
> > your case?
> >
> > Regards
> > Toby
> > > On 27 May 2016, at 10:20 AM, BYEONG-GI KIM  wrote:
> > >
> > > Hello.
> > >
> > > First I thank you so much for the devs since they've been making a
> great
> > > tool as open-source software.
> > >
> > > I'm considering to apply a new feature of the Kafka, aka Kafka Streams,
> > on
> > > my simple handler application, which receives monitoring data from
> > Collectd
> > > and reproduce transformed messages to Kafka Broker(s). For example, I'd
> > > want to change the collected message from Collectd like,
> > >
> > >
> >
> [{"values":[1901474177],"dstypes":["counter"],"dsnames":["value"],"time":1280959128,"interval":10,"host":"
> > > leeloo.octo.it
> > >
> >
> ","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"idle"}]
> > >
> > > to my customized alarm message like,
> > >
> > > {"alarmMsgType":"threshold", "time":145943640, "host":"
> > leeloo.octo.it
> > > ","category":"CPU","type":"IDLE",
> > > "detail":"0","alarmLevel":"critical","message":"cpu
> > > error","value":"1901474177"}
> > >
> > > of course, the re-produced message must be sent to Kafka Broker(s).
> > >
> > > The problem is that, the message(s) from Collectd is Json-formatted so
> > that
> > > it seems the Kafka Streams processing would become complicated, i.e.,
> it
> > > should be JSONParsed from String to JSON and vise versa after
> transform.
> > >
> > > Is it suitable to use the Kafka Stream for this kind of application?
> > >
> > > Any better idea or comments would also really helpful for me. Thanks in
> > > advance!
> > >
> > > Best regards
> > >
> > > KIM
> >
> >
>


Re: soft failure for kafka 0.8.2.2

2016-05-30 Thread Fredo Lee
My server.log >>>

Lots of error messages:

[2016-05-28 12:12:31,132] WARN [ReplicaFetcherThread-0-1007], Replica 1008
for partition [consup-03,16] reset its fetch offset from 13985537
to current leader 1007's latest offset 13985537
(kafka.server.ReplicaFetcherThread)
[2016-05-28 12:12:31,132] ERROR [ReplicaFetcherThread-0-1007], Current
offset 13987676 for partition [consup-03,16] out of range; reset
offset to 13985537 (kafka.server.ReplicaFetcherThread)


the other error msg:

[2016-05-28 12:12:31,708] ERROR [Replica Manager on Broker 1008]: Error
when processing fetch request for partition [consup-03,35] offset
13848954 from consumer with correlation id 0. Possible cause: Request for
offset 13848954 but we only have log segments in the range 12356946 to
13847167. (kafka.server.ReplicaManager)



2016-05-30 15:37 GMT+08:00 Fredo Lee :

> My state-change.log>>>
>
>
> [2016-05-28 12:33:31,510] ERROR Controller 1008 epoch 13 aborted leader
> election for partition [consup-43,78] since the LeaderAndIsr path
> was already written by another controll
> er. This probably means that the current controller 1008 went through a
> soft failure and another controller was elected with epoch 14.
> (state.change.logger)
> [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(1020,-1) for a request sent to broker id:1009,host:
> 22.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(1066,-1) for a request sent to broker id:1001,host:
> consup-kafka20.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(777,-1) for a request sent to broker id:1006,host:
> consup-kafka11.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(742,-1) for a request sent to broker id:1018,host:
> consup-kafka10.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(1021,-1) for a request sent to broker id:1009,host:
> consup-kafka22.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(1067,-1) for a request sent to broker id:1001,host:
> consup-kafka20.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(778,-1) for a request sent to broker id:1006,host:
> consup-kafka11.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(1022,-1) for a request sent to broker id:1009,host:
> consup-kafka22.com,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 encountered error
> while electing leader for partition [consup-43,78] due to: aborted
> leader election for partition [consup
> -43,78] since the LeaderAndIsr path was already written by another
> controller. This probably means that the current controller 1008 went
> through a soft failure and another con
> troller was elected with epoch 14.. (state.change.logger)
> [2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
> UpdateMetadataResponse(743,-1) for a request sent to broker id:1018,host:
> consup-kafka10.co
> m,port:9092 (state.change.logger)
> [2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 initiated state
> change for partition [consup-43,78] from OnlinePartition to
> OnlinePartition failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [consup-43,78] due to: aborted leader election
> for partition [consup-43,78]
> since the LeaderAndIsr path was already written by another controller.
> This probably means that the current controller 1008 went through a soft
> failure and another controller was elec
> ted with epoch 14..
> at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:380)
> at
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:208)
> at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:146)
> at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:145)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:145)
> at
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:631)
> at
> kafka.controller.KafkaController$$ano

Re: soft failure for kafka 0.8.2.2

2016-05-30 Thread Fredo Lee
My state-change.log>>>


[2016-05-28 12:33:31,510] ERROR Controller 1008 epoch 13 aborted leader
election for partition [consup-43,78] since the LeaderAndIsr path
was already written by another controll
er. This probably means that the current controller 1008 went through a
soft failure and another controller was elected with epoch 14.
(state.change.logger)
[2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(1020,-1) for a request sent to broker id:1009,host:
22.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(1066,-1) for a request sent to broker id:1001,host:
consup-kafka20.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(777,-1) for a request sent to broker id:1006,host:
consup-kafka11.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(742,-1) for a request sent to broker id:1018,host:
consup-kafka10.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,510] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(1021,-1) for a request sent to broker id:1009,host:
consup-kafka22.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(1067,-1) for a request sent to broker id:1001,host:
consup-kafka20.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(778,-1) for a request sent to broker id:1006,host:
consup-kafka11.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(1022,-1) for a request sent to broker id:1009,host:
consup-kafka22.com,port:9092 (state.change.logger)
[2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 encountered error
while electing leader for partition [consup-43,78] due to: aborted
leader election for partition [consup
-43,78] since the LeaderAndIsr path was already written by another
controller. This probably means that the current controller 1008 went
through a soft failure and another con
troller was elected with epoch 14.. (state.change.logger)
[2016-05-28 12:33:31,511] TRACE Controller 1008 epoch 13 received response
UpdateMetadataResponse(743,-1) for a request sent to broker id:1018,host:
consup-kafka10.co
m,port:9092 (state.change.logger)
[2016-05-28 12:33:31,511] ERROR Controller 1008 epoch 13 initiated state
change for partition [consup-43,78] from OnlinePartition to
OnlinePartition failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing
leader for partition [consup-43,78] due to: aborted leader election
for partition [consup-43,78]
since the LeaderAndIsr path was already written by another controller. This
probably means that the current controller 1008 went through a soft failure
and another controller was elec
ted with epoch 14..
at
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:380)
at
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:208)
at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:146)
at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:145)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:145)
at
kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:631)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaControl
ler.scala:1158)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.sca
la:1153)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.sca
la:1153)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1150)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1148)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.

Re: offset out of range for kafka follower when leader changed.

2016-05-30 Thread Fredo Lee
Some of my consumers always get an out-of-range exception, and I can see that
the new leader (which was a follower before) truncated its log file.

2016-05-30 15:05 GMT+08:00 Fredo Lee :

> My Kafka cluster has twenty brokers. My producer sets
> request.required.acks = -1, my brokers set min.insync.replicas=2, and
> unclean leader election is enabled.
>
> I think that under this configuration, when the leader of a topic partition
> changes, there should not be any log truncation on the follower that becomes
> the new leader.
>


offset out of range for kafka follower when leader changed.

2016-05-30 Thread Fredo Lee
My Kafka cluster has twenty brokers. My producer sets
request.required.acks = -1, my brokers set min.insync.replicas=2, and
unclean leader election is enabled.

I think that under this configuration, when the leader of a topic partition
changes, there should not be any log truncation on the follower that becomes
the new leader.
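
Spelled out with the actual property names, the configuration described above would be
(values as stated):

# producer (old Scala producer)
request.required.acks=-1

# broker (server.properties)
min.insync.replicas=2
unclean.leader.election.enable=true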