Re: How to work around log compaction error (0.8.2.2)

2016-04-27 Thread Manikumar Reddy
Hi,

Are you enabling log compaction on a topic with compressed messages?
If yes, then that might be the reason for the exception. Log compaction in
0.8.2.2 does not support compressed messages. This was fixed in 0.9.0.0
(KAFKA-1641, KAFKA-1374).

Check the mail thread below for some corrective actions:
http://grokbase.com/t/kafka/users/159jbe18en/log-cleaner-thread-stops
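
If compressed messages are indeed the cause and an upgrade is not possible right
away, one mitigation is to make sure the producers writing to the compacted
topics send uncompressed messages, so no new compressed message sets reach the
cleaner. A minimal sketch against the 0.8.2 Java producer (the broker address
and topic name below are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UncompressedCompactedTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Log compaction in 0.8.2.2 cannot clean compressed message sets,
        // so keep messages to compacted topics uncompressed until 0.9.0.0.
        props.put("compression.type", "none");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my.compacted.topic", "key", "latest value"));
        producer.close();
    }
}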


On Thu, Apr 28, 2016 at 1:44 AM, Rakesh Vidyadharan <
rvidyadha...@gracenote.com> wrote:

> Hello,
>
> We enabled log compaction on a few topics, as we want to preserve
> permanently the latest versions of messages published to specific topics.
> After enabling compaction, the log cleaner thread dies with the same error
> for the topics we tried it on.  It looks like Kafka has a starting offset
> that does not exist in the topic (at least that is how I am reading the
> error).  Any ideas on how we can work around this error?
>
> Thanks
> Rakesh
>
> [2016-04-27 15:52:11,306] INFO [kafka-log-cleaner-thread-0], Starting
> (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,322] INFO Cleaner 0: Beginning cleaning of log
> metamorphosis.lineup-0. (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,323] INFO Cleaner 0: Building offset map for
> metamorphosis.lineup-0... (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,415] INFO Cleaner 0: Building offset map for log
> metamorphosis.lineup-0 for 1 segments in offset range [1553258, 2138466).
> (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,435] ERROR [kafka-log-cleaner-thread-0], Error due
> to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Last clean offset
> is 1553258 but segment base offset is 2125968 for log
> metamorphosis.lineup-0.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
> at kafka.log.Cleaner.clean(LogCleaner.scala:307)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2016-04-27 15:52:11,436] INFO [kafka-log-cleaner-thread-0], Stopped
> (kafka.log.LogCleaner)
>
>


leader election bug

2016-04-27 Thread Kane Kim
Hello,

Looks like we are hitting a leader election bug. I've stopped one broker
(104224873); on the other brokers I see the following:

WARN  kafka.controller.ControllerChannelManager  - [Channel manager on
controller 104224863]: Not sending request Name: StopReplicaRequest;
Version: 0; CorrelationId: 843100; ClientId: ; DeletePartitions: false;
ControllerId: 104224863; ControllerEpoch: 8; Partitions: [mp-auth,169] to
broker 104224873, since it is offline.

Also describing topics returns this:
Topic: mp-unknown Partition: 597 Leader: 104224873 Replicas:
104224874,104224873,104224875 Isr: 104224873,104224875

Broker 104224873 is shut down, but it's still the leader for the partition (at
least for a couple of hours while I've been monitoring it).
Zookeeper cluster is healthy.

ls /brokers/ids
[104224874, 104224875, 104224863, 104224864, 104224871, 104224867,
104224868, 104224865, 104224866, 104224876, 104224877, 104224869,
104224878, 104224879]

That broker is not registered in ZK.


Re: auto leader rebalancing

2016-04-27 Thread Kane Kim
No, I don't see that in controller.log. Looks like it's related to this
bug: https://issues.apache.org/jira/browse/KAFKA-2729; after I did a rolling
restart (all brokers) it rebalanced. It wasn't rebalancing, though, when a
couple of brokers were missing from the ISR (but there were enough brokers to
rebalance anyway).


On Wed, Apr 27, 2016 at 4:01 PM, Robert Christ  wrote:

> I believe the 10% is measured at the broker level, not the topic
> level.  Do you see lines like:
>
> [2016-04-27 22:52:47,854] TRACE [Controller 3]: leader imbalance ratio for
> broker 5 is 0.978555 (kafka.controller.KafkaController)
> [2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for
> broker 1 is 0.00 (kafka.controller.KafkaController)
> [2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for
> broker 2 is 0.00 (kafka.controller.KafkaController)
> [2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for
> broker 3 is 0.00 (kafka.controller.KafkaController)
> [2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for
> broker 4 is 0.00 (kafka.controller.KafkaController)
>
> in your controller log?
>   rob
>
> > On Apr 27, 2016, at 3:46 PM, Kane Kim  wrote:
> >
> > Bump
> >
> > On Tue, Apr 26, 2016 at 10:33 AM, Kane Kim 
> wrote:
> >
> >> Hello,
> >>
> >> We have auto.leader.rebalance.enable = True, other options are by
> default
> >> (10% imbalance ratio and 300 seconds).
> >>
> >> We have a check that reports leadership imbalance:
> >>
> >> critical: Leadership out of balance for topic mp-auth. Leader counts: {
> >> "104224873"=>84, "104224876"=>22, "104224877"=>55, "104224863"=>110,
> >> "104224865"=>107, "104224867"=>105, "104224868"=>42, "104224871"=>9,
> >> "104224879"=>20, "104224866"=>12, "104224869"=>17, "104224864"=>14,
> >> "104224878"=>3}
> >>
> >> Above is broker ID => number of partitions led. Apparently the imbalance is
> >> higher than 10%, and it has been in this state for a couple of hours; there
> >> is no evidence that it's trying to rebalance.
> >>
> >> Is there anything else we have to configure/check?
> >>
> >> Thanks.
> >>
>
>


Re: auto leader rebalancing

2016-04-27 Thread Robert Christ
I believe the 10% is measured at the broker level, not the topic
level.  Do you see lines like:

[2016-04-27 22:52:47,854] TRACE [Controller 3]: leader imbalance ratio for 
broker 5 is 0.978555 (kafka.controller.KafkaController)
[2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for 
broker 1 is 0.00 (kafka.controller.KafkaController)
[2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for 
broker 2 is 0.00 (kafka.controller.KafkaController)
[2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for 
broker 3 is 0.00 (kafka.controller.KafkaController)
[2016-04-27 22:52:47,855] TRACE [Controller 3]: leader imbalance ratio for 
broker 4 is 0.00 (kafka.controller.KafkaController)

in your controller log?
  rob
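
For reference, my rough understanding of how the controller computes that
per-broker ratio (a sketch, not the actual controller code): for each broker it
looks at the partitions whose preferred (first-listed) replica is that broker
and takes the fraction of them currently led by someone else; a preferred-leader
election is triggered when that fraction exceeds
leader.imbalance.per.broker.percentage.

import java.util.List;
import java.util.Map;

public class ImbalanceRatioSketch {
    // replicasByPartition: partition -> assigned replicas (first entry = preferred leader)
    // leaderByPartition:   partition -> current leader broker id
    static double imbalanceRatio(int brokerId,
                                 Map<String, List<Integer>> replicasByPartition,
                                 Map<String, Integer> leaderByPartition) {
        int preferred = 0;   // partitions where this broker is the preferred leader
        int notLeading = 0;  // ...of those, partitions it does not currently lead
        for (Map.Entry<String, List<Integer>> e : replicasByPartition.entrySet()) {
            if (e.getValue().get(0) == brokerId) {
                preferred++;
                Integer leader = leaderByPartition.get(e.getKey());
                if (leader == null || leader != brokerId) {
                    notLeading++;
                }
            }
        }
        return preferred == 0 ? 0.0 : (double) notLeading / preferred;
    }
}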

> On Apr 27, 2016, at 3:46 PM, Kane Kim  wrote:
>
> Bump
>
> On Tue, Apr 26, 2016 at 10:33 AM, Kane Kim  wrote:
>
>> Hello,
>>
>> We have auto.leader.rebalance.enable = True, other options are by default
>> (10% imbalance ratio and 300 seconds).
>>
>> We have a check that reports leadership imbalance:
>>
>> critical: Leadership out of balance for topic mp-auth. Leader counts: {
>> "104224873"=>84, "104224876"=>22, "104224877"=>55, "104224863"=>110,
>> "104224865"=>107, "104224867"=>105, "104224868"=>42, "104224871"=>9,
>> "104224879"=>20, "104224866"=>12, "104224869"=>17, "104224864"=>14,
>> "104224878"=>3}
>>
>> Above is broker ID => number of partitions led. Apparently the imbalance is
>> higher than 10%, and it has been in this state for a couple of hours; there is
>> no evidence that it's trying to rebalance.
>>
>> Is there anything else we have to configure/check?
>>
>> Thanks.
>>






Re: auto leader rebalancing

2016-04-27 Thread Kane Kim
Bump

On Tue, Apr 26, 2016 at 10:33 AM, Kane Kim  wrote:

> Hello,
>
> We have auto.leader.rebalance.enable = True, other options are by default
> (10% imbalance ratio and 300 seconds).
>
> We have a check that reports leadership imbalance:
>
> critical: Leadership out of balance for topic mp-auth. Leader counts: {
> "104224873"=>84, "104224876"=>22, "104224877"=>55, "104224863"=>110,
> "104224865"=>107, "104224867"=>105, "104224868"=>42, "104224871"=>9,
> "104224879"=>20, "104224866"=>12, "104224869"=>17, "104224864"=>14,
> "104224878"=>3}
>
> Above is broker ID => number of partitions led. Apparently the imbalance is
> higher than 10%, and it has been in this state for a couple of hours; there is
> no evidence that it's trying to rebalance.
>
> Is there anything else we have to configure/check?
>
> Thanks.
>


Re: topic=LEADER_NOT_AVAILABLE

2016-04-27 Thread Oliver Pačut
I'm afraid it does. And I am not an owner of the cloud. It seems like a
zookeeper error, maybe? Does it need special configuration in such an
environment? I'll figure it out eventually but any help would be greatly
appreciated.
On Apr 28, 2016 12:26 AM, "Oliver Pačut"  wrote:

> I'm afraid it does. And I am not an owner of the cloud. It seems like a
> zookeeper error, maybe? Does it need special configuration in such an
> environment? I'll figure it out eventually but any help would be greatly
> appreciated.
>


Connection management from Kafka Producers and Consumers

2016-04-27 Thread Durga Deep Tirunagari
Folks,

Is there a write-up or any pointers on how connection management, retries,
failure handling, etc. work between Kafka producers/consumers and a Kafka
cluster?
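
For context, my current understanding is that most of this behaviour on the
producer side is driven by a handful of configs. A hedged sketch of the knobs I
have found so far for the 0.9 Java producer (the broker address is a placeholder
and the values are illustrative, not recommendations); the consumer has
analogous settings:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class ProducerConnectionConfigSketch {
    public static Producer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");     // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                   // how many replicas must acknowledge a write
        props.put("retries", "3");                  // resend on transient errors (may reorder records)
        props.put("retry.backoff.ms", "100");       // wait between retries
        props.put("reconnect.backoff.ms", "50");    // wait before reconnecting to a failed broker
        props.put("request.timeout.ms", "30000");   // fail a request that gets no response in time
        props.put("max.block.ms", "60000");         // how long send() may block on metadata/buffer space
        props.put("metadata.max.age.ms", "300000"); // periodic cluster metadata refresh interval
        return new KafkaProducer<>(props);
    }
}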


 Thanks...
_dd_


How to work around log compaction error (0.8.2.2)

2016-04-27 Thread Rakesh Vidyadharan
Hello,

We enabled log compaction on a few topics, as we want to preserve permanently 
the latest versions of messages published to specific topics.  After enabling 
compaction, the log cleaner thread dies with the same error for the topics we 
tried it on.  It looks like kafka has starting offset that does not exist in 
the topic (at least that is how I am reading the error).  Any ideas on how we 
can work around this error?

Thanks
Rakesh

[2016-04-27 15:52:11,306] INFO [kafka-log-cleaner-thread-0], Starting  
(kafka.log.LogCleaner)
[2016-04-27 15:52:11,322] INFO Cleaner 0: Beginning cleaning of log 
metamorphosis.lineup-0. (kafka.log.LogCleaner)
[2016-04-27 15:52:11,323] INFO Cleaner 0: Building offset map for 
metamorphosis.lineup-0... (kafka.log.LogCleaner)
[2016-04-27 15:52:11,415] INFO Cleaner 0: Building offset map for log 
metamorphosis.lineup-0 for 1 segments in offset range [1553258, 2138466). 
(kafka.log.LogCleaner)
[2016-04-27 15:52:11,435] ERROR [kafka-log-cleaner-thread-0], Error due to  
(kafka.log.LogCleaner)
java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
1553258 but segment base offset is 2125968 for log metamorphosis.lineup-0.
at scala.Predef$.require(Predef.scala:233)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
at kafka.log.Cleaner.clean(LogCleaner.scala:307)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2016-04-27 15:52:11,436] INFO [kafka-log-cleaner-thread-0], Stopped  
(kafka.log.LogCleaner)



Re: Metadata Request Loop?

2016-04-27 Thread Fumo, Vincent
client/producer : 0.9.0.1
server/broker : 0.9.0.0

> On Apr 26, 2016, at 10:05 PM, Sharma Vinay  wrote:
> 
> What versions of kafka and client API are you using now?
> On Apr 26, 2016 3:35 PM, "Fumo, Vincent" 
> wrote:
> 
>> I spoke too soon. It's back doing the same thing.. which is really odd.
>> 
>>> On Apr 26, 2016, at 12:24 PM, Fumo, Vincent <
>> vincent_f...@cable.comcast.com> wrote:
>>> 
>>> Hello. I found the issue. The Ops team deployed kafka 0.8.1 and all my
>> code was 0.9.0. Simple mistake and one that I should have thought of
>> sooner. Once I had them bump up to the latest kafka all was well. Thank you
>> for your help!
>>> 
>>> v
>>> 
>>> 
 On Apr 25, 2016, at 2:54 PM, vinay sharma 
>> wrote:
 
 Hi,
 
 Your code looks good. I don't see any reason there for a frequent metadata
 fetch, but I will run it to verify.
 
 I was able to replicate this issue with my consumer today, where I killed 2
 brokers out of 3 and ran the consumer. I saw a lot of metadata requests
 while waiting for a new leader for the partitions for which those 2 brokers
 were the leader. I see the 2 lines below frequently, as you mentioned, in
 the logs:
 
 [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Metadata:172 - Updated
>> cluster
 metadata version 275 to Cluster(nodes = [Node(1, node1.example.com,
>> 9093)],
 partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader =
>> 1,
 replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition
>> =
 2, leader = none, replicas = [], isr = [], Partition(topic =
>> kafkaPOCTopic,
 partition = 0, leader = none, replicas = [], isr = []])
 
 [Test-KafkaPoller-1] 04/25/16 13:12:28 DEBUG Fetcher:453 - Leader for
 partition kafkaPOCTopic-0 unavailable for fetching offset, wait for
 metadata refresh
 
 I also saw this behavior due to the error below, where I might have killed
 the only ISR available for the topic at that time.
 
 "Error while fetching metadata with correlation id 242 :
 {kafkaPOCTopic=LEADER_NOT_AVAILABLE}"
 
 Do you see any such error in your logs?
 
 Regards,
 Vinay Sharma
 
 
 On Mon, Apr 25, 2016 at 9:38 AM, Fumo, Vincent <
 vincent_f...@cable.comcast.com> wrote:
 
> 
> 
> My code is very straightforward. I create a producer, and then call it to
> send messages. Here is the factory method:
> 
> public Producer<String, String> createProducer() {
>
>     Properties props = new Properties();
>
>     props.put("bootstrap.servers", "cmp-arch-kafka-01d.cc.com:9092");
>     props.put("key.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("acks", "all");
>     props.put("retries", "0");
>     props.put("batch.size", "1638");
>     props.put("linger.ms", "1");
>     props.put("buffer.memory", "33554432");
>     props.put("compression.type", "gzip");
>     props.put("client.id", "sds-merdevl");
>
>     return new KafkaProducer<>(props);
> }
> 
> That is injected into a single instance of KafkaNotifier:
> 
> public class KafkaNotifier implements MessageNotifier {
>     private static Logger logger =
>             LoggerFactory.getLogger(KafkaNotifier.class);
>
>     private Producer<String, String> producer;
>     private String topicName;
>
>     @Override
>     public void sendNotificationMessage(DataObjectModificationMessage message) {
>
>         String json = message.asJSON();
>         String key = message.getObject().getId().toString();
>
>         producer.send(new ProducerRecord<>(topicName, key, json));
>
>         if (logger.isDebugEnabled()) {
>             logger.debug("Sent Kafka message for object with id={}", key);
>         }
>     }
>
>     @Required
>     public void setProducer(Producer<String, String> producer) {
>         this.producer = producer;
>     }
>
>     @Required
>     public void setTopicName(String topicName) {
>         this.topicName = topicName;
>     }
> }
> 
> Here is the topic config info:
> 
> ./kafka-topics.sh -zookeeper mer-arch-zk-01d.cc.com:2181 --describe \
>     --topic s.notifications.dev
> Topic:s.notifications.dev  PartitionCount:1  ReplicationFactor:1  Configs:retention.ms=17280,cleanup.policy=compact
>     Topic: s.notifications.dev  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
> 
> 
> 
> 
> 
>> On Apr 22, 2016, at 6:27 PM, vinay sharma 
> wrote:
>> 
>> Two producers to the same topic should not be a problem. There can be
>> multiple producers and consumers of the same Kafka topic.
>> 
>> I am not sure what can be wrong here. I can try this at my end if you can
>> share your producer code and any config of the topic or broker that you
>> changed and is
>>>

Hash partition of key with skew

2016-04-27 Thread Srikanth
Hello,

Is there a recommendation for handling producer-side partitioning based on
a key with skew?
We want to partition on something like clientId. The problem is that this key
has a very non-uniform distribution: it's equally likely to see a key with 3k
occurrences/day vs 100k/day vs 65 million/day.
The cardinality of the key is around 1500 and there are approximately 1 billion
records per day.
Partitioning by hashcode(key) % numOfPartitions will create a few "hot
partitions" and cause a few brokers (and consumer threads) to be overloaded.
Maybe these heavily loaded partitions end up evenly distributed among brokers,
maybe they don't.

I read KIP-22, which explains how one could write a custom partitioner.
I'd like to know how it has been used to solve this kind of data skew.
We can compute some statistics on the key distribution offline and use them in
the partitioner.
Is that a good idea? Or is it way too much logic for a partitioner?
Anything else to consider?
Any thoughts or references will be helpful.
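
A minimal sketch of that idea against the 0.9 producer Partitioner interface
(KIP-22), assuming the offline statistics boil down to a map from hot client IDs
to a fan-out count; the hot-key map, its loading, and the example client ID are
placeholders:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class SkewAwarePartitioner implements Partitioner {

    // hot clientId -> number of partitions to spread it over
    private final Map<String, Integer> fanOutPerHotKey = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // Placeholder: in practice, load the offline key-distribution statistics
        // here (e.g. from a file whose path is passed in via producer configs).
        fanOutPerHotKey.put("client-65M-per-day", 8);  // hypothetical hot client id
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key == null) {
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        int base = (key.hashCode() & 0x7fffffff) % numPartitions;

        Integer fanOut = fanOutPerHotKey.get(key.toString());
        if (fanOut == null || fanOut <= 1) {
            return base;  // normal key: plain hash partitioning
        }
        // Hot key: scatter it over `fanOut` consecutive partitions. Consumers
        // then have to aggregate that key across those partitions.
        int offset = ThreadLocalRandom.current().nextInt(fanOut);
        return (base + offset) % numPartitions;
    }

    @Override
    public void close() { }
}

The partitioner would be registered on the producer via the partitioner.class
config; the trade-off is that per-key ordering is lost for the fanned-out keys.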

Thanks,
Srikanth


Invalid TimeStamp Error while running WordCountDemo - kafka-0.10.0

2016-04-27 Thread Ramanan, Buvana (Nokia - US)
Hello,

I am trying to execute the WordCountDemo app. I produced text to the input
topic, but when I execute WordCountDemo I get an error.

Please help resolve the following:
ERROR Streams application error during processing in thread [StreamThread-1]:  
(org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.IllegalArgumentException: Invalid timestamp -1

The broker, consumer, and ZooKeeper are running on the same machine (Ubuntu 14.04, Java 1.8).

Thanks,
Buvana

~/kafka-0.10.0/bin$ ./kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountDemo
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/home/buvana/kafka-0.10.0/core/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/buvana/kafka-0.10.0/tools/build/dependant-libs-2.10.6/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/buvana/kafka-0.10.0/connect/api/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/buvana/kafka-0.10.0/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/buvana/kafka-0.10.0/connect/file/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/home/buvana/kafka-0.10.0/connect/json/build/dependant-libs/slf4j-log4j12-1.7.18.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2016-04-27 14:56:04,967] WARN The configuration replication.factor = 1 was 
supplied but isn't a known config. 
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-04-27 14:56:04,968] WARN The configuration num.standby.replicas = 0 was 
supplied but isn't a known config. 
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-04-27 14:56:04,968] WARN The configuration zookeeper.connect = 
localhost:2181 was supplied but isn't a known config. 
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-04-27 14:56:04,968] WARN The configuration __stream.thread.instance__ = 
Thread[StreamThread-1,5,main] was supplied but isn't a known config. 
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-04-27 14:56:05,736] ERROR Streams application error during processing in 
thread [StreamThread-1]:  
(org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.IllegalArgumentException: Invalid timestamp -1
at 
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:60)
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
at 
org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:89)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
at 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
at 
org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at 
org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:331)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:56)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at 
org.apache.kafka.streams.processor.intern
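
One workaround I am considering (an assumption on my part, not something
confirmed here): the input records may have been written by a producer that does
not set a record timestamp, so they arrive with timestamp -1, which the default
extractor passes straight through to the sink. A minimal sketch of switching the
Streams app to wall-clock timestamps (application id and addresses are
placeholders):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class WordCountConfigSketch {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-demo");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");   // placeholder
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Records produced without timestamps show up as -1; using wall-clock
        // time sidesteps the "Invalid timestamp -1" check in the sink.
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                WallclockTimestampExtractor.class.getName());
        return props;
    }
}

Re-producing the input with a 0.10 producer (which sets a CreateTime timestamp)
might be another way around it.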

Debugging high log flush latency

2016-04-27 Thread Rajiv Kurian
We monitor the log flush latency p95 on all our Kafka nodes and
occasionally we see it creep up from the regular figure of under 15 ms to
above 150 ms.

Restarting the node usually doesn't help. It seems to fix itself over time
but we are not quite sure about the underlying reason. Its bytes-in/second
and messages-in/second are in line with the other brokers in the cluster.
When one of these incidents happens, it usually lasts for hours.

Thanks,
Rajiv


Scaling up partitions with kafka streams and other stateful stream processing frameworks

2016-04-27 Thread Ryan Thompson
Hello,

I'm wondering if fault-tolerant state management with Kafka Streams works
seamlessly if partitions are scaled up.  My understanding is that this is
indeed a problem that stateful stream processing frameworks need to solve,
and that:

with samza, this is not a solved problem (though I also understand it's
being worked on, based on a conversation I had yesterday at the kafka
summit with someone who works on samza)

with flink, there's a plan to solve this:  "The way we plan to implement
this in Flink is by shutting the dataflow down with a checkpoint, and
bringing the dataflow back up with a different parallelism."
http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/

with kafka streams, I haven't been able to find a solid answer on whether
or not this problem is solved for users, or if we need to handle it
ourselves.

Thanks,
Ryan


Re: What communication is happening between Kafka brokers?

2016-04-27 Thread Todd S
There will also be inter-broker replication traffic, and controller
communications (the controller runs on an elected broker in the
cluster).  If you're using security features in Kafka 0.9, you may see
additional auth traffic between brokers.

That's all I can think of off the top of my head.



On Wed, Apr 27, 2016 at 9:46 AM, Christoph Grotz
 wrote:
> Hi,
>
> we are thinking about setting up a Kafka cluster. I understand that the
> Kafka nodes need to communicate with the ZooKeeper cluster on port 2181. Is
> there communication between the broker nodes?
>
> Thanks,
> Christoph


Re: Huge number of threads for supporting huge Kafka consumers

2016-04-27 Thread Cees de Groot
Hi Paolo,

Dig a bit through the mailing list archives - IIRC there's a "trick"
that lets you do long processing. Basically you pull in a big batch,
unsubscribe from all topics, do regular polls (that will just send the
heartbeat because you don't have active subscriptions) and then when
done, re-subscribe, poll. With CGs, you should be able to have lots of
consumers doing this in a group and get what you want without doing
manual thread pooling, etc.
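
Roughly, my reading of that trick in code (a sketch only; broker address, group,
topic and the processing step are placeholders, and it assumes the client
version in use allows poll() while nothing is subscribed):

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LongProcessingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "long-processing-group");     // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        List<String> topics = Collections.singletonList("my-topic");  // placeholder
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(1000);
                if (batch.count() == 0) {
                    continue;
                }
                // Step away from the subscription while the slow work runs,
                // polling now and then so the coordinator still sees heartbeats.
                consumer.unsubscribe();
                for (ConsumerRecord<String, String> record : batch) {
                    process(record);   // placeholder for the long-running work
                    consumer.poll(0);  // heartbeat only; no active subscription
                }
                consumer.subscribe(topics);  // rejoin before fetching the next batch
                // (offset-commit handling is left out of this sketch)
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // long-running work goes here
    }
}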

I do hope that manual heartbeating will be available at some time - it
seems to be a recurring problem. Of course, the consumer library
(where all the magic happens) is Open Source so you could dig in and
hack your way around it :-)

On Fri, Apr 22, 2016 at 12:35 PM, Paolo Patierno  wrote:
> Hi all,
>
> I'm developing an AMQP - Kafka bridge and I'm facing a significant limitation
> of the current 0.9.1 Kafka client APIs.
>
> The consumer.poll() method is synchronous and, as we know, it's needed in order
> to send the heartbeat even if no records are available. It means that poll()
> needs to be called frequently before the session ends.
> For that reason a thread per consumer is suggested, because if we use a thread
> pool (say with 20 threads) but 1000 consumers, it's possible that one consumer
> will be served too late and miss sending the heartbeat.
>
> I'm saying that because the bridging between AMQP and Kafka works at the AMQP
> link level, so for each link attached to a specific topic there is a
> corresponding Kafka consumer, and the clients could number in the thousands.
>
> Any thoughts about that ?
>
> Paolo.
>
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience



-- 
Cees de Groot
Principal Software Engineer
PagerDuty, Inc.


What communication is happening between Kafka brokers?

2016-04-27 Thread Christoph Grotz
Hi,

we are thinking about setting up a Kafka cluster. I understand that the
Kafka nodes need to communicate with the ZooKeeper cluster on port 2181. Is
there communication between the broker nodes?

Thanks,
Christoph


RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

2016-04-27 Thread vinay sharma
Hi Phil,

This sounds great. Thanks for trying these settings. This means there is
probably something wrong in my code or setup. I will check what is causing
this issue in my case.

I have a 3-broker, 1-ZooKeeper cluster and my topic has 3 partitions with
replication factor 3.

Regards,
Vinay Sharma


Consumer group storage

2016-04-27 Thread Spico Florin
Hello!
  I'm using the Kafka 0.9.0 high-level consumer API. I have the following
questions:
1. Where are the consumer group and its corresponding clients kept?
2. Is the group kept in ZooKeeper?
3. If the group is kept in ZK, does the data structure remain the same as
described here?
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

I look forward to your answers.
  Regards,
 Florin


RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

2016-04-27 Thread Phil Luckhurst
Hi Vinay,

I tried this out as you suggested by setting metadata.max.age.ms=40000
(with session.timeout.ms=30000).
I then ran my consumer with a batch of 25 messages where each message takes 4
seconds to process, and I call commitSync(offsets) after each message to ensure
the heartbeat keeps the consumer alive.

For the messages before metadata.max.age.ms expired I see as expected that the 
commitSync also logs the heartbeat.

2016-04-27 09:45:11,267 DEBUG [pool-3-thread-2] 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received 
successful heartbeat response.
2016-04-27 09:45:11,270 DEBUG [pool-3-thread-2] 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed 
offset 184 for partition phil-pa-1-device-update-1

Then the automatic metadata refresh is logged.

2016-04-27 09:45:15,276 DEBUG [pool-3-thread-2] 
org.apache.kafka.clients.NetworkClient: Sending metadata request 
ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_
version=0,correlation_id=29,client_id=consumer-3}, 
body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient, 
createdTimeMs=1461746715276, sendTimeMs=0) to node 0
2016-04-27 09:45:15,277 DEBUG [pool-3-thread-2] 
org.apache.kafka.clients.Metadata: Updated cluster metadata version 3 to 
Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic = 
phil-pa-1-device-update, partition = 0, leader = 0, 
replicas = [0,], isr = [0,], Partition(topic = phil-pa-1-device-update, 
partition = 1, leader = 0, replicas = [0,], isr = [0,]])

The very next commitSync call succeeds but, as you said, does not perform the
heartbeat; we don't get the log message "Received successful heartbeat
response". But we only miss the heartbeat on that one message; on the
commitSync calls that follow we get the heartbeat back again.

2016-04-27 09:45:23,301 DEBUG [pool-3-thread-2] 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received 
successful heartbeat response.
2016-04-27 09:45:23,309 DEBUG [pool-3-thread-2] 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed 
offset 187 for partition phil-pa-1-device-update-1

After another 40 seconds we see the next metadata refresh request and the same
thing happens again. The first commitSync after the metadata request does not
perform the heartbeat, but the ones that follow it do. This means that in our
case we call commitSync often enough that the metadata request does not cause
us an issue.
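
For reference, the shape of the loop in that test is roughly the following
sketch (broker address and group id are placeholders; the sleep stands in for
the 4 seconds of real processing):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitPerRecordSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "slow-processor");            // placeholder
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("phil-pa-1-device-update"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    Thread.sleep(4000);  // stand-in for ~4 s of real processing
                    // Committing after each record gives the coordinator a
                    // round-trip, which (per the observations above) also keeps
                    // the heartbeat going between polls.
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }
}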

Thanks,
Phil Luckhurst


-Original Message-
From: vinay sharma [mailto:vinsharma.t...@gmail.com] 
Sent: 26 April 2016 17:29
To: users@kafka.apache.org
Subject: Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Hi Phil,

The config ConsumerConfig.METADATA_MAX_AGE_CONFIG has a default of 300000 ms
(5 minutes). This config drives a mechanism where a proactive metadata refresh
request is issued by the consumer periodically. I have seen that I get the log
line about a successful heartbeat along with a commit only before this request;
once this request is issued, the next successful heartbeat happens only after
the next poll.
This will cause a rebalance and mark the current consumer dead if there is no
poll in the next 30 seconds after the metadata refresh (where the session
timeout is 30 seconds).

Regards,
Vinay


Re: Out of memory - Java Heap space

2016-04-27 Thread Spico Florin
HI!
  You can set up your kafka process to dump the stack trace in case of the
OOM by providing the flags:(
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/clopts001.html
)

   -

   xx:HeapDumpPath=path

   This option can be used to specify a location for the heap dump, see The
   -XX:HeapDumpOnOutOfMemoryError Option
   

   .
   -

   -XX:MaxPermSize=size

   This option can be used to specify the size of the permanent generation
   memory, see Exception in thread thread_name: java.lang.OutOfMemoryError:
   GC Overhead limit exceeded
   

   .


 What Java version are you using?
I suggest to work with jdk 8 and the G1 garbage collector. I've heard some
Kafka engineers that promoted this advice.

I hope that these help.
  Regards,
  Florin



On Wed, Apr 27, 2016 at 12:02 PM, Jaikiran Pai 
wrote:

> Have you tried getting the memory usage output using a tool like jmap and
> seeing what's consuming the memory? Also, what are your heap sizes for the
> process?
>
> -Jaikiran
>
>
> On Tuesday 19 April 2016 02:31 AM, McKoy, Nick wrote:
>
>> To follow up with my last email, I have been looking into
>> socket.receive.buffer.bytes as well as socket.send.buffer.bytes. Would it
>> help to increase the buffer for the OOM issue?
>>
>> All help is appreciated!
>>
>> Thanks!
>>
>>
>> -nick
>>
>>
>> From: "McKoy, Nick" > nicholas.mc...@washpost.com>>
>> Date: Monday, April 18, 2016 at 3:41 PM
>> To: "users@kafka.apache.org" <
>> users@kafka.apache.org>
>> Subject: Out of memory - Java Heap space
>>
>> Hey all,
>>
>> I have a kafka cluster of 5 nodes that’s working really hard. CPU is
>> around 40% idle daily.
>>
>> I looked at the file descriptor note on this documentation page
>> http://docs.confluent.io/1.0/kafka/deployment.html#file-descriptors-and-mmap
>> and decided to give it a shot on one instance in the cluster just to see
>> how it performed. I increased this number to 1048576.
>>
>> I kept getting this error from the kafka logs:
>> ERROR [ReplicaFetcherThread--1-6], Error due to
>> (kafka.server.ReplicaFetcherThread) java.lang.OutOfMemoryError: Java heap
>> space
>>
>> I increased heap to see if that would help and I kept seeing these
>> errors. Could the file descriptor change have something related to this?
>>
>>
>>
>> —
>> Nicholas McKoy
>> Engineering – Big Data and Personalization
>> Washington Post Media
>>
>> One Franklin Square, Washington, DC 20001
>> Email: nicholas.mc...@washpost.com
>>
>>
>


Re: poll method thread

2016-04-27 Thread Spico Florin
Hi, Liquan!
  Thank you for your response. Is much more clear now.
 Regards,
 Florin

On Sun, Apr 24, 2016 at 9:54 PM, Liquan Pei  wrote:

> Hi Spico,
>
> Kafka Consumer is single threaded which means all operations such as
> sending heart beat, fech records and maintain group membership are done in
> the same thread as the caller.
>
> Also, poll() is a blocking method with timeout and you can interrupt it
> with the wakeup method in Kafka Consumer.
>
> Thanks,
> Liquan
>
>
> On Sun, Apr 24, 2016 at 11:43 AM, Spico Florin 
> wrote:
>
> > hi!
> >  i would like to ask if the kafka consumer poll method is done in
> aseprated
> > thread than the caller or in the same thread as the caller?
> > it is syncriunous blocking method or asynch?
> >  thank you
> > florin
> >
>
>
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
>


Re: Out of memory - Java Heap space

2016-04-27 Thread Jaikiran Pai
Have you tried getting the memory usage output using a tool like jmap and
seeing what's consuming the memory? Also, what are your heap sizes for
the process?


-Jaikiran

On Tuesday 19 April 2016 02:31 AM, McKoy, Nick wrote:

To follow up with my last email, I have been looking into
socket.receive.buffer.bytes as well as socket.send.buffer.bytes. Would it help
to increase the buffer for the OOM issue?

All help is appreciated!

Thanks!


-nick


From: "McKoy, Nick" 
mailto:nicholas.mc...@washpost.com>>
Date: Monday, April 18, 2016 at 3:41 PM
To: "users@kafka.apache.org" 
mailto:users@kafka.apache.org>>
Subject: Out of memory - Java Heap space

Hey all,

I have a kafka cluster of 5 nodes that’s working really hard. CPU is around 40% 
idle daily.

I looked at the file descriptor note on this documentation page 
http://docs.confluent.io/1.0/kafka/deployment.html#file-descriptors-and-mmap 
and decided to give it a shot on one instance in the cluster just to see how it 
performed. I increased this number to 1048576.

I kept getting this error from the kafka logs:
ERROR [ReplicaFetcherThread--1-6], Error due to 
(kafka.server.ReplicaFetcherThread) java.lang.OutOfMemoryError: Java heap space

I increased the heap to see if that would help and I kept seeing these errors.
Could the file descriptor change have something to do with this?



—
Nicholas McKoy
Engineering – Big Data and Personalization
Washington Post Media

One Franklin Square, Washington, DC 20001
Email: nicholas.mc...@washpost.com





Re: kafka_2.10-0.8.2.1 High CPU load

2016-04-27 Thread Jaikiran Pai
We have had this issue in 0.8.x and at that time we did not investigate 
it. Recently we upgraded to 0.9.0.1 and had a similar issue, which we
investigated and narrowed down to what's explained here:
http://mail-archives.apache.org/mod_mbox/kafka-users/201604.mbox/%3C571F23ED.7050405%40gmail.com%3E. 
If upgrading to 0.9.0.1 is an option for you, then you might want to do 
that and read through that mail for potential ways to get past this issue.


-Jaikiran
On Friday 22 April 2016 03:04 PM, Kafka wrote:

Hi, we use kafka_2.10-0.8.2.1, and our VM machine config is 4 cores, 8 GB.
Our cluster consists of three brokers, and our broker config is the default
with 2 replicas. Our broker load is very high once in a while;
the load is greater than 1.5 per core on average.

We have about 70 topics on this cluster.
When we use the top utility, we can see 280% CPU utilization. Then I used
jstack and found there are 4 threads that use the most CPU, shown below:
"kafka-network-thread-9092-0" prio=10 tid=0x7f46c8709000 nid=0x35dd 
runnable [0x7f46b73f2000]
java.lang.Thread.State: RUNNABLE
"kafka-network-thread-9092-1" prio=10 tid=0x7f46c873c000 nid=0x35de 
runnable [0x7f46b75f4000]
java.lang.Thread.State: RUNNABLE
"kafka-network-thread-9092-2" prio=10 tid=0x7f46c8756000 nid=0x35df 
runnable [0x7f46b7cfb000]
java.lang.Thread.State: RUNNABLE
"kafka-network-thread-9092-3" prio=10 tid=0x7f46c876f800 nid=0x35e0 
runnable [0x7f46b5adb000]
java.lang.Thread.State: RUNNABLE


I found one issue, https://issues.apache.org/jira/browse/KAFKA-493, that
concerns this, but I think it doesn't fit my case, because we have other
clusters deployed on VM machines with the same config and this does not
occur on them.

The brokers' config is the same, but we add:

replica.fetch.wait.max.ms=100
num.replica.fetchers=2


I want to ask whether this is the main reason, or whether there is another
reason that leads to the high CPU load?




Re: Kafka To Hadoop

2016-04-27 Thread Ashutosh Kumar
http://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html

On Wed, Apr 27, 2016 at 1:59 PM, Mudit Kumar  wrote:

> Hi,
>
> I have a running kafka setup with 3 brokers.Now i want to sink all kafka
> to write to hdfs.My hadoop cluster is already up and running.
> Any blog/doc for configuring the same.
>
> Thanks,
> Mudit


Re: Kafka To Hadoop

2016-04-27 Thread Ashutosh Kumar
http://www.confluent.io/blog/how-to-build-a-scalable-etl-pipeline-with-kafka-connect

On Wed, Apr 27, 2016 at 1:59 PM, Mudit Kumar  wrote:

> Hi,
>
> I have a running kafka setup with 3 brokers.Now i want to sink all kafka
> to write to hdfs.My hadoop cluster is already up and running.
> Any blog/doc for configuring the same.
>
> Thanks,
> Mudit


Kafka To Hadoop

2016-04-27 Thread Mudit Kumar
Hi,

I have a running Kafka setup with 3 brokers. Now I want to sink all Kafka
topics to HDFS. My Hadoop cluster is already up and running.
Is there any blog/doc for configuring this?

Thanks,
Mudit