Re: Long start time for consumer

2018-05-29 Thread Jaikiran Pai
Are your topics dynamically created? If so, see this thread:
https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
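If they are, one rough workaround sketch (an assumption on my part, not necessarily the
exact one described in that thread) is to block until the freshly created topic's metadata
is visible to the consumer before subscribing, using partitionsFor():

    // Sketch only. Needs org.apache.kafka.clients.consumer.KafkaConsumer,
    // org.apache.kafka.common.PartitionInfo and java.util.List; the 30s timeout
    // and the topic name passed in are illustrative values.
    static void waitForTopicMetadata(KafkaConsumer<?, ?> consumer, String topic)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + 30_000;
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        while ((partitions == null || partitions.isEmpty())
                && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
            partitions = consumer.partitionsFor(topic);
        }
    }
    // usage: waitForTopicMetadata(consumer, "my-topic"); then consumer.subscribe(...)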


-Jaikiran


On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:

Hello,

We have a 3-broker Kafka 0.10.0.1 cluster. We have 5 topics, each with 10
partitions. I have an application which consumes from all these topics by
creating multiple consumer processes. All of these consumers are under the
same consumer group. I am noticing that every time we restart this
application, it takes almost 5 minutes for the consumers to start consuming.
What might be going wrong?





Re: Does anyone publish or subscribe kafka topic in TestNG?

2018-05-13 Thread Jaikiran Pai
It's quite possible that the bootstrap server being used in your test 
case is different (since you pull it out of some "details") from the one 
being used in the standalone Java program. I don't mean the IP address 
(since the logs do indicate it is localhost), but I think it might be 
the port. Perhaps the test case uses an SSL-backed port?
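If that turns out to be the case, the test would need to point at the right port and,
for an SSL listener, set the client-side SSL properties. A minimal sketch (the 9093
port and the paths are placeholder assumptions):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9093");                // SSL listener port (assumed)
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder path
    props.put("ssl.truststore.password", "changeit");                // placeholder
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    Producer<String, String> producer = new KafkaProducer<>(props);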


-Jaikiran


On 13/05/18 1:46 PM, James Yu wrote:

Hi,

Does anyone publish or subscribe to a Kafka topic in TestNG?
I try to publish to and subscribe to a Kafka topic in my TestNG test case, and I
always get the following exception:

2018-05-13 15:33:58.540 WARN
o.a.kafka.common.network.Selector.pollSelectionKeys[531] - [Producer
clientId=producer-1] Unexpected error from localhost/127.0.0.1; closing
connection
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
(size = -2062548992)
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:130)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)

and this is how I publish a message to kafka topic:

Map producerInfo = (Map) details.get("producer");
Properties props = new Properties();
props.put("bootstrap.servers", ""+details.get("bootstrapServers"));
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer producer = new org.apache.kafka.clients.producer.KafkaProducer(props);

int nrPartition = (int) producerInfo.get("nrPartition");
for(KeeperAccessRecord rec : records) {
  String msg = gson.toJson(rec);
  int partition = msg.hashCode() % nrPartition;
  producer.send(new ProducerRecord(""+producerInfo.get("topic"), partition, "a", "a"));
}
producer.close();


Same code snippet works fine if I put it into a simple java program like
the following:

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        String topicName = "MyTopic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        Producer producer = new KafkaProducer(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord(topicName, Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}



---
James C.-C.Yu
+886988713275
+8615618429976





Re: SSL metrics for simple Producer/Consumer implementation

2018-05-12 Thread Jaikiran Pai
There are kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh
scripts shipped as part of the Kafka binary distribution. You can set up
your cluster with SSL and then run them against it to get some numbers
of your own.
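For example (an untested sketch; flag names can vary slightly between Kafka versions,
and ssl-client.properties is a placeholder for a client config containing
security.protocol=SSL plus the truststore settings):

    bin/kafka-producer-perf-test.sh --topic ssl-perf-test --num-records 100000 \
        --record-size 32768 --throughput -1 \
        --producer-props bootstrap.servers=broker1:9093 \
        --producer.config ssl-client.properties

    bin/kafka-consumer-perf-test.sh --broker-list broker1:9093 --topic ssl-perf-test \
        --messages 100000 --consumer.config ssl-client.properties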


-Jaikiran


On 30/04/18 4:58 PM, M. Manna wrote:

Hello,

We wanted to implement SSL for our producer/consumer authentication, but we
are not certain regarding how good the performance is when using SSL. By
performance, we refer to metrics such as message exchanges per second (and
any delays).

Our production testbed is set up with 3 brokers, and we use encryption and
hashing to hide message contents. All the messages are in plain-text format.
We have about 10 million messages (i.e. records, each with a max size of 32
KB) exchanged between production servers every 8 hours (approx.). So all we
need to know, via some metrics, is whether the delay (if any) is
acceptable/tolerable.


Has anyone tested this with SSL and have some metrics to share?

Regards,





Re: Running SSL and PLAINTEXT mode together (Kafka 10.2.1)

2017-12-20 Thread Jaikiran Pai
When you say you are not able to write to a Kafka broker, do you mean your
producer isn't able to produce a message? What do your producer
configs look like? What exact exception, error, or DEBUG logs do you see
when you attempt this?


We do use a similar setup, so I do know that such a configuration works 
fine.
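For reference, with that kind of dual-listener setup the two client configs typically
differ only in the bootstrap port and the SSL properties. A sketch (listener addresses
taken from the server.properties quoted below; file paths and passwords are placeholders):

    # internal client, PLAINTEXT listener
    bootstrap.servers=10.10.10.64:9092
    # security.protocol defaults to PLAINTEXT, nothing else needed

    # external client, SSL listener
    bootstrap.servers=172.1.1.157:9093
    security.protocol=SSL
    ssl.truststore.location=/path/to/client-truststore.jks
    ssl.truststore.password=changeit
    # because the broker sets ssl.client.auth=required, the client also needs a keystore
    ssl.keystore.location=/path/to/client-keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit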


-Jaikiran


On 21/12/17 1:49 AM, Darshan wrote:

Hi Jaikiran

With that config, my internal Kafka client can't write to the Kafka broker.
What I am looking for is that the internal client can write to a Kafka topic
without having to have any truststore set up, while the external Kafka client
MUST have a certificate and truststore set up, and can read only if ACLs are
programmed for that topic.

Any idea if such a thing exists ?

Thanks.


On Tue, Dec 19, 2017 at 10:10 PM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:


What exact issue are you running into with that config?

-Jaikiran



On 20/12/17 7:24 AM, Darshan wrote:


Anyone ?

On Mon, Dec 18, 2017 at 7:25 AM, Darshan <purandare.dars...@gmail.com>
wrote:

Hi

I am wondering if there is a way to run the SSL and PLAINTEXT mode
together ? I am running Kafka 10.2.1. We want our internal clients to use
the PLAINTEXT mode to write to certain topics, but any external clients
should use SSL to read messages on those topics. We also want to enforce
ACLs.

To try this out, I modified my server.properties as follows, but without
any luck. Can someone please let me know if it needs any change ?

listeners=INTERNAL://10.10.10.64:9092,EXTERNAL://172.1.1.157:9093
advertised.listeners=INTERNAL://10.10.10.64:9092,EXTERNAL://
172.1.1.157:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
inter.broker.listener.name=INTERNAL

ssl.keystore.location=/opt/keystores/keystotr.jks
ssl.keystore.password=ABCDEFGH
ssl.key.password=ABCDEFGH
ssl.truststore.location=/opt/keystores/truststore.jks
ssl.truststore.password=ABCDEFGH
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.protocol=SSL
ssl.client.auth=required
# allow.everyone.if.no.acl.found=false
allow.everyone.if.no.acl.found=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=KafkaBroker01

Thanks.

--Darshan






Re: Running SSL and PLAINTEXT mode together (Kafka 10.2.1)

2017-12-19 Thread Jaikiran Pai

What exact issue are you running into with that config?

-Jaikiran


On 20/12/17 7:24 AM, Darshan wrote:

Anyone ?

On Mon, Dec 18, 2017 at 7:25 AM, Darshan 
wrote:


Hi

I am wondering if there is a way to run the SSL and PLAINTEXT mode
together ? I am running Kafka 10.2.1. We want our internal clients to use
the PLAINTEXT mode to write to certain topics, but any external clients
should use SSL to read messages on those topics. We also want to enforce
ACLs.

To try this out, I modified my server.properties as follows, but without
any luck. Can someone please let me know if it needs any change ?

listeners=INTERNAL://10.10.10.64:9092,EXTERNAL://172.1.1.157:9093
advertised.listeners=INTERNAL://10.10.10.64:9092,EXTERNAL://
172.1.1.157:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
inter.broker.listener.name=INTERNAL

ssl.keystore.location=/opt/keystores/keystotr.jks
ssl.keystore.password=ABCDEFGH
ssl.key.password=ABCDEFGH
ssl.truststore.location=/opt/keystores/truststore.jks
ssl.truststore.password=ABCDEFGH
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.protocol=SSL
ssl.client.auth=required
# allow.everyone.if.no.acl.found=false
allow.everyone.if.no.acl.found=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=KafkaBroker01

Thanks.

--Darshan





Re: Sub:Application threads never seen data but topic is full loaded

2017-11-26 Thread Jaikiran Pai
Can you show us a snippet of the code where you are consuming this data?
Which language client are you using, and how many consumers are part of
the (same) group? Which exact version of the Kafka broker and which version
of the client-side libraries?
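For comparison, a minimal consumer loop with the new Java consumer (0.10.x-era API)
looks roughly like this - a sketch with placeholder names; an equivalent snippet for
the client actually in use would help:

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder
    props.put("group.id", "my-group");                // placeholder
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                    record.partition(), record.offset(), record.value());
        }
    }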


-Jaikiran


On 14/11/17 6:01 PM, chandarasekaran m wrote:

Hi Team,

Can anyone explain how data is consumed by the high-level
consumer? As per my understanding, the fetcher thread fetches the data from
the broker and pushes it into a blocking queue (an in-memory queue per topic), and
the application threads (consumers) read data from the blocking queue.

In my application a consumer thread never sees a message, but the partition for that
thread (consumer) has lag (has messages).

Does that mean the fetcher thread never pushed data into the blocking queue? Can anyone
explain, please?



With Regards
M.Chandarasekaran,





Re: 0.9.0.0 Log4j appender slow startup

2017-11-06 Thread Jaikiran Pai
Can you take a couple of thread dumps with an interval of around 5 seconds
each when that 60 second delay occurs? You can use a tool like jstack to do
that. That might give some hint on what’s going on.
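Something along these lines (the pid placeholder is whatever jps reports for the test app):

    jstack <pid> > dump-1.txt
    sleep 5
    jstack <pid> > dump-2.txt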

-Jaikiran

On Monday, November 6, 2017, Preston, Dale <dale.pres...@conocophillips.com>
wrote:

> (Answering now from my work email so please don't be confused.)
>
> The topic is already existing.
>
> -Original Message-----
> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com <javascript:;>]
> Sent: Sunday, November 5, 2017 10:56 PM
> To: users@kafka.apache.org <javascript:;>
> Subject: [EXTERNAL]Re: 0.9.0.0 Log4j appender slow startup
>
> Is the topic to which the message is being produced, already present or is
> it auto created?
>
> -Jaikiran
>
>
> On 05/11/17 3:43 PM, Dale wrote:
> > I am using the 0.9.0.0 log4j appender for Kafka because I have a lot of
> apps dependent on log4j 1.2.x that cannot be upgraded to use newer versions
> of log4j.   It appears that the appender has become part of log4j code in
> later versions of both tools.
> >
> > When I start my test app, the first message takes an exact and
> consistent 60 seconds plus a couple milliseconds to go out.  The second
> message takes right around 200 milliseconds, and all the messages after
> that take a couple of milliseconds.  The timing from message 1 to 2 could
> be tolerated but the 60 seconds will never work since the production use
> case app would typically run for 20 to 30 seconds.
> >
> > For testing, I brought the appender code into my project and added some
> additional console messages so I could see what is going on.  Here’s a
> snippet of the console output:
> >
> > START LOG SNIPPET***
> > G:\kafkademoworkspace\testlog4jgenerator>java -Dlog4j.debug
> > -Dlog4j.configuration=file:///g:\kafkademoworkspace\testlog4jgenerator
> > \log4j.properties -cp
> > .\;G:\kafkademoworkspace\testlog4jgenerator\target\testlog4jgenerator.
> > jar;g:\kafkademoworkspace\testlog4jgenerator\target\libs\log4j-1.2.17.
> > jar;g:\kafkademoworkspace\testlog4jgenerator\target\libs\*;g:\kafkadem
> > oworkspace\testlog4jgenerator\target\libs\kafka-clients-0.9.0.0.jar
> > com.mydomainname.messaging.testlog4jgenerator.LogGenerator
> > log4j: Using URL 
> > [file:/g:/kafkademoworkspace/testlog4jgenerator/log4j.properties]
> for automatic log4j configuration.
> > log4j: Reading configuration from URL
> > file:/g:/kafkademoworkspace/testlog4jgenerator/log4j.properties
> > log4j: Parsing for [root] with value=[DEBUG,file,KAFKA].
> > log4j: Level token is [DEBUG].
> > log4j: Category root set to DEBUG
> > log4j: Parsing appender named "file".
> > log4j: Parsing layout options for "file".
> > log4j: Setting property [conversionPattern] to [%d{-MM-dd
> HH:mm:ss,SSS} %-5p %c{1}:%L - %m%n].
> > log4j: End of parsing for "file".
> > log4j: Setting property [file] to [/apps/logs/logtest.log].
> > log4j: Setting property [maxBackupIndex] to [10].
> > log4j: Setting property [maxFileSize] to [10MB].
> > log4j: setFile called: /apps/logs/logtest.log, true
> > log4j: setFile ended
> > log4j: Parsed "file" options.
> > log4j: Parsing appender named "KAFKA".
> > log4j: Parsing layout options for "KAFKA".
> > log4j: Setting property [conversionPattern] to [%d{-MM-dd
> HH:mm:ss,SSS} %-5p %c{1}:%L - %m%n].
> > log4j: End of parsing for "KAFKA".
> > log4j: Setting property [compressionType] to [none].
> > log4j: Setting property [topic] to [test].
> > log4j: Setting property [brokerList] to [localhost:9092].
> > log4j: Setting property [syncSend] to [false].
> > DPLOG: 2017-11-05T09:56:16.072Z - in Producer - creating new
> > KafkaProducer
> > log4j: Kafka producer connected to localhost:9092
> > log4j: Logging for topic: test
> > log4j: Parsed "KAFKA" options.
> > log4j: Finished configuring.
> > 
> > DPLOG: 2017-11-05T09:56:16.338Z - append START
> > DPLOG: 2017-11-05T09:56:16.339Z - after subAppend.  Message is:
> 2017-11-05 03:56:16,333 DEBUG Sender:123 - Starting Kafka producer I/O
> thread.
> >
> > log4j: [Sun Nov 05 03:56:16 CST 2017]2017-11-05 03:56:16,333 DEBUG
> Sender:123 - Starting Kafka producer I/O thread.
> >
> > DPLOG: 2017-11-05T09:56:16.342Z - getting ready to send to producer.
> > DPLOG: 2017-11-05T09:57:16.347Z - after send to producer.
> > DPLOG: 2017-11-05T09:57:16.348Z - append END
> > *

Re: 0.9.0.0 Log4j appender slow startup

2017-11-05 Thread Jaikiran Pai
Is the topic to which the message is being produced, already present or 
is it auto created?


-Jaikiran


On 05/11/17 3:43 PM, Dale wrote:

I am using the 0.9.0.0 log4j appender for Kafka because I have a lot of apps 
dependent on log4j 1.2.x that cannot be upgraded to use newer versions of 
log4j.   It appears that the appender has become part of log4j code in later 
versions of both tools.

When I start my test app, the first message takes an exact and consistent 60 
seconds plus a couple milliseconds to go out.  The second message takes right 
around 200 milliseconds, and all the messages after that take a couple of 
milliseconds.  The timing from message 1 to 2 could be tolerated but the 60 
seconds will never work since the production use case app would typically run 
for 20 to 30 seconds.

For testing, I brought the appender code into my project and added some 
additional console messages so I could see what is going on.  Here’s a snippet 
of the console output:

START LOG SNIPPET***
G:\kafkademoworkspace\testlog4jgenerator>java -Dlog4j.debug 
-Dlog4j.configuration=file:///g:\kafkademoworkspace\testlog4jgenerator\log4j.properties
 -cp 
.\;G:\kafkademoworkspace\testlog4jgenerator\target\testlog4jgenerator.jar;g:\kafkademoworkspace\testlog4jgenerator\target\libs\log4j-1.2.17.jar;g:\kafkademoworkspace\testlog4jgenerator\target\libs\*;g:\kafkademoworkspace\testlog4jgenerator\target\libs\kafka-clients-0.9.0.0.jar
 com.mydomainname.messaging.testlog4jgenerator.LogGenerator
log4j: Using URL 
[file:/g:/kafkademoworkspace/testlog4jgenerator/log4j.properties] for automatic 
log4j configuration.
log4j: Reading configuration from URL 
file:/g:/kafkademoworkspace/testlog4jgenerator/log4j.properties
log4j: Parsing for [root] with value=[DEBUG,file,KAFKA].
log4j: Level token is [DEBUG].
log4j: Category root set to DEBUG
log4j: Parsing appender named "file".
log4j: Parsing layout options for "file".
log4j: Setting property [conversionPattern] to [%d{-MM-dd HH:mm:ss,SSS} 
%-5p %c{1}:%L - %m%n].
log4j: End of parsing for "file".
log4j: Setting property [file] to [/apps/logs/logtest.log].
log4j: Setting property [maxBackupIndex] to [10].
log4j: Setting property [maxFileSize] to [10MB].
log4j: setFile called: /apps/logs/logtest.log, true
log4j: setFile ended
log4j: Parsed "file" options.
log4j: Parsing appender named "KAFKA".
log4j: Parsing layout options for "KAFKA".
log4j: Setting property [conversionPattern] to [%d{-MM-dd HH:mm:ss,SSS} 
%-5p %c{1}:%L - %m%n].
log4j: End of parsing for "KAFKA".
log4j: Setting property [compressionType] to [none].
log4j: Setting property [topic] to [test].
log4j: Setting property [brokerList] to [localhost:9092].
log4j: Setting property [syncSend] to [false].
DPLOG: 2017-11-05T09:56:16.072Z - in Producer - creating new KafkaProducer
log4j: Kafka producer connected to localhost:9092
log4j: Logging for topic: test
log4j: Parsed "KAFKA" options.
log4j: Finished configuring.

DPLOG: 2017-11-05T09:56:16.338Z - append START
DPLOG: 2017-11-05T09:56:16.339Z - after subAppend.  Message is: 2017-11-05 
03:56:16,333 DEBUG Sender:123 - Starting Kafka producer I/O thread.

log4j: [Sun Nov 05 03:56:16 CST 2017]2017-11-05 03:56:16,333 DEBUG Sender:123 - 
Starting Kafka producer I/O thread.

DPLOG: 2017-11-05T09:56:16.342Z - getting ready to send to producer.
DPLOG: 2017-11-05T09:57:16.347Z - after send to producer.
DPLOG: 2017-11-05T09:57:16.348Z - append END

DPLOG: 2017-11-05T09:57:16.352Z - append START
DPLOG: 2017-11-05T09:57:16.353Z - after subAppend.  Message is: 2017-11-05 
03:56:16,338 INFO  root:36 - Logging message: x=0

log4j: [Sun Nov 05 03:56:16 CST 2017]2017-11-05 03:56:16,338 INFO  root:36 - 
Logging message: x=0

DPLOG: 2017-11-05T09:57:16.361Z - getting ready to send to producer.
DPLOG: 2017-11-05T09:57:16.526Z - after send to producer.
DPLOG: 2017-11-05T09:57:16.526Z - append END

DPLOG: 2017-11-05T09:57:16.527Z - append START
DPLOG: 2017-11-05T09:57:16.528Z - after subAppend.  Message is: 2017-11-05 
03:57:16,527 INFO  root:36 - Logging message: x=1

log4j: [Sun Nov 05 03:57:16 CST 2017]2017-11-05 03:57:16,527 INFO  root:36 - 
Logging message: x=1

DPLOG: 2017-11-05T09:57:16.529Z - getting ready to send to producer.
DPLOG: 2017-11-05T09:57:16.530Z - after send to producer.
DPLOG: 2017-11-05T09:57:16.530Z - append END

DPLOG: 2017-11-05T09:57:16.531Z - append START
DPLOG: 2017-11-05T09:57:16.531Z - after subAppend.  Message is: 2017-11-05 
03:57:16,531 INFO  root:36 - Logging message: x=2

log4j: [Sun Nov 05 03:57:16 CST 2017]2017-11-05 03:57:16,531 INFO  root:36 - 
Logging message: x=2

DPLOG: 2017-11-05T09:57:16.532Z - getting ready to send to producer.
DPLOG: 2017-11-05T09:57:16.533Z - after send to 

Re: [ANNOUNCE] Apache Kafka 1.0.0 Released

2017-11-01 Thread Jaikiran Pai
Congratulations Kafka team on the release. Happy to see Kafka reach this 
milestone. It has been a pleasure using Kafka and also interacting with 
the Kafka team.


-Jaikiran


On 01/11/17 7:57 PM, Guozhang Wang wrote:

The Apache Kafka community is pleased to announce the release for Apache
Kafka 1.0.0.

This is a major release of the Kafka project, and is no mere bump of the
version number. The Apache Kafka Project Management Committee has packed a
number of valuable enhancements into the release. Let me summarize a few of
them:

** Since its introduction in version 0.10, the Streams API has become
hugely popular among Kafka users, including the likes of Pinterest,
Rabobank, Zalando, and The New York Times. In 1.0, the API continues to
evolve at a healthy pace. To begin with, the builder API has been improved
(KIP-120). A new API has been added to expose the state of active tasks at
runtime (KIP-130). Debuggability gets easier with enhancements to the
print() and writeAsText() methods (KIP-160). And if that’s not enough,
check out KIP-138 and KIP-161 too. For more on streams, check out the
Apache Kafka Streams documentation (https://kafka.apache.org/documentation/streams/),
including some helpful new tutorial videos.

** Operating Kafka at scale requires that the system remain observable, and
to make that easier, we’ve made a number of improvements to metrics. These
are too many to summarize without becoming tedious, but Connect metrics
have been significantly improved (KIP-196), a litany of new health check
metrics are now exposed (KIP-188), and we now have a global topic and
partition count (KIP-168). Check out KIP-164 and KIP-187 for even more.

** We now support Java 9, leading, among other things, to significantly
faster TLS and CRC32C implementations. Over-the-wire encryption will be
faster now, which will keep Kafka fast and compute costs low when
encryption is enabled.

** In keeping with the security theme, KIP-152 cleans up the error handling
on Simple Authentication Security Layer (SASL) authentication attempts.
Previously, some authentication error conditions were indistinguishable
from broker failures and were not logged in a clear way. This is cleaner
now.

** Kafka can now tolerate disk failures better. Historically, JBOD storage
configurations have not been recommended, but the architecture has
nevertheless been tempting: after all, why not rely on Kafka’s own
replication mechanism to protect against storage failure rather than using
RAID? With KIP-112, Kafka now handles disk failure more gracefully. A
single disk failure in a JBOD broker will not bring the entire broker down;
rather, the broker will continue serving any log files that remain on
functioning disks.

** Since release 0.11.0, the idempotent producer (which is the producer
used in the presence of a transaction, which of course is the producer we
use for exactly-once processing) required max.in.flight.requests.per.connection
to be equal to one. As anyone who has written or tested a wire protocol can
attest, this put an upper bound on throughput. Thanks to KAFKA-5949, this
can now be as large as five, relaxing the throughput constraint quite a bit.


All of the changes in this release can be found in the release notes:

https://dist.apache.org/repos/dist/release/kafka/1.0.0/RELEASE_NOTES.html


You can download the source release from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka-1.0.0-src.tgz

and binary releases from:

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
(Scala
2.11)
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.12-1.0.0.tgz
(Scala
2.12)



---

Apache Kafka is a distributed streaming platform with four core APIs:

** The Producer API allows an application to publish a stream of records to one
or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more topics
and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming
an input stream from one or more topics and producing an output stream to
one or more output topics, effectively transforming the input streams to
output streams.

** The Connector API allows building and running reusable producers or
consumers
that connect Kafka topics to existing applications or data systems. For
example, a connector to a relational database might capture every change to
a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data between
systems or applications.

** Building real-time streaming applications that transform or react
to the streams
of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, 

Re: Experimenting with Kafka and OpenSSL

2017-10-30 Thread Jaikiran Pai
I haven't yet had a chance to try out Java 9, but that's definitely on 
my TODO list, maybe sometime this weekend.


Thanks for pointing me to KAFKA-2561. I had missed that.
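One way to try this kind of thing (not necessarily what the blog post does) is to
register an OpenSSL-backed JSSE provider with the JVM and point Kafka's ssl.provider
config at it. A rough, unverified sketch; the provider name string depends entirely on
which OpenSSL-wrapping provider library is registered:

    # broker or client SSL config
    ssl.provider=openssl            # illustrative; must match the registered provider's name
    ssl.keystore.location=/path/to/keystore.jks
    ssl.keystore.password=changeit
    ssl.truststore.location=/path/to/truststore.jks
    ssl.truststore.password=changeit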

-Jaikiran


On 30/10/17 4:17 PM, Mickael Maison wrote:

Thanks for sharing, very interesting read.

Did you get a chance to try JDK 9 ?

We also considered using OpenSSL instead of JSSE especially since
Netty made an easy-to-reuse package (netty-tcnative).

There was KAFKA-2561
(https://issues.apache.org/jira/browse/KAFKA-2561) where people shared
a few numbers and what would be needed to get it working.

On Mon, Oct 30, 2017 at 8:08 AM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

We have been using Kafka in some of our projects for the past couple of
years. Our experience with Kafka and SSL had shown some performance issues
when we had seriously tested it (which admittedly was around a year back).
Our basic tests did show that things had improved over time with newer
versions, but we didn't get a chance to fully test and move to SSL for
Kafka.

Incidentally, I happened to be looking into some other things related to SSL
and decided to experiment with using openssl as the SSL provider for Kafka.
I had heard OpenSSL performs better than the engine shipped default in JRE,
but hadn't ever got a chance to do any experiments. This past few weeks, I
decided to spend some time trying it. I have noted the experimentation and
the performance numbers in my blog[1]. The initial basic performance testing
(using the scripts shipped in Kafka) does show promising improvements. Like
I note in my blog, this was a very basic performance test just to see if
OpenSSL can be pursued as an option (both in terms of being functional and
performant) if we do decide to.

I know some of the members in these lists do extensive performance testing
with Kafka (and SSL), so I thought I will bring this to their notice.

[1] https://jaitechwriteups.blogspot.com/2017/10/kafka-with-openssl.html

-Jaikiran





Experimenting with Kafka and OpenSSL

2017-10-30 Thread Jaikiran Pai
We have been using Kafka in some of our projects for the past couple of 
years. Our experience with Kafka and SSL had shown some performance 
issues when we had seriously tested it (which admittedly was around a 
year back). Our basic tests did show that things had improved over time 
with newer versions, but we didn't get a chance to fully test and move 
to SSL for Kafka.


Incidentally, I happened to be looking into some other things related to 
SSL and decided to experiment with using openssl as the SSL provider for 
Kafka. I had heard OpenSSL performs better than the engine shipped by
default in the JRE, but hadn't ever got a chance to do any experiments. These
past few weeks, I decided to spend some time trying it. I have noted the
experimentation and the performance numbers in my blog[1]. The initial 
basic performance testing (using the scripts shipped in Kafka) does show 
promising improvements. Like I note in my blog, this was a very basic 
performance test just to see if OpenSSL can be pursued as an option 
(both in terms of being functional and performant) if we do decide to.


I know some of the members on these lists do extensive performance
testing with Kafka (and SSL), so I thought I would bring this to their
notice.


[1] https://jaitechwriteups.blogspot.com/2017/10/kafka-with-openssl.html

-Jaikiran



Re: Add Kafka user list

2017-10-11 Thread Jaikiran Pai
I'm curious how these emails even get delivered to this (and sometimes 
the dev list) if the user isn't yet subscribed (through the 
users-subscribe mailing list)? Is this mailing list set up to accept
mails from unsubscribed users?


-Jaikiran


On 11/10/17 12:32 PM, Jakub Scholz wrote:

Out of curiosity ... there seem to be quite a lot of these emails. I wonder
if we can do something to improve on this.

Was someone thinking about changing the UX on the Kafka website? Maybe
removing the links from the users@kafka... email? Or rephrasing the
sentence to make the subscribe email be first in the sentence? Or at least
making the subscribe links bold?

Also, when I first came to the Kafka website to look for the mailing lists,
the "Contact Us" section wasn't my first choice to look for them. Wouldn't
renaming it to "Discussion" or "Community" be more intuitive?

Thanks
Jakub

On Tue, Oct 10, 2017 at 7:35 PM, Matthias J. Sax 
wrote:


If you want to subscribe follow instructions here:
http://kafka.apache.org/contact

On 10/10/17 2:07 AM, shawnding(丁晓坤) wrote:

Add Kafka user list







Re: Reduce Kafka Client logging

2017-09-06 Thread Jaikiran Pai

Can you post the exact log messages that you are seeing?
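In the meantime, if log4j 1.x is the logging backend, raising the level for the Kafka
client packages usually quiets the config dump (a sketch, assuming a log4j.properties
file is in use):

    log4j.logger.org.apache.kafka=WARN
    # or, more narrowly, just the class that prints the full producer config at INFO
    log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN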

-Jaikiran


On 07/09/17 7:55 AM, Raghav wrote:

Hi

My Java code logs the Kafka config every time it does a send, which makes the log
very, very verbose.

How can I reduce the Kafka client (producer) logging in my Java code?

Thanks for your help.





Re: Kafka high cpu usage and disconnects

2017-03-23 Thread Jaikiran Pai
One thing that you might want to check is the number of consumers that
are connected/consuming against this Kafka setup. We have consistently
noticed that the CPU usage of the broker is very high even with very few
consumers (around 10 Java consumers). There's even a JIRA for it. From
what I remember, it had to do with the constant heartbeat and other such
network activity that happens between these consumers and the broker.
We have had this issue since the 0.8.x days up to 0.10.0.1. We just migrated to
0.10.2.0 and we will have to see if it is still reproducible there.


I don't mean to say you are running into the same issue, but you can
check that aspect as well (maybe shut down all consumers and see how the
broker CPU behaves).
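A quick way to see where that CPU goes is to map the hottest broker threads to stack
traces, for example (Linux; <pid> is the broker JVM's process id, <tid> a thread id
reported by top):

    top -H -p <pid>              # note the TID of the busiest thread
    printf '%x\n' <tid>          # convert the TID to hex
    jstack <pid> | grep -A 20 'nid=0x<hex-tid>'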


-Jaikiran

On Thursday 23 March 2017 06:15 PM, Paul van der Linden wrote:

Thanks. I managed to get a cpu dump from staging.

The output:
THREAD START (obj=5427, id = 24, name="RMI TCP Accept-0",
group="system")
THREAD START (obj=5427, id = 21, name="main", group="main")
THREAD START (obj=5427, id = 25, name="SensorExpiryThread",
group="main")
THREAD START (obj=58e6, id = 26,
name="ThrottledRequestReaper-Fetch", group="main")
THREAD START (obj=58e6, id = 27,
name="ThrottledRequestReaper-Produce", group="main")
THREAD START (obj=5914, id = 28,
name="ZkClient-EventThread-18-zookeeper:2181", group="main")
THREAD START (obj=58e6, id = 29, name="main-SendThread()",
group="main")
THREAD START (obj=5950, id = 200010, name="main-EventThread",
group="main")
THREAD START (obj=5427, id = 200011, name="pool-3-thread-1",
group="main")
THREAD END (id = 200011)
THREAD START (obj=5427, id = 200012,
name="metrics-meter-tick-thread-1", group="main")
THREAD START (obj=5427, id = 200014, name="kafka-scheduler-0",
group="main")
THREAD START (obj=5427, id = 200013, name="kafka-scheduler-1",
group="main")
THREAD START (obj=5427, id = 200015, name="kafka-scheduler-2",
group="main")
THREAD START (obj=5c33, id = 200016, name="kafka-log-cleaner-thread-0",
group="main")
THREAD START (obj=5427, id = 200017,
name="kafka-network-thread-2-PLAINTEXT-0", group="main")
THREAD START (obj=5427, id = 200018,
name="kafka-network-thread-2-PLAINTEXT-1", group="main")
THREAD START (obj=5427, id = 200019,
name="kafka-network-thread-2-PLAINTEXT-2", group="main")
THREAD START (obj=5427, id = 200020,
name="kafka-socket-acceptor-PLAINTEXT-9092", group="main")
THREAD START (obj=58e6, id = 200021, name="ExpirationReaper-2",
group="main")
THREAD START (obj=58e6, id = 200022, name="ExpirationReaper-2",
group="main")
THREAD START (obj=5427, id = 200023,
name="metrics-meter-tick-thread-2", group="main")
THREAD START (obj=5427, id = 200024, name="kafka-scheduler-3",
group="main")
THREAD START (obj=5427, id = 200025, name="kafka-scheduler-4",
group="main")
THREAD START (obj=5427, id = 200026, name="kafka-scheduler-5",
group="main")
THREAD START (obj=5427, id = 200027, name="kafka-scheduler-6",
group="main")
THREAD START (obj=58e6, id = 200028, name="ExpirationReaper-2",
group="main")
THREAD START (obj=58e6, id = 200029, name="ExpirationReaper-2",
group="main")
THREAD START (obj=58e6, id = 200030, name="ExpirationReaper-2",
group="main")
THREAD START (obj=5427, id = 200031, name="group-metadata-manager-0",
group="main")
THREAD START (obj=5427, id = 200032, name="kafka-request-handler-0",
group="main")
THREAD START (obj=5427, id = 200037, name="kafka-request-handler-5",
group="main")
THREAD START (obj=5427, id = 200036, name="kafka-request-handler-4",
group="main")
THREAD START (obj=5427, id = 200035, name="kafka-request-handler-3",
group="main")
THREAD START (obj=5427, id = 200034, name="kafka-request-handler-2",
group="main")
THREAD START (obj=5427, id = 200033, name="kafka-request-handler-1",
group="main")
THREAD START (obj=5427, id = 200038, name="kafka-request-handler-6",
group="main")
THREAD START (obj=5427, id = 200039, name="kafka-request-handler-7",
group="main")
THREAD START (obj=5427, id = 200040, name="kafka-scheduler-7",
group="main")
THREAD START (obj=5427, id = 200041, name="kafka-scheduler-8",
group="main")
THREAD START (obj=5ee2, id = 200042, name="ReplicaFetcherThread-0-0",
group="main")
THREAD START (obj=5ee2, id = 200043, name="ReplicaFetcherThread-0-1",
group="main")
THREAD START (obj=5427, id = 200044, name="kafka-scheduler-9",
group="main")
THREAD START (obj=5427, id = 200045, name="executor-Fetch",
group="main")
TRACE 300920:
sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
TRACE 300518:
java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:Unknown line)

Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

2017-03-02 Thread Jaikiran Pai
Thank you for pointing me to that JIRA. It indeed is the same issue we 
discussed in this thread. I'll keep a watch on that JIRA for the code to 
be merged.


-Jaikiran

On Thursday 02 March 2017 07:11 PM, Rajini Sivaram wrote:

This issue is being addressed in KAFKA-4631. See
https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
PR https://github.com/apache/kafka/pull/2622 for details.

Regards,

Rajini

On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:


For future reference - I asked this question on the dev mailing list and, based
on the discussion there, was able to come up with a workaround to get this
working. Details here:
https://www.mail-archive.com/dev@kafka.apache.org/msg67613.html

-Jaikiran


On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:


We are on Kafka 0.10.0.1 (server and client) and use Java
consumer/producer APIs. We have an application where we create Kafka topics
dynamically (using the AdminUtils Java API) and then start
producing/consuming on those topics. The issue we frequently run into is
this:

1. Application process creates a topic "foo-bar" via
AdminUtils.createTopic. This is successfully completed.
2. Same application process then creates a consumer (using new Java
consumer API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2 however, doesn't seem to be
enrolled in consumer group for this topic because of this (notice the last
line in the log):

2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer
- Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer
- Subscribed to topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Received group coordinator response
ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
mpletionHandler@50aad50f, request=RequestSend(header={ap
i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
body={group_id=my-app-group}), createdTimeMs=1487667523378,
sendTimeMs=1487667523529), responseBody={error_code=0,coo
rdinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Discovered coordinator
localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
mer.internals.ConsumerCoordinator - Revoking previously assigned
partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Sending JoinGroup
({group_id=my-app-group,session_timeout=3,member_id=,
protocol_type=consumer,group_protocols=[{protocol_name=
range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
- Added sensor with name node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
- Added sensor with name node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
- Added sensor with name node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Received successful join group
response for group my-app-group: {error_code=0,generation_id=1,
group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.ConsumerCoordinator - Performing assignment for group
my-app-group using strategy range with subscriptions
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
n(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
foo-bar since no metadata is available*


4. A few seconds later, a separate process, produces (via Java producer
API) on the foo-bar topic, some messages.
5. The consumer created in step#2 (although is waiting for messages) on
the foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance which
then successfully assigns partition(s) of this foo-bar topic to co

Re: Consumption on a explicitly (dynamically) created topic has a 5 minute delay

2017-03-01 Thread Jaikiran Pai
For future reference - I asked this question on dev mailing list and 
based on the discussion there was able to come up with a workaround to 
get this working. Details here 
https://www.mail-archive.com/dev@kafka.apache.org/msg67613.html


-Jaikiran

On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
We are on Kafka 0.10.0.1 (server and client) and use Java 
consumer/producer APIs. We have an application where we create Kafka 
topics dynamically (using the AdminUtils Java API) and then start 
producing/consuming on those topics. The issue we frequently run into 
is this:


1. Application process creates a topic "foo-bar" via 
AdminUtils.createTopic. This is successfully completed.
2. Same application process then creates a consumer (using new Java 
consumer API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2 however, doesn't seem to 
be enrolled in consumer group for this topic because of this (notice 
the last line in the log):


2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to 
topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received group coordinator response 
ClientResponse(receivedTimeMs=1487667523542, disconnected=false, 
request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, 
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, 
body={group_id=my-app-group}), createdTimeMs=1487667523378, 
sendTimeMs=1487667523529), 
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for 
group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Revoking previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Sending JoinGroup 
({group_id=my-app-group,session_timeout=3,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: 
null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received successful join group response for group my-app-group: 
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Performing assignment for group my-app-group using strategy range with 
subscriptions 
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor 
- Skipping assignment for topic foo-bar since no metadata is available*



4. A few seconds later, a separate process, produces (via Java 
producer API) on the foo-bar topic, some messages.
5. The consumer created in step#2 (although is waiting for messages) 
on the foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance 
which then successfully assigns partition(s) of this foo-bar topic to 
consumer created in step#2 and the consumer start consuming these 
messages.


This 5 minute delay in consuming messages from this dynamically 
created topic is what we want to avoid. Is there anyway I can 
deterministically do/force creation of a dynamic topic and be assured 
that upon completion of that call, I can create a consumer and start 
consuming of that topic such that it can receive messages as soon as 
the messages are produced on that topic, without having to wait for a 
5 minute delay (o

Consumption on a explicitly (dynamically) created topic has a 5 minute delay

2017-02-21 Thread Jaikiran Pai
We are on Kafka 0.10.0.1 (server and client) and use Java 
consumer/producer APIs. We have an application where we create Kafka 
topics dynamically (using the AdminUtils Java API) and then start 
producing/consuming on those topics. The issue we frequently run into is 
this:


1. Application process creates a topic "foo-bar" via 
AdminUtils.createTopic. This is successfully completed.
2. Same application process then creates a consumer (using new Java 
consumer API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2 however, doesn't seem to be 
enrolled in consumer group for this topic because of this (notice the 
last line in the log):


2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to 
topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received group coordinator response 
ClientResponse(receivedTimeMs=1487667523542, disconnected=false, 
request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, 
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, 
body={group_id=my-app-group}), createdTimeMs=1487667523378, 
sendTimeMs=1487667523529), 
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for 
group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Revoking previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Sending JoinGroup 
({group_id=my-app-group,session_timeout=3,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received successful join group response for group my-app-group: 
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Performing assignment for group my-app-group using strategy range with 
subscriptions 
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
Skipping assignment for topic foo-bar since no metadata is available*



4. A few seconds later, a separate process produces some messages (via the Java
producer API) on the foo-bar topic.
5. The consumer created in step#2 (although it is waiting for messages) on
the foo-bar topic _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance 
which then successfully assigns partition(s) of this foo-bar topic to 
consumer created in step#2 and the consumer start consuming these messages.


This 5 minute delay in consuming messages from the dynamically created
topic is what we want to avoid. Is there any way I can deterministically
force creation of a dynamic topic and be assured that, upon completion
of that call, I can create a consumer and start consuming from that topic
such that it receives messages as soon as they are produced
on that topic, without having to wait for a 5 minute delay (or whatever
the rebalance configuration is)? In essence, is there a way to ensure
that the Kafka consumer does get the topic metadata for a topic that was
created successfully by the same application, immediately?



P.S: We have topic auto creation disabled, so this isn't really an auto
topic creation issue. In our case we are

Re: Kafka producer dropping records

2016-11-22 Thread Jaikiran Pai
That tells you that the acknowledgements (which in your case, you have 
set to receive ACKs from all brokers in the ISR) aren't happening and 
that can essentially mean that the records aren't making it to the 
topics. How many brokers do you have? What's the replication factor on 
the topic and what are the ISR brokers for the topic? Ultimately, you 
have to figure out why these ACKs aren't happening.


-Jaikiran

On Tuesday 22 November 2016 02:26 PM, Phadnis, Varun wrote:

Hello,

We had tried that... If future.get() is added in the while loop, it takes too 
long for the loop to execute.

Last time we tried it, it was running for that file for over 2 hours and still 
not finished.

Regards,
Varun

-Original Message-
From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
Sent: 22 November 2016 02:20
To: users@kafka.apache.org
Subject: Re: Kafka producer dropping records

The KafkaProducer.send returns a Future<RecordMetadata>. What happens when you
add a future.get() on the returned Future, in that while loop, for each sent record?

-Jaikiran

On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:

Hello,

We have the following piece of code where we read lines from a file and push 
them to a Kafka topic :

  Properties properties = new Properties();
  properties.put("bootstrap.servers", );
  properties.put("key.serializer", 
StringSerializer.class.getCanonicalName());
  properties.put("value.serializer", 
StringSerializer.class.getCanonicalName());
  properties.put("retries",100);
 properties.put("acks", "all");

 KafkaProducer<Object, String> producer =  new
KafkaProducer<>(properties);

  try (BufferedReader bf = new BufferedReader(new InputStreamReader(new 
FileInputStream(filePath), "UTF-8"))) {
  String line;
  int count = 0;
  while ((line = bf.readLine()) != null) {
  count++;
  producer.send(new ProducerRecord<>(topicName, line));
  }
  Logger.log("Done producing data messages. Total no of records 
produced:" + count);
  } catch (InterruptedException | ExecutionException | IOException e) {
  Throwables.propagate(e);
  } finally {
  producer.close();
  }

When we try this with a large file with a million records, only half of them 
around 500,000 get written to the topic. In the above example, I verified this 
by running the GetOffset tool after fair amount of time (to ensure all records 
had finished processing) as follows:


  ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
 --time -1 --topic 




The output of this was :


  topic_name:1:292954

  topic_name:0:296787


What could be causing this dropping of records?

Thanks,
Varun





Re: Kafka producer dropping records

2016-11-22 Thread Jaikiran Pai
The KafkaProducer.send returns a Future<RecordMetadata>. What happens
when you add a future.get() on the returned Future, in that while loop,
for each sent record?
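If blocking on every future turns out to be too slow, a middle ground (a sketch that
slots into the posted loop, not tested against this setup) is to pass a Callback to
send() so failed sends are at least counted, and to flush() before closing:

    final AtomicLong failures = new AtomicLong();   // java.util.concurrent.atomic
    while ((line = bf.readLine()) != null) {
        count++;
        producer.send(new ProducerRecord<>(topicName, line), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    failures.incrementAndGet();   // this record was never acknowledged
                }
            }
        });
    }
    producer.flush();   // blocks until all buffered records are sent (or have failed)
    producer.close();
    Logger.log("Records read: " + count + ", failed sends: " + failures.get());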


-Jaikiran

On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:

Hello,

We have the following piece of code where we read lines from a file and push 
them to a Kafka topic :

 Properties properties = new Properties();
 properties.put("bootstrap.servers", );
 properties.put("key.serializer", 
StringSerializer.class.getCanonicalName());
 properties.put("value.serializer", 
StringSerializer.class.getCanonicalName());
 properties.put("retries",100);
properties.put("acks", "all");

KafkaProducer producer =  new 
KafkaProducer<>(properties);

 try (BufferedReader bf = new BufferedReader(new InputStreamReader(new 
FileInputStream(filePath), "UTF-8"))) {
 String line;
 int count = 0;
 while ((line = bf.readLine()) != null) {
 count++;
 producer.send(new ProducerRecord<>(topicName, line));
 }
 Logger.log("Done producing data messages. Total no of records 
produced:" + count);
 } catch (InterruptedException | ExecutionException | IOException e) {
 Throwables.propagate(e);
 } finally {
 producer.close();
 }

When we try this with a large file with a million records, only half of them 
around 500,000 get written to the topic. In the above example, I verified this 
by running the GetOffset tool after fair amount of time (to ensure all records 
had finished processing) as follows:


 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list  
--time -1 --topic 




The output of this was :


 topic_name:1:292954

 topic_name:0:296787


What could be causing this dropping of records?

Thanks,
Varun





Re: connection closed by kafka

2016-11-02 Thread Jaikiran Pai
Which exact version of Kafka installation and Kafka client is this? And 
which language/library of Kafka client? Also, are you describing this 
situation in the context of producing messages? Can you post your 
relevant code from the application where you deal with this?


Connection management is an internal detail of Kafka client libraries 
and usually won't end up outside of it, so you shouldn't really notice 
any of these issues.



-Jaikiran

On Friday 28 October 2016 05:50 AM, Jianbin Wei wrote:

In our environment we notice that sometimes Kafka would close the connection 
after one message is sent over.  The client does not detect that and tries to 
send another message again.  That triggers a RST packet.

Any idea why the Kafka broker would close the connection?

Attached you can find the packets between our client and kafka broker.


20:55:40.834543 IP 172.18.69.194.34445 > 172.18.69.180.9092: Flags [S], seq 
31787730, win 14600, options [mss 1460,nop,nop,sackOK,nop,wscale 9], length 0
0x:  4500 0034 8cc1 4000 4006 ca67 ac12 45c2  E..4..@.@..g..E.
0x0010:  ac12 45b4 868d 2384 01e5 0ad2    ..E...#.
0x0020:  8002 3908 e3c1  0204 05b4 0101 0402  ..9.
0x0030:  0103 0309
20:55:40.834744 IP 172.18.69.180.9092 > 172.18.69.194.34445: Flags [S.], seq 
1238329644, ack 31787731, win 14600, options [mss 1460,nop,nop,sackOK,nop,wscale 
1], length 0
0x:  4500 0034  4000 4006 5729 ac12 45b4  E..4..@.@.W)..E.
0x0010:  ac12 45c2 2384 868d 49cf 692c 01e5 0ad3  ..E.#...I.i,
0x0020:  8012 3908 e89e  0204 05b4 0101 0402  ..9.
0x0030:  0103 0301
20:55:40.834787 IP 172.18.69.194.34445 > 172.18.69.180.9092: Flags [.], ack 1, 
win 29, length 0
0x:  4500 0028 8cc2 4000 4006 ca72 ac12 45c2  E..(..@.@..r..E.
0x0010:  ac12 45b4 868d 2384 01e5 0ad3 49cf 692d  ..E...#.I.i-
0x0020:  5010 001d e3b5   P...
20:55:40.834921 IP 172.18.69.194.34445 > 172.18.69.180.9092: Flags [P.], seq 
1:691, ack 1, win 29, length 690
0x:  4500 02da 8cc3 4000 4006 c7bf ac12 45c2  E.@.@.E.
0x0010:  ac12 45b4 868d 2384 01e5 0ad3 49cf 692d  ..E...#.I.i-
0x0020:  5018 001d e667   02ae    Pg..
0x0030:   0003 000c 6b61 666b 612d 7079 7468  ..kafka-pyth
0x0040:  6f6e 0001  03e8  0001 000e 6576  onev
0x0050:  656e 745f 6e73 706f 6c69 6379  0001  ent_nspolicy
0x0060:     0272      ...r
0x0070:   0266 4ff3 bd11   0004 3131  ...fO.11
0x0080:  3238  0254 5b30 2c7b 2261 7022 3a22  28...T[0,{"ap":"

20:55:40.835297 IP 172.18.69.180.9092 > 172.18.69.194.34445: Flags [.], ack 
691, win 7990, length 0
0x:  4500 0028 e872 4000 4006 6ec2 ac12 45b4  E..(.r@.@.n...E.
0x0010:  ac12 45c2 2384 868d 49cf 692d 01e5 0d85  ..E.#...I.i-
0x0020:  5010 1f36 408b       P..6@.
20:55:40.837837 IP 172.18.69.180.9092 > 172.18.69.194.34445: Flags [P.], seq 
1:47, ack 691, win 7990, length 46
0x:  4500 0056 e873 4000 4006 6e93 ac12 45b4  E..V.s@.@.n...E.
0x0010:  ac12 45c2 2384 868d 49cf 692d 01e5 0d85  ..E.#...I.i-
0x0020:  5018 1f36 ece3   002a  0003  P..6...*
0x0030:   0001 000e 6576 656e 745f 6e73 706f  ..event_nspo
0x0040:  6c69 6379  0001      licy
0x0050:   0003 6527   e'
20:55:40.837853 IP 172.18.69.194.34445 > 172.18.69.180.9092: Flags [.], ack 47, 
win 29, length 0
0x:  4500 0028 8cc4 4000 4006 ca70 ac12 45c2  E..(..@.@..p..E.
0x0010:  ac12 45b4 868d 2384 01e5 0d85 49cf 695b  ..E...#.I.i[
0x0020:  5010 001d e3b5   P...


Closed here

21:05:40.839440 IP 172.18.69.180.9092 > 172.18.69.194.34445: Flags [F.], seq 
47, ack 691, win 7990, length 0
0x:  4500 0028 e874 4000 4006 6ec0 ac12 45b4  E..(.t@.@.n...E.
0x0010:  ac12 45c2 2384 868d 49cf 695b 01e5 0d85  ..E.#...I.i[
0x0020:  5011 1f36 405c       P..6@\
21:05:40.876047 IP 172.18.69.194.34445 > 172.18.69.180.9092: Flags [.], ack 48, 
win 29, length 0
0x:  4500 0028 8cc5 4000 4006 ca6f ac12 45c2  E..(..@.@..o..E.
0x0010:  ac12 45b4 868d 2384 01e5 0d85 49cf 695c  ..E...#.I.i\
0x0020:  5010 001d e3b5   P...

21:06:17.673188 IP 172.18.69.194.34445 > 172.18.69.180.9092: Flags [P.], seq 
691:1414, ack 48, win 29, length 723
0x:  4500 02fb 8cc6 4000 4006 c79b ac12 45c2  E.@.@.E.
0x0010:  ac12 45b4 868d 2384 01e5 0d85 49cf 695c  ..E...#.I.i\
0x0020:  

0.10.1.0 - commitSync() doesn't contribute to "aliveness" of a consumer?

2016-11-01 Thread Jaikiran Pai
We are using Kafka 0.10.1.0 (server) and Java client API (the new API) 
for consumers. One of the issues we have been running into is that the 
consumer is considered "dead" by the co-ordinator because of the lack of 
activity within a specific period of time. In reality, the consumer is 
still alive. We see exceptions like these:



org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot 
be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.



I understand what that exception means and what we could potentially do 
to address it (setting a low value for max.poll.records is one 
option). Before changing the max.poll.records value in our setup, I 
would like to hear/understand a bit more about this so that I know this 
is the right fix given the way we have implemented our consumers. 
Essentially, our consumer code is this:


while (!stopped) {
    try {
        final ConsumerRecords consumerRecords = consumer.poll(someValue);
        for (final TopicPartition topicPartition : consumerRecords.partitions()) {
            if (stopped) {
                break;
            }
            for (final ConsumerRecord consumerRecord : consumerRecords.records(topicPartition)) {
                final long previousOffset = consumerRecord.offset();
                // commit the offset and then pass on the message for processing (in a separate thread)
                consumer.commitSync(Collections.singletonMap(topicPartition,
                        new OffsetAndMetadata(previousOffset + 1)));
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // process the ConsumerRecord
                    }
                });
            }
        }
    } catch (Exception e) {
        // log the error and continue
        continue;
    }
}



As you can see, the only thing that happens on the main thread that the 
consumer is polling from is a commitSync for each record that was returned 
in that batch of poll(). I understand commitSync is blocking, so 
potentially each commitSync invocation adds to the 
time between each poll(). One option is using commitAsync, but we need 
to evaluate whether it has other issues within our usecase.
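For reference, a minimal sketch of what the commitAsync variant of that same commit could look like (assuming the new Java consumer API; the callback body is illustrative and not part of our actual code):

consumer.commitAsync(
        Collections.singletonMap(topicPartition, new OffsetAndMetadata(previousOffset + 1)),
        new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    // the commit failed; log it and decide whether to retry or fall back to commitSync
                }
            }
        });

This keeps the poll loop from blocking on each commit, at the cost of having to handle commit failures in the callback.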


But what I was wondering was, why doesn't commitSync contribute to the 
logic of the consumer being alive? If it did, then I see no reason why 
this consumer will ever be considered dead and that above message 
logged. Anyone see a problem with the code above?


P.S: We use the default session timeout value in the consumer configs 
(i.e. we don't set any specific value)



-Jaikiran


Re: what's the relationship between Zookeeper and Kafka ?

2016-09-14 Thread Jaikiran Pai
In addition to what Michael noted, this question has been asked a few 
times before too and here's one such previous discussion 
https://www.quora.com/What-is-the-actual-role-of-ZooKeeper-in-Kafka


-Jaikiran

On Wednesday 14 September 2016 03:50 AM, Michael Noll wrote:

Eric,

the latest versions of Kafka use ZooKeeper only on the side of the Kafka
brokers, i.e. the servers in a Kafka cluster.

Background:
In older versions of Kafka, the Kafka consumer API required client
applications (that would read data from Kafka) to also talk to ZK.  Why
would they need to do that:  because ZK was used, in the old Kafka consumer
API, to track which data records they had already consumed, to rewind
reading from Kafka in case of failures like client machine crashes, and so
on.  In other words, consumption-related metadata was managed in ZK.
However, no "actual" data was ever routed through ZK.

The latest versions of Kafka have an improved consumer API that no longer
needs to talk to ZK -- any information that was previously maintained in ZK
(by these client apps) is now stored directly in Kafka.

Going back to your Spark programs:  They are using these older consumer API
versions of Kafka that still require talking to ZooKeeper, hence the need
to set things like "zoo1:2181".


Does the kafka data actually get routed out of zookeeper before delivering
the payload onto Spark ?

This was never the case (old API vs. new API).  Otherwise this would have
been a significant bottleneck. :-)  Data has always been served through the
Kafka brokers only.

Hope this helps,
Michael





On Sat, Sep 10, 2016 at 4:22 PM, Valerio Bruno  wrote:


AFAIK Kafka uses Zookeeper to coordinate the Kafka clusters ( set of
brokers ).

Consumers usually connect to Zookeeper to retrieve the list of brokers, and then
connect to the broker.

*Valerio*

On 10 September 2016 at 22:11, Eric Ho  wrote:


I notice that some Spark programs would contact something like 'zoo1:2181'
when trying to suck data out of Kafka.

Does the kafka data actually get routed out of zookeeper before delivering
the payload onto Spark ?



--

-eric ho




--
*Valerio Bruno*










Re: kafkaproducer send blocks until broker is available

2016-09-14 Thread Jaikiran Pai
This is a known issue and is being tracked in this JIRA 
https://issues.apache.org/jira/browse/KAFKA-3539
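In the meantime, what you are seeing is the producer blocking while it waits for topic metadata from the (unavailable) broker. In the newer Java producer that wait should be bounded by the max.block.ms producer config, so one way to keep send() from blocking indefinitely is to cap it explicitly. A minimal sketch, assuming the 0.9+/0.10 Java producer (the 5000 ms value is only an example):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// send() can still block while fetching metadata for an unknown topic,
// but for at most max.block.ms before the send fails
props.put("max.block.ms", "5000");
Producer<String, String> producer = new KafkaProducer<>(props);

Note that this makes the send fail fast rather than buffer the record, so it is not quite the fully non-blocking behaviour asked for in that JIRA.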


-Jaikiran
On Saturday 10 September 2016 12:20 AM, Peter Sinoros Szabo wrote:

Hi,

I'd like to use the Java Kafka producer in a non-blocking async mode.
My assumption was that as long as new messages can fit into the producer's
memory, it would queue up those messages and send them out once the broker is
available.
I tested a simple case where I am sending messages using
KafkaProducer.send(), but the kafka broker is not available yet (i.e. the
broker starts later than the application).
I see that in this case the send() blocks, although the documentation says
that this method is async.
Is it possible to configure kafka in a way so that the producer
buffers the messages sent out until the broker becomes available?

Regards,
Peter










Re: Too many open files

2016-09-14 Thread Jaikiran Pai

What does the output of:

lsof -p <pid of the Kafka broker process>

show on that specific node?

-Jaikiran

On Monday 12 September 2016 10:03 PM, Michael Sparr wrote:

5-node Kafka cluster, bare metal, Ubuntu 14.04.x LTS with 64GB RAM, 8-core, 
960GB SSD boxes and a single node in cluster is filling logs with the following:

[2016-09-12 09:34:49,522] ERROR Error while accepting connection 
(kafka.network.Acceptor)
java.io.IOException: Too many open files
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at 
sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
at kafka.network.Acceptor.accept(SocketServer.scala:323)
at kafka.network.Acceptor.run(SocketServer.scala:268)
at java.lang.Thread.run(Thread.java:745)

No other nodes in cluster have this issue. Separate application server has 
consumers/producers using librdkafka + confluent kafka python library with a 
few million messages published to under 100 topics.

For days now the /var/log/kafka/kafka.server.log.N are filling up server with this 
message and using up all space on only a single server node in cluster. I have 
soft/hard limits at 65,535 for all users so > ulimit -n reveals 65535

Is there a setting I should add from librdkafka config in the Python producer 
clients to shorten socket connections even further to avoid this or something 
else going on?

Should I write this as issue in Github repo and if so, which project?


Thanks!








Re: Kafka bootup exception while recovering log file

2016-09-06 Thread Jaikiran Pai
I'm not from the Kafka dev team so I won't be able to comment on whether 
this is an expected way to fail or whether it needs to be handled in a 
cleaner/more robust manner (at the very least, probably with a better exception 
message). Since you have put in the effort to write a test case and narrow 
it down to this specific flow, maybe you can send a mail to their dev 
mailing list and/or create a JIRA to report this.


-Jaikiran

On Tuesday 30 August 2016 12:07 PM, Gaurav Agarwal wrote:

Kafka version: 0.10.0

Exception Trace

java.util.NoSuchElementException
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:37)
at kafka.log.LogSegment.recover(LogSegment.scala:189)
at kafka.log.Log.recoverLog(Log.scala:268)
at kafka.log.Log.loadSegments(Log.scala:243)
at kafka.log.Log.<init>(Log.scala:101)
at kafka.log.LogTest.testCorruptLog(LogTest.scala:830)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Test Code (the same exception trace is seen in broker logs as well on prod
machines, with exactly the same log files as given in this mini test)
-

val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, 15 * 1024 * 1024: java.lang.Integer)
val config = LogConfig(logProps)
val cp = new File("/Users/gaurav/Downloads/corrupt/gaurav/kafka-logs/Topic3-12")
var log = new Log(cp, config, 0, time.scheduler, time


On Tue, Aug 30, 2016 at 11:37 AM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:


Can you paste the entire exception stacktrace please?

-Jaikiran

On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:


Hi there, just wanted to bump up the thread one more time to check if
someone can point us in the right direction... This one was quite a
serious
failure that took down many of our kafka brokers..

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal <gauravagarw...@gmail.com
wrote:

Hi All,

We are facing a weird problem where Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record and providing the details below:

During Kafka restart, if index files are corrupted or they don't exist,
kafka broker is trying to 'recover' a LogSegment and rebuild the indexes
-
LogSegment:recover()
In the main while loop here, which iterates over the entries in the log:
while(iter.hasNext) { val entry = iter.next}, I get an entry with
complete underlying byte buffer as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102,
10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101,
108,
46, 105, 110, 118, 101, 110, 116, 11

Re: Kafka bootup exception while recovering log file

2016-08-30 Thread Jaikiran Pai

Can you paste the entire exception stacktrace please?

-Jaikiran
On Tuesday 30 August 2016 11:23 AM, Gaurav Agarwal wrote:

Hi there, just wanted to bump up the thread one more time to check if
someone can point us in the right direction... This one was quite a serious
failure that took down many of our kafka brokers..

On Sat, Aug 27, 2016 at 2:11 PM, Gaurav Agarwal 
wrote:


Hi All,

We are facing a weird problem where Kafka broker fails to start due to an
unhandled exception while 'recovering' a log segment. I have been able to
isolate the problem to a single record and providing the details below:

During Kafka restart, if index files are corrupted or they don't exist,
kafka broker is trying to 'recover' a LogSegment and rebuild the indexes -
LogSegment:recover()
In the main while loop here, which iterates over the entries in the log:
while(iter.hasNext) { val entry = iter.next}, I get an entry with
complete underlying byte buffer as follows:

[82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0, -59, -126, 83, 78, 65,
80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79, -58, 1, 0, 0, 25, 1,
16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49, 48, 48, 48, 48, 58,
49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48, 0, 0, 0, -102, 10,
39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109, 111, 100, 101, 108,
46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46, 97, 100, 100, 46, 114,
101, 102, 101, 114, 101, 110, 99, 101, 16, -120, -115, -16, -64, -22, 42,
26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97, 109, 115, 46, 115, 116,
111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99, 101, 110, 116, 101,
114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115, 116, 77, 5, 11, 8,
34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50, 53, 52, 57, 50,
34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56,
48, 72]

A toString() on this entry yields:

*MessageAndOffset(Message(magic = 0, attributes = 2, crc = 1377740251, key
= null, payload = java.nio.HeapByteBuffer[pos=0 lim=197 cap=197]),4449011)*

It appears that this record is corrupt, and deserializing/decompressing it
causes exceptions which are unhandled. Specifically, in version 0.10.0 this
call fails with a NoSuchElementException:

ByteBufferMessageSet.deepIterator(entry).next().offset

Note: This message was written to disk using* kafka 0.10.0 broker running
snappy jar version 1.1.1.7* (which is known to have some read time bugs).
The log file itself is 512MB large and this message appears at around 4MB
in the file.

We have upgraded snappy; but should this condition be handled correctly?
What is the correct behavior here? Should the exception be handled and log
file be truncated? At the moment this causes kafka to completely crash with
no recovery path except deleting the bad data file manually and then
starting kafka again.

--

cheers,

gaurav


A test case to repro the crash

@Test

def testCorruptLog() {

  val buf = Array[Byte](82, 30, -91, -37, 0, 2, -1, -1, -1, -1, 0, 0, 0,
-59, -126, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, -79,
-58, 1, 0, 0, 25, 1, 16, -68, 48, -78, -101, -61, 5, 15, -16, 74, 20, 49,
48, 48, 48, 48, 58, 49, 52, 52, 58, 50, 48, 54, 53, 52, 52, 54, 48, 56, 48,
0, 0, 0, -102, 10, 39, 99, 111, 109, 46, 118, 110, 101, 114, 97, 46, 109,
111, 100, 101, 108, 46, 105, 110, 118, 101, 110, 116, 111, 114, 121, 46,
97, 100, 100, 46, 114, 101, 102, 101, 114, 101, 110, 99, 101, 16, -120,
-115, -16, -64, -22, 42, 26, 57, 25, 48, 72, 112, 114, 111, 103, 114, 97,
109, 115, 46, 115, 116, 111, 114, 101, 46, 118, 109, 119, 97, 1, 7, 72, 99,
101, 110, 116, 101, 114, 46, 109, 97, 112, 112, 101, 114, 46, 72, 111, 115,
116, 77, 5, 11, 8, 34, 20, 34, 9, -122, 56, 52, 58, 49, 49, 50, 54, 49, 50,
53, 52, 57, 50, 34, 66, 20, 9, 21, 56, 49, 52, 52, 58, 50, 48, 54, 53, 52,
52, 54, 48, 56, 48, 72);

   val msg = new Message(ByteBuffer.wrap(buf), None, None)

   val entry = new MessageAndOffset(msg, 4449011L)

   val deepIterator: Iterator[MessageAndOffset] =
ByteBufferMessageSet.deepIterator(entry)

   deepIterator.next().offset

}





Re: kafka client on cloud

2016-08-26 Thread Jaikiran Pai
Can you explain what exactly you mean by "cloud" and what kind of 
restrictions you are running into in trying to point to the truststore 
location?
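In case the restriction is simply that there is no fixed filesystem path available in your environment: one common workaround is to materialize the truststore bytes (from an environment variable, a secret store, or a classpath resource) into a temporary file at startup and point the client at that. A rough sketch, assuming the JKS content ships as a classpath resource named kafka.truststore.jks and the password comes from an environment variable (both names are made up for illustration):

// copy the truststore from the classpath to a temp file the Kafka client can read
Path tempTruststore = Files.createTempFile("kafka-truststore", ".jks");
try (InputStream in = MyClientMain.class.getResourceAsStream("/kafka.truststore.jks")) {
    Files.copy(in, tempTruststore, StandardCopyOption.REPLACE_EXISTING);
}

Properties props = new Properties();
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", tempTruststore.toAbsolutePath().toString());
props.put("ssl.truststore.password", System.getenv("TRUSTSTORE_PASSWORD"));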


-Jaikiran
On Friday 19 August 2016 08:09 PM, Nomar Morado wrote:

kafka consumer/producer currently require path to keystore/truststore.

my client runs in cloud and won't have access to some actual path to my jks.

any ideas on the best way to handle this?

thanks.





Re: unexpected consumer rebalance 0.9.0.1

2016-08-26 Thread Jaikiran Pai


What's the heartbeat interval that you have set on these consumer 
configs (if any)? Can you also paste a snippet of your code to show what 
the consumer code looks like (including the poll and commit calls)?



-Jaikiran

On Tuesday 23 August 2016 07:55 PM, Franco Giacosa wrote:

Hi I am experiencing the following issue in kafka 0.9.0.1,

I have a consumer that is alone in a consumer group, processing and
committing the offsets, and at one point the group does a rebalance (I don't
know why) and removes the group.

The weird situation is that the consumer seems to be working as
expected, and within less than 1 second of committing the previous offset,
processing the current message and committing that offset, it fails.

I imagine this has nothing to do with a timeout, but either way we have
those times setup pretty high request.timeout.ms=60,
group.max.session.timeout.ms=50



*KAFKA SERVER.LOG have the following lines around that time*

[*2016-08-19 01:09:10,857*] INFO [Group Metadata Manager on Broker 0]:
Removed 0 expired offsets in 0 milliseconds.
(kafka.coordinator.GroupMetadataManager)
[*2016-08-19 01:14:46,082*] INFO [GroupCoordinator 0]: Preparing to
restabilize group error-group with old generation 43
(kafka.coordinator.GroupCoordinator)
[*2016-08-19 01:14:46,083*] INFO [GroupCoordinator 0]: Group error-group
generation 43 is dead and removed (kafka.coordinator.GroupCoordinator)
[*2016-08-19 01:19:10,857*] INFO [Group Metadata Manager on Broker 0]:
Removed 0 expired offsets in 0 milliseconds.
(kafka.coordinator.GroupMetadataManager)

*On the consumer client I can see the following on the log around the same
time *

*01:14:44.839* [pool-5-thread-3] DEBUG o.a.k.c.c.i.ConsumerCoordinator -
Committed offset 132 for partition error-group-topic-4
*01:14:45.258* [pool-5-thread-3] DEBUG o.a.k.c.c.i.ConsumerCoordinator -
Committed offset 133 for partition error-group-topic-4
*01:14:45.703* [pool-5-thread-3] DEBUG o.a.k.c.c.i.ConsumerCoordinator -
Committed offset 134 for partition error-group-topic-4
*01:14:46.139* [pool-5-thread-3] ERROR o.a.k.c.c.i.ConsumerCoordinator -
Error ILLEGAL_GENERATION occurred while committing offsets for group
error-group
*01:14:46.140* [pool-5-thread-3] DEBUG c.g.e.c.f.m.ConnectorHandler -
Consumer Id=error-group : kafka-exception=Commit cannot be completed due to
group rebalance
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed due to group rebalance
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
~[kafka-clients-0.9.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
~[kafka-clients-0.9.0.1.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
~[kafka-clients-0.9.0.1.jar:na]





Re: Kafka 0.8.2.2 - CLOSE_WAITS on broker

2016-08-26 Thread Jaikiran Pai
Which Java vendor and version are you using at runtime? And what OS is 
this? Can you get the lsof output (on Linux) and paste it 
somewhere (like a gist) to show us what descriptors are open, etc.?


-Jaikiran

On Friday 26 August 2016 02:49 AM, Bharath Srinivasan wrote:

Hello:

We are running a data pipeline application stack using Kafka 0.8.2.2 in
production. We have been seeing intermittent CLOSE_WAIT on our kafka
brokers frequently and they fill up the file handles pretty quickly. By the
time the open file count reaches around 40K, the node becomes unresponsive
and we see huge GC pauses. The only way out has been restart of the node.
When the nodes are working fine, the average open files in the nodes stay
around 6K during peak load and 3K at average.

Configurations:
- 5 broker cluster (Single node spec: 24 core processors, 250 GB RAM, 256GB
SSD)
- 20 topics and 1100 partitions across all topics
- Replication factor of 3
- Java based KafkaProducer and high level consumers
(ZookeeperConsumerConnector)
- GC params { -Xmx32G -Xms4G -server -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50
-XX:MaxMetaspaceFreeRatio=80 }

Any pointers here? Appreciate your help.

Thanks,
Bharath





Re: consumer with version 0.10.0

2016-08-26 Thread Jaikiran Pai
Is anyone producing any (new) messages to the topics you are subscribing 
to in that consumer?
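Also, one thing worth double checking in the snippet below: it polls exactly once (with a 1 second timeout) and then closes, so if nothing arrives within that single poll you will see nothing at all. With the new consumer the usual pattern is to poll in a loop; a minimal sketch in Java (the topic name is the one from your code, everything else is illustrative):

// assumes a KafkaConsumer<String, String> built with the same properties as below,
// and some flag such as "running" that your application controls
consumer.subscribe(Arrays.asList("test"));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + ":" + record.value() + ":" + record.offset());
    }
}
consumer.close();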


-Jaikiran
On Friday 26 August 2016 10:14 AM, Jack Yang wrote:

Hi all,
I am using kafka 0.10.0.1, and I set up my listeners like:

listeners=PLAINTEXT://myhostName:9092

then I have one consumer running, using the new API. However, I did not see 
anything returned from the API.
The log from kafka is:

[2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Preparing to restabilize 
group newconsumper with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:28,548] INFO [GroupCoordinator 0]: Stabilized group 
newconsumper generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:28,555] INFO [GroupCoordinator 0]: Assignment received from 
leader for group newconsumper for generation 1 
(kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Preparing to restabilize 
group newconsumper with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-08-26 14:39:29,401] INFO [GroupCoordinator 0]: Group newconsumper 
generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)


Here is the code:
p.put("bootstrap.servers", config.getString("kafka.broker"))
p.put("group.id", "newconsumper")
p.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
p.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer")
p.put("enable.auto.commit", "true")
p.put("heartbeat.interval.ms", "1")
p.put("session.timeout.ms", "3");

 val kafkaConsumer: KafkaConsumer[String, String] = new 
KafkaConsumer[String, String](p)
 kafkaConsumer.subscribe(util.Arrays.asList("test"))
 try {
   val timeout:Long = 1000
   val records: ConsumerRecords[String, String] = 
kafkaConsumer.poll(timeout)

   records.asScala.foreach(record => {
   println(record.key() + ":" + record.value() + ":" + record.offset())
   })

 } catch {
   case e: Exception => {
 e.printStackTrace
   }
 } finally {
   kafkaConsumer.close
 }

Best regards,
Jack






Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-17 Thread Jaikiran Pai
+1 for Java 8. Our ecosystem, which uses Kafka and many other open 
source projects, has been fully on Java 8 for a year or more.


-Jaikiran
On Friday 17 June 2016 02:15 AM, Ismael Juma wrote:

Hi all,

I would like to start a discussion on making Java 8 a minimum requirement
for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
is the first discussion on the topic so the idea is to understand how
people feel about it. If people feel it's too soon, then we can pick up the
conversation again after Kafka 0.10.1.0. If the feedback is mostly
positive, I will start a vote thread.

Let's start with some dates. Java 7 hasn't received public updates since
April 2015[1], Java 8 was released in March 2014[2] and Java 9 is scheduled
to be released in March 2017[3].

The first argument for dropping support for Java 7 is that the last public
release by Oracle contains a large number of known security
vulnerabilities. The effectiveness of Kafka's security features is reduced
if the underlying runtime is not itself secure.

The second argument for moving to Java 8 is that it adds a number of
compelling features:

* Lambda expressions and method references (particularly useful for the
Kafka Streams DSL)
* Default methods (very useful for maintaining compatibility when adding
methods to interfaces)
* java.util.stream (helpful for making collection transformations more
concise)
* Lots of improvements to java.util.concurrent (CompletableFuture,
DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
* Other nice things: SplittableRandom, Optional (and many others I have not
mentioned)

The third argument is that it will simplify our testing matrix, we won't
have to test with Java 7 any longer (this is particularly useful for system
tests that take hours to run). It will also make it easier to support Scala
2.12, which requires Java 8.

The fourth argument is that many other open-source projects have taken the
leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7],
Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will
support Java 8 in the next version (although it will take a while before
most phones will use that version sadly). This reduces (but does not
eliminate) the chance that we would be the first project that would cause a
user to consider a Java upgrade.

The main argument for not making the change is that a reasonable number of
users may still be using Java 7 by the time Kafka 0.10.1.0 is released.
More specifically, we care about the subset who would be able to upgrade to
Kafka 0.10.1.0, but would not be able to upgrade the Java version. It would
be great if we could quantify this in some way.

What do you think?

Ismael

[1] https://java.com/en/download/faq/java_7.xml
[2] https://blogs.oracle.com/thejavatutorials/entry/jdk_8_is_released
[3] http://openjdk.java.net/projects/jdk9/
[4] https://github.com/apache/cassandra/blob/trunk/README.asc
[5] https://lucene.apache.org/#highlights-of-this-lucene-release-include
[6] http://akka.io/news/2015/09/30/akka-2.4.0-released.html
[7] https://issues.apache.org/jira/browse/HADOOP-11858
[8] https://webtide.com/jetty-9-3-features/
[9] http://markmail.org/message/l7s276y3xkga2eqf
[10]
https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under
[11] http://markmail.org/message/l7s276y3xkga2eqf





Re: Any restrictions on consumer group name?

2016-06-12 Thread Jaikiran Pai

Adding the Kafka dev list to cc, hoping they would answer this question.

-Jaikiran
On Friday 10 June 2016 11:18 AM, Jaikiran Pai wrote:
We are using 0.9.0.1 of Kafka server and (Java) clients. Our (Java) 
consumers are assigned to dynamic runtime generated groups i.e. the 
consumer group name is generated dynamically at runtime, using some 
application specific logic. I have been looking at the docs but 
haven't yet found anything that says if there is any restriction in 
the length and/or characters that make up the consumer group name. Can 
anyone confirm or point me to a doc which states whether or not there 
any restrictions on it?



-Jaikiran




Any restrictions on consumer group name?

2016-06-09 Thread Jaikiran Pai
We are using 0.9.0.1 of Kafka server and (Java) clients. Our (Java) 
consumers are assigned to dynamic runtime generated groups i.e. the 
consumer group name is generated dynamically at runtime, using some 
application specific logic. I have been looking at the docs but haven't 
yet found anything that says if there is any restriction in the length 
and/or characters that make up the consumer group name. Can anyone 
confirm or point me to a doc which states whether or not there any 
restrictions on it?



-Jaikiran


Re: Skipping assignment for topic * since no metadata is available

2016-06-09 Thread Jaikiran Pai


On Thursday 09 June 2016 08:00 PM, Patrick Kaufmann wrote:

Hello

Recently we’ve run into a problem when starting our application for the first 
time.

At the moment all our topics are auto-created. Now, at the first start there 
are no topics, so naturally some consumers try to connect to topics which don’t 
exist.
Those consumers now fail quite consistently with the following error:
Skipping assignment for topic  since no metadata is available

This then leads to the consumer not consuming any messages on that topic.
Take a look at the "auto.offset.reset" property that you can pass to the 
consumers, which is meant to deal with cases where the offsets aren't 
known/available for the topic. Depending on the version of Kafka you are 
using, the valid values of the property will be different, so check the 
relevant version's documentation and see what value makes sense for your 
application.
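For illustration, a minimal sketch of setting it on the new (0.9+) Java consumer; the "earliest" value here is only an example, and older clients use "smallest"/"largest" instead:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// start from the beginning of the partition when no committed offset exists yet
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);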


-Jaikiran


Re: Message duplicated on incorrect topic - anyone else see this?

2016-06-09 Thread Jaikiran Pai
How do you check/verify the duplication of the message? Can you post 
relevant part of your producer code too?


-Jaikiran
On Thursday 09 June 2016 10:36 PM, Clark Breyman wrote:

We're seeing a situation in one of our clusters where a message will
occasionally be duplicated on an incorrect topic. No identifiable issues
spotted in either the client application or kafka logs.

Has anyone else see this? Seems like something that would raise concern.
Any recommendations for enhanced logging?

Client: 0.8.2.1 java producer with ISR acks
Cluster: 0.8.2.2 w/Scala 2.10.1, 3 replicas per topic





Re: newbie: kafka 0.9.0.0 producer does not terminate after producer.close()

2016-05-20 Thread Jaikiran Pai
You can take a thread dump (using "jstack <pid>") when 
the program doesn't terminate and post that output here. That will tell 
us which threads are keeping the program from terminating.
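As a side note (not a fix for the 0.9 client against 0.8 broker mismatch itself), the Java producer also has a time-bounded close(), which may at least keep a stuck sender thread from blocking shutdown forever. A minimal sketch, assuming the 0.9.x Java client and java.util.concurrent.TimeUnit on the classpath:

producer.flush();
// wait at most 10 seconds for outstanding sends to complete, then force the close
producer.close(10, TimeUnit.SECONDS);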


-Jaikiran

On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote:

I wrote a little test client that reads from a file and publishes using the
0.9.0.0 API. I am connecting to an older 0.8.x server. I am able to send
messages, however I noticed that once I am done reading the input file my
test program hangs.

Any idea what I am doing wrong?

Kind regards

Andy


public static void main(String[] args) throws IOException {
    logger.warn("BEGIN");

    readFromFile(cmdLine, producer, topic);

    producer.flush();
    producer.close();

    logger.warn("END");
}

private static void readFromFile(CmdLine cmdLine, KafkaProducer producer,
        String topic) throws IOException {
    logger.info("BEGIN");
    BufferedReader reader = cmdLine.getReader();
    String value = null;

    while ((value = reader.readLine()) != null) {
        logger.info("sending value: " + value);
        publish(producer, topic, value);
    }
    logger.info("END");
}

private static void publish(KafkaProducer producer, String topic, String value) {
    Future response = producer.send(new ProducerRecord(topic, value));

    /* TODO
       send() will raise the following error.
       It is because we are using a 0.9.0.0 client with an 0.8 server. The 0.8
       consumer seems to work without problems.
    */
}



…
INFO  17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN

…
INFO  17:02:54 main c.p.g.p.KClient readFromFile line:85 sending value:
dependencies {

…
INFO  17:02:54 main c.p.g.p.KClient readFromFile line:89 END

…

The following error appears to be because we are using the 0.9.0.0 API with a
0.8.x server. If I read from stdin instead of a file I am able to
continue sending messages, so I do not think this is the reason my test code
hangs.

ERROR 17:02:54 kafka-producer-network-thread | producer-1 o.a.k.c.p.i.Sender
run line:130 Uncaught error in kafka producer I/O thread:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field
'throttle_time_ms': java.nio.BufferUnderflowException

at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)

at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient
.java:464)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)

at java.lang.Thread.run(Thread.java:745)











Re: Can we delete topic in kafka

2016-05-11 Thread Jaikiran Pai
That's actually not the right way to delete topics (or, for that matter, 
to manage a Kafka installation). It can lead to an odd/corrupt installation.
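On the original question of the topic only getting "marked for deletion": assuming a 0.8.x/0.9.x broker with default settings, the likely cause is that topic deletion is disabled on the brokers, so the --delete request sits there until the brokers are restarted with something like the following in server.properties (a sketch; verify against your broker version's documentation):

delete.topic.enable=true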


-Jaikiran

On Wednesday 11 May 2016 06:27 PM, Eduardo Costa Alfaia wrote:

Hi,
It’s better to create a script that deletes the kafka folder where the 
kafka topic exists and then creates it again if needed.

BR


Eduardo Costa Alfaia
Ph.D. Student in Telecommunications Engineering
Università degli Studi di Brescia
Tel: +39 3209333018








On 5/11/16, 09:48, "Snehalata Nagaje"  
wrote:



Hi ,

Can we delete certain topic in kafka?

I have deleted using command

./kafka-topics.sh --delete --topic topic_billing --zookeeper localhost:2181

It says the topic is marked for deletion, but it does not actually delete the topic.

Thanks,
Snehalata






Re: Zookeeper dies ... Kafka server unable to connect

2016-05-11 Thread Jaikiran Pai

On Tuesday 10 May 2016 09:29 PM, Radoslaw Gruchalski wrote:

Kafka is expecting the state to be there when the zookeeper comes back. One way 
to protect yourself from what you see happening, is to have a zookeeper quorum. 
Run a cluster of 3 zookeepers, then repeat your exercise.

Kafka will continue to work absolutely fine. Just remember, with 3 ZK 
instances, you can only kill one at a time.


I haven't run this kind of Zookeeper deployment before, so just curious: 
did you really mean that only one instance of Zookeeper can be stopped 
at a time when 3 of them are forming the cluster? Or would it still 
work if at least one instance was up and the other 2 crashed/stopped, either 
at the same time or one at a time?



-Jaikiran



–
Best regards,

Radek Gruchalski

ra...@gruchalski.com
de.linkedin.com/in/radgruchalski


On May 10, 2016 at 5:56:58 PM, Paolo Patierno (ppatie...@live.com) wrote:

Yes correct ... the new restarted zookeeper instance is completely new ... it 
has no information about previous topics and brokers of course.

Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience


Date: Tue, 10 May 2016 17:55:10 +0200
From: ra...@gruchalski.com
To: users@kafka.apache.org
Subject: RE: Zookeeper dies ... Kafka server unable to connect
  
Ah, but your restarted container does not have any data Kafka recorded previously. Correct?

–
Best regards,

Radek Gruchalski

ra...@gruchalski.com
de.linkedin.com/in/radgruchalski
  
  
On May 10, 2016 at 5:54:09 PM, Paolo Patierno (ppatie...@live.com) wrote:
  
This is what Kubernetes tells me ...
  
Name: zookeeper

Namespace: default
Labels: 
Selector: name=zookeeper
Type: ClusterIP
IP: 10.0.0.184
Port: zookeeper 2181/TCP
Endpoints: 172.17.0.4:2181
Session Affinity: None
  
So the address is always 10.0.0.184.
  
From the log I understand that the crash is related to the zookeeper pod I closed ... so the kafka server lost its connection to it.

Starting from there, there should be attempts to connect to the new zookeeper 
that is up and running with the same IP address as the previous one.
  
Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience
  

Date: Tue, 10 May 2016 17:49:59 +0200
From: ra...@gruchalski.com
To: users@kafka.apache.org
Subject: Re: Zookeeper dies ... Kafka server unable to connect
  
Are you sure you’re getting the same IP address?

Regarding zookeeper connection being closed, is kubernetes doing a soft 
shutdown of your container? If so, zookeeper is asked politely to stop.
–
Best regards,

Radek Gruchalski

radek@gruchalski.com
de.linkedin.com/in/radgruchalski
+4917685656526

  
  
On May 10, 2016 at 5:47:24 PM, Paolo Patierno (ppatie...@live.com) wrote:
  
Hi all,
  
experimenting with Kafka on Kubernetes, I have the following error on Kafka server reconnection ...

A cluster with one zookeeper and two kafka servers ... I turn off the zookeeper pod but kubernetes restarts it and guarantees the same IP address for it, yet the kafka server keeps retrying the connection, failing with the following trace:
  
[2016-05-10 15:40:55,046] WARN Session 0x1549b308dd20002 for server 10.0.0.184/10.0.0.184:2181, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
[2016-05-10 15:40:55,149] INFO zookeeper state 

Re: Kafka Consumer consuming large number of messages

2016-05-04 Thread Jaikiran Pai
Going by the name of that property (max.partition.fetch.bytes), I'm 
guessing it's the max fetch bytes per partition of a topic. Are you sure 
the data you are receiving in that consumers doesn't belong to multiple 
partitions and hence can/might exceed the value that's set per 
partition? By the way, what does the consumer code look like, where you 
are verifying/measuring this consumed size?
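For illustration, a rough sketch of the worst case implied by a per-partition limit, plus the 0.10-style per-poll cap that Jens mentions below (the numbers are the ones from this thread, not recommendations, and the per-consumer partition count is only an estimate):

// per-partition limit from the consumer config in this thread
int maxPartitionFetchBytes = 8192;
// with 200 partitions spread over 12 consumer processes, each consumer owns roughly 16-17
int assignedPartitions = 200 / 12;
// upper bound on the payload a single poll() may hand back on 0.9.x
int worstCasePollBytes = assignedPartitions * maxPartitionFetchBytes;  // roughly 130 KB

// on 0.10+ the record count per poll can also be capped directly, e.g.
// props.put("max.poll.records", "50");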


-Jaikiran
On Thursday 05 May 2016 03:00 AM, Abhinav Solan wrote:

Thanks a lot Jens for the reply.
One thing is still unclear: is this happening only when we set
max.partition.fetch.bytes to a higher value? Because I am setting it
quite low, at only 8192, since I can control the size of the
data coming into Kafka. So even after setting this value, why is the Consumer
fetching more records? Is the Consumer not honoring this property, or is
there some other logic which is making it fetch more data?

Thanks,
Abhinav

On Wed, May 4, 2016 at 1:40 PM Jens Rantil  wrote:


Hi,

This is a known issue. The 0.10 release will fix this. See

https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
for some background.

Cheers,
Jens

On Wed, 4 May 2016 at 19:32, Abhinav Solan wrote:


Hi,

I am using kafka-0.9.0.1 and have configured the Kafka consumer  to fetch
8192 bytes by setting max.partition.fetch.bytes

Here are the properties I am using

props.put("bootstrap.servers", servers);
props.put("group.id", "perf-test");
props.put("offset.storage", "kafka");
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", 6);
props.put("request.timeout.ms", 7);
props.put("heartbeat.interval.ms", 5);
props.put("auto.offset.reset", "latest");
props.put("max.partition.fetch.bytes", "8192");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

I am setting up 12 Consumers with 4 workers each to listen on a topic with
200 partitions.
I have also enabled compression when sending to Kafka.

The problem I am getting is: even though the fetch size is small, the
consumers poll too many records. If the topic has many messages and the
consumer is behind in consumption, it tries to fetch a bigger size; if the
consumer is not behind then it tries to fetch around 45. But either way,
if I set max.partition.fetch.bytes, shouldn't the fetch size have an
upper limit? Is there any other setting I am missing here?
I am controlling the message size myself, so it's not that some bigger
messages are coming through; each message must be around 200-300 bytes
only.

Due to the large number of messages it is polling, the inner processing is
sometimes not able to finish within the heartbeat interval limit, which
makes the consumer rebalancing kick in again and again. This only happens
when the consumer is way behind in offsets, e.g. there are 10 messages
to be processed in the topic.

Thanks


--

Jens Rantil
Backend Developer @ Tink

Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
For urgent matters you can reach me at +46-708-84 18 32.





Re: Receiving "The session timeout is not within an acceptable range" but AFAIK it is within range

2016-05-03 Thread Jaikiran Pai
From what you pasted, I can't say for certain whether you are using 
those properties as consumer-level settings or broker-level settings. 
The group.min.session.timeout.ms and group.max.session.timeout.ms 
properties are broker-level settings (as far as I understand) and should be part of 
your broker config (server.properties). Using them as consumer-level 
settings while creating the consumer will result in those properties 
being ignored. Can you confirm where exactly you have set those values?
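To make the split concrete, a minimal sketch of where each setting would normally live (the values below are examples only, not a recommendation):

# broker side, in server.properties
group.min.session.timeout.ms=6000
group.max.session.timeout.ms=120000

# consumer side, in the consumer configuration; must fall inside the broker's min/max range
session.timeout.ms=30001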


-Jaikiran

On Tuesday 03 May 2016 11:25 PM, Mario Ricci wrote:

Hi all,

This seems like a basic question and I am probably just missing something, but when 
creating my consumer I am getting the error: "The session timeout is not within an 
acceptable range"

My settings seem to be good (group.max and group.min don't violate 
session.timeout).

Here are my settings:
request.timeout.ms=121000
session.timeout.ms=30001
group.min.session.timeout.ms=6000
group.max.session.timeout.ms=12
receive.buffer.bytes=262144
group.id=consumeproduce
auto.offset.reset=earliest
offsets.storage=zookeeper
bootstrap.servers=10.7.52.170:9092
dual.commit.enabled=true
max.partition.fetch.bytes=2097152
enable.auto.commit=false
value.serializer=org.apache.kafka.common.serialization.StringSerializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
fetch.min.bytes=5

and here is the code I believe that throws the error:
else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
   responseCallback(joinError(memberId, 
Errors.INVALID_SESSION_TIMEOUT.code))

I see that the error should not be thrown, though.  Can anyone please help? As 
soon as I get over 3 ms for session.timeout.ms I get that error.  I know 
previously I have had it much higher.

Mario





Re: Out of memory - Java Heap space

2016-04-27 Thread Jaikiran Pai
Have you tried getting the memory usage output using a tool like jmap and 
seeing what's consuming the memory? Also, what are your heap sizes for 
the process?


-Jaikiran

On Tuesday 19 April 2016 02:31 AM, McKoy, Nick wrote:

To follow up on my last email, I have been looking into 
socket.receive.buffer.bytes as well as socket.send.buffer.bytes. Would it help 
to increase those buffers for the OOM issue?

All help is appreciated!

Thanks!


-nick


From: "McKoy, Nick" 
>
Date: Monday, April 18, 2016 at 3:41 PM
To: "users@kafka.apache.org" 
>
Subject: Out of memory - Java Heap space

Hey all,

I have a kafka cluster of 5 nodes that’s working really hard. CPU is around 40% 
idle daily.

I looked at the file descriptor note on this documentation page 
http://docs.confluent.io/1.0/kafka/deployment.html#file-descriptors-and-mmap 
and decided to give it a shot on one instance in the cluster just to see how it 
performed. I increased this number to 1048576.

I kept getting this error from the kafka logs:
ERROR [ReplicaFetcherThread--1-6], Error due to 
(kafka.server.ReplicaFetcherThread) java.lang.OutOfMemoryError: Java heap space

I increased the heap to see if that would help and I kept seeing these errors. 
Could the file descriptor change have something to do with this?



—
Nicholas McKoy
Engineering – Big Data and Personalization
Washington Post Media

One Franklin Square, Washington, DC 20001
Email: nicholas.mc...@washpost.com





Re: kafka_2.10-0.8.2.1 High CPU load

2016-04-27 Thread Jaikiran Pai
We have had this issue in 0.8.x and at that time we did not investigate 
it. Recently we upgraded to 0.9.0.1 and had similar issue which we 
investigated and narrowed down to what's explained here 
http://mail-archives.apache.org/mod_mbox/kafka-users/201604.mbox/%3C571F23ED.7050405%40gmail.com%3E. 
If upgrading to 0.9.0.1 is an option for you, then you might want to do 
that and read through that mail for potential ways to get past this issue.


-Jaikiran
On Friday 22 April 2016 03:04 PM, Kafka wrote:

Hi, we use kafka_2.10-0.8.2.1, and our VM machine config is: 4 cores, 8G.
Our cluster consists of three brokers, and our broker config defaults to 2 
replicas. Our broker load often gets very high once in a while,
with the load greater than 1.5 per core on average.

We have about 70 topics on this cluster.
When we use the top utility, we can see 280% CPU utilization. Then I used jstack and 
found there are 4 threads that use the most CPU, shown below:
"kafka-network-thread-9092-0" prio=10 tid=0x7f46c8709000 nid=0x35dd 
runnable [0x7f46b73f2000]
java.lang.Thread.State: RUNNABLE
"kafka-network-thread-9092-1" prio=10 tid=0x7f46c873c000 nid=0x35de 
runnable [0x7f46b75f4000]
/kafka-network-thread
"kafka-network-thread-9092-2" prio=10 tid=0x7f46c8756000 nid=0x35df 
runnable [0x7f46b7cfb000]
java.lang.Thread.State: RUNNABLE
"kafka-network-thread-9092-3" prio=10 tid=0x7f46c876f800 nid=0x35e0 
runnable [0x7f46b5adb000]
java.lang.Thread.State: RUNNABLE


I found one issue, https://issues.apache.org/jira/browse/KAFKA-493, that 
concerns this, but I think it doesn't fit my case, because we have other 
clusters deployed on VM machines with the same config, and they don't 
show this behaviour.

the brokers' config is the same, but we add:

replica.fetch.wait.max.ms=100
num.replica.fetchers=2


I want to ask whether this is the main reason, or whether there is some other 
reason that leads to the high CPU load?




Re: 0.9.0.1 High CPU usage on broker - Why is the default heart beat interval set too low (3 seconds)?

2016-04-26 Thread Jaikiran Pai
Thanks for responding, Liquan. Just so I understand better, where does 
the coordinator reside? On the broker side? And the coordinator knows 
the heart beat interval configured on each consumer that's connected to 
the broker, I am guessing. For the usecase we have, having a higher 
value for heart beat works out fine for us.


I haven't yet fully thought through what I'm proposing, but would it be 
wise/feasible to introduce an option which disables heart beats 
altogether and instead relies on regular consumption/communication 
between the consumer and broker as a heart beat? That way, the usual 
consumption and offset commits from the consumer can be used as an 
inference of whether the consumer is alive or dead, and there 
won't be a need for regular heart beats being sent by the consumer. 
This helps in the case where you have many consumers connected to the 
broker that are practically idle except for these heart beats being passed 
around. One implication of this would be that if no message is 
produced, for a while, in the topics that the consumers listen to, then 
there won't be any communication from the consumer and hence 
it might be considered dead and a re-balance triggered. Is that a 
bad/worse thing though?


-Jaikiran

On Tuesday 26 April 2016 02:10 PM, Liquan Pei wrote:

Hi Jaikiran,

Thanks for the email and the detailed analysis. One reason for setting the
heartbeat interval to a lower value is for faster failure detection. On
every received heartbeat, the coordinator starts (or resets) a timer. If no
heartbeat is received when the timer expires, the coordinator marks the
member dead and signals the rest of the group that they should rejoin so
that partitions can be reassigned.

I think the trade off here is the CPU usage and how fast you want to detect
the consumer failure. Faster failure detection makes the partitions
assigned to dead consumers to assign to other consumers.

Best,
Liquan


On Tue, Apr 26, 2016 at 1:16 AM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:


We have been investigating an unreasonably high CPU usage of the Kafka
process when there's no _real_ activity going on between the consumers and
the broker. We had this issue in 0.8.x days and is exactly the same as
what's being tracked in this JIRA
https://issues.apache.org/jira/browse/KAFKA-493. We now use 0.9.0.1 (both
client libraries, new consumer APIs and the broker). However, we still see
some CPU usage which looks a bit on the higher side when there's no real
message production or consumption going on. Just connecting around 10-20
consumers on different topics of a single broker Kafka instance shows up
this issue.

All our debugging so far points to the Processor thread on the broker side
which has a high CPU usage. There are N such Processor threads, which
always are in the RUNNABLE state doing this:

"kafka-network-thread-0-PLAINTEXT-0" #21 prio=5 os_prio=0
tid=0x7f1858c4a800 nid=0xc81 runnable [0x7f18106cb000]
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
 - locked <0x0006c0046128> (a sun.nio.ch.Util$2)
 - locked <0x0006c0046118> (a java.util.Collections$UnmodifiableSet)
 - locked <0x0006c0046068> (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
 at org.apache.kafka.common.network.Selector.select(Selector.java:425)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
 at kafka.network.Processor.run(SocketServer.scala:413)
 at java.lang.Thread.run(Thread.java:745)



From what we have narrowed down so far, this thread in itself isn't a
"culprit", since when there are no consumers connected, the CPU isn't high.
However when a consumer connects to this and just waits for messages, these
threads start playing a role in the high CPU usage. Our debugging shows
that each of these X number of consumers that connect to the broker keep
doing 2 things when they are "idle":

1) A delayed operation every Y seconds which does the auto commit of
offsets.
2) Sending heartbeats every 3 seconds to the broker

We disabled auto commits of offsets since that's the semantic we wanted.
So #1 isn't really an issue. However, #2 is. It looks like the default
heartbeat interval is 3 seconds which is too low, IMO. This translates to a
network socket operation every 3 seconds which then has to be processed by
the broker side Processor thread. If there's just a single consumer, this
doesn't make much of a difference. As soon as you add more consumers, the
Processor on the broker side has to be start processing each of these
incoming heartbeats which become too frequent. Even though the interval is
3 seco

0.9.0.1 High CPU usage on broker - Why is the default heart beat interval set too low (3 seconds)?

2016-04-26 Thread Jaikiran Pai
We have been investigating an unreasonably high CPU usage of the Kafka 
process when there's no _real_ activity going on between the consumers 
and the broker. We had this issue in 0.8.x days and is exactly the same 
as what's being tracked in this JIRA 
https://issues.apache.org/jira/browse/KAFKA-493. We now use 0.9.0.1 
(both client libraries, new consumer APIs and the broker). However, we 
still see some CPU usage which looks a bit on the higher side when 
there's no real message production or consumption going on. Just 
connecting around 10-20 consumers on different topics of a single broker 
Kafka instance shows up this issue.


All our debugging so far points to the Processor thread on the broker 
side which has a high CPU usage. There are N such Processor threads, 
which always are in the RUNNABLE state doing this:


"kafka-network-thread-0-PLAINTEXT-0" #21 prio=5 os_prio=0 
tid=0x7f1858c4a800 nid=0xc81 runnable [0x7f18106cb000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0006c0046128> (a sun.nio.ch.Util$2)
- locked <0x0006c0046118> (a java.util.Collections$UnmodifiableSet)
- locked <0x0006c0046068> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:425)
at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)



From what we have narrowed down so far, this thread in itself isn't a 
"culprit", since when they are no consumers connected, the CPU isn't 
high. However when a consumer connects to this and just waits for 
messages, these threads start playing a role in the high CPU usage. Our 
debugging shows that each of these X number of consumers that connect to 
the broker keep doing 2 things when they are "idle":


1) A delayed operation every Y seconds which does the auto commit of 
offsets.

2) Sending heartbeats every 3 seconds to the broker

We disabled auto commits of offsets since that's the semantic we wanted. 
So #1 isn't really an issue. However, #2 is. It looks like the default 
heartbeat interval is 3 seconds which is too low, IMO. This translates 
to a network socket operation every 3 seconds which then has to be 
processed by the broker side Processor thread. If there's just a single 
consumer, this doesn't make much of a difference. As soon as you add 
more consumers, the Processor on the broker side has to start 
processing each of these incoming heartbeats, which become too frequent. 
Even though the interval is 3 seconds, the incoming heartbeats to the 
broker can be much more frequent when more consumers are involved, since 
the 3 second interval is just per consumer. So in practice there can be 
a heartbeat coming every second or every few milliseconds from the X 
consumers to this broker which can contribute to this high CPU usage 
when the system is practically idle.


So coming to the real question - why is the default heart beat interval 
so low - 3 seconds? We increased it to 29 seconds (just 1 second less 
than the session timeout) per consumer (via consumer configs) and in 
addition to disabling auto commit, these changes have noticeably 
improved the CPU usage.
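
For reference, a minimal sketch of the consumer settings described above (the 
broker address, group id and topic name are placeholders, not our real ones):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IdleFriendlyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "my-group");                // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");    // we commit offsets ourselves
        props.put("session.timeout.ms", "30000");    // the (default) session timeout
        props.put("heartbeat.interval.ms", "29000"); // raised from the 3 second default
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
        // ... normal poll() loop elided ...
        consumer.close();
    }
}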


Ideally, what would be a better value for the heart beat interval that 
doesn't unnecessarily flood these messages and force the broker to 
continuously process them?


-Jaikiran


Re: [ANNOUCE] Apache Kafka 0.9.0.0 Released

2015-11-24 Thread Jaikiran Pai
Congratulations on this release! Happy to see the security related 
features which we are going to start using soon.


-Jaikiran
On Tuesday 24 November 2015 10:46 PM, Jun Rao wrote:

The Apache Kafka community is pleased to announce the release for
Apache Kafka 0.9.0.0. This a major release that includes (1)
authentication (through SSL and SASL) and authorization, (2) a new
java consumer, (3) a Kafka connect framework for data ingestion and
egression, and (4) quotas. It also includes many critical bug fixes.

All of the changes in this release can be found: 
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/RELEASE_NOTES.html

Apache Kafka is a high-throughput, publish-subscribe messaging system rethought 
of as a distributed commit log.

** Fast => A single Kafka broker can handle hundreds of megabytes of reads and
writes per second from thousands of clients.

** Scalable => Kafka is designed to allow a single cluster to serve as the 
central data backbone
for a large organization. It can be elastically and transparently expanded 
without downtime.
Data streams are partitioned and spread over a cluster of machines to allow 
data streams
larger than the capability of any single machine and to allow clusters of 
co-ordinated consumers.

** Durable => Messages are persisted on disk and replicated within the cluster 
to prevent
data loss. Each broker can handle terabytes of messages without performance 
impact.

** Distributed by Design => Kafka has a modern cluster-centric design that 
offers
strong durability and fault-tolerance guarantees.

You can download the source release from
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka-0.9.0.0-src.tgz

and binary releases from
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

A big thank you for the following people who have contributed to the
0.9.0.0 release.

Aditya Auradkar, Alexander Pakulov, Alexey Ozeritskiy, Alexis Midon,
Allen Wang, Anatoly Fayngelerin, Andrew Otto, Andrii Biletskyi, Anna
Povzner, Anton Karamanov, Ashish Singh, Balaji Seshadri, Ben Stopford,
Chris Black, Chris Cope, Chris Pinola, Daniel Compton, Dave Beech,
Dave Cromberge, Dave Parfitt, David Jacot, Dmytro Kostiuchenko, Dong
Lin, Edward Ribeiro, Eno Thereska, Eric Olander, Ewen
Cheslack-Postava, Fangmin Lv, Flavio Junqueira, Flutra Osmani, Gabriel
Nicolas Avellaneda, Geoff Anderson, Grant Henke, Guozhang Wang, Gwen
Shapira, Honghai Chen, Ismael Juma, Ivan Lyutov, Ivan Simoneko,
Jaikiran Pai, James Oliver, Jarek Jarcec Cecho, Jason Gustafson, Jay
Kreps, Jean-Francois Im, Jeff Holoman, Jeff Maxwell, Jiangjie Qin, Joe
Crobak, Joe Stein, Joel Koshy, Jon Riehl, Joshi, Jun Rao, Kostya
Golikov, Liquan Pei, Magnus Reftel, Manikumar Reddy, Marc Chung,
Martin Lemanski, Matthew Bruce, Mayuresh Gharat, Michael G. Noll,
Muneyuki Noguchi, Neha Narkhede, Onur Karaman, Parth Brahmbhatt, Paul
Mackles, Pierre-Yves Ritschard, Proneet Verma, Rajini Sivaram, Raman
Gupta, Randall Hauch, Sasaki Toru, Sriharsha Chintalapani, Steven Wu,
Stevo Slavic, Tao Xiao, Ted Malaska, Tim Brooks, Todd Palino, Tong Li,
Vivek Madani, Vladimir Tretyakov, Yaguo Zhou, Yasuhiro Matsuda,
Zhiqiang He

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at 
http://kafka.apache.org/

Thanks,

Jun




Re: kafka 0.8.2.2 release

2015-09-28 Thread Jaikiran Pai
It's been discussed here recently 
http://mail-archives.apache.org/mod_mbox/kafka-dev/201509.mbox/%3CCAFc58G_dn_mMGaJoyiw81-RdAFJ2NAgxQFLtc%3D9pU5PwPW_Kvg%40mail.gmail.com%3E


-Jaikiran
On Monday 28 September 2015 11:08 PM, Richard Lee wrote:
It appears from maven central and git that there was a 0.8.2.2 release 
on Sept 2nd.  However, the Kafka downloads page 
(https://kafka.apache.org/downloads.html) does not appear to have been 
updated.


Also, why aren't the kafka shell scripts made available as a .tgz file 
in maven central like they are for other apache projects (e.g. 
samza)?  I would prefer to download a tarball from there and cache it 
in my company's local maven repo rather than have a special manual 
step to fetch them.


Richard




Re: Unreasonably high CPU from Kafka (0.8.2.1)

2015-09-17 Thread Jaikiran Pai
Sending this to the dev list since the Kafka dev team might have more 
inputs on this one. Can someone please take a look at the issue noted 
below and whether the suggested change makes sense?


-Jaikiran
On Tuesday 15 September 2015 12:03 AM, Jaikiran Pai wrote:
We have been using Kafka for a while now in one of our dev projects. 
Currently we have just 1 broker and 1 zookeeper instance. Almost every 
day, Kafka "stalls" and we end up cleaning up the data/log folders of 
Kafka and zookeeper and bringing it up afresh. We haven't been able to 
narrow down the issue yet.


However, keeping aside that part for a while, we have been noticing 
that even when the system/application is completely idle, the Kafka 
process seems to take up unreasonably high CPU (10-15% constantly 
shown in top command). We have taken multiple thread dumps and each of 
them have this:


"kafka-socket-acceptor" #24 prio=5 os_prio=0 tid=0x7f62685d9000 
nid=0x2d47 runnable [0x7f6231464000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77a458> (a sun.nio.ch.Util$2)
- locked <0xca77a440> (a 
java.util.Collections$UnmodifiableSet)

- locked <0xca774550> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Acceptor.run(SocketServer.scala:215)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-2" #23 prio=5 os_prio=0 
tid=0x7f62685d6800 nid=0x2d46 runnable [0x7f6231565000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77d050> (a sun.nio.ch.Util$2)
- locked <0xca77d038> (a 
java.util.Collections$UnmodifiableSet)

- locked <0xca7745e0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-1" #22 prio=5 os_prio=0 
tid=0x7f62685c7800 nid=0x2d45 runnable [0x7f6231666000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77e590> (a sun.nio.ch.Util$2)
- locked <0xca77e578> (a 
java.util.Collections$UnmodifiableSet)

- locked <0xca7746b8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-0" #21 prio=5 os_prio=0 
tid=0x7f62685b9000 nid=0x2d44 runnable [0x7f6231767000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77fbd0> (a sun.nio.ch.Util$2)
- locked <0xca77fbb8> (a 
java.util.Collections$UnmodifiableSet)

- locked <0xca774790> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)




Looking at the code of 0.8.2.1, the piece of code at 
https://github.com/apache/kafka/blob/0.8.2.1/core/src/main/scala/kafka/network/SocketServer.scala#L314 looks like this:


while(isRunning) {
...
val ready = selector.select(300)
...
if(ready > 0) {
...
}
...
}

This looks like an (always) "busy" while loop when selector.select 
returns 0. Could a sleep for a few milliseconds help in this case? 
Similar code is present in the Acceptor in that same file, which does 
this exact thing. Would adding a small sleep in there help with 
reducing the CPU usage when things are idle?


-Jaikiran






Unreasonably high CPU from Kafka (0.8.2.1)

2015-09-14 Thread Jaikiran Pai
We have been using Kafka for a while now in one of our dev projects. 
Currently we have just 1 broker and 1 zookeeper instance. Almost every 
day, Kafka "stalls" and we end up cleaning up the data/log folders of 
Kafka and zookeeper and bringing it up afresh. We haven't been able to 
narrow down the issue yet.


However, keeping aside that part for a while, we have been noticing that 
even when the system/application is completely idle, the Kafka process 
seems to take up unreasonably high CPU (10-15% constantly shown in top 
command). We have taken multiple thread dumps and each of them have this:


"kafka-socket-acceptor" #24 prio=5 os_prio=0 tid=0x7f62685d9000 
nid=0x2d47 runnable [0x7f6231464000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77a458> (a sun.nio.ch.Util$2)
- locked <0xca77a440> (a java.util.Collections$UnmodifiableSet)
- locked <0xca774550> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Acceptor.run(SocketServer.scala:215)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-2" #23 prio=5 os_prio=0 
tid=0x7f62685d6800 nid=0x2d46 runnable [0x7f6231565000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77d050> (a sun.nio.ch.Util$2)
- locked <0xca77d038> (a java.util.Collections$UnmodifiableSet)
- locked <0xca7745e0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-1" #22 prio=5 os_prio=0 
tid=0x7f62685c7800 nid=0x2d45 runnable [0x7f6231666000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77e590> (a sun.nio.ch.Util$2)
- locked <0xca77e578> (a java.util.Collections$UnmodifiableSet)
- locked <0xca7746b8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)

"kafka-network-thread-9092-0" #21 prio=5 os_prio=0 
tid=0x7f62685b9000 nid=0x2d44 runnable [0x7f6231767000]

   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0xca77fbd0> (a sun.nio.ch.Util$2)
- locked <0xca77fbb8> (a java.util.Collections$UnmodifiableSet)
- locked <0xca774790> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at kafka.network.Processor.run(SocketServer.scala:320)
at java.lang.Thread.run(Thread.java:745)




Looking at the code of 0.8.2.1, the piece of code at 
https://github.com/apache/kafka/blob/0.8.2.1/core/src/main/scala/kafka/network/SocketServer.scala#L314 looks like this:


while(isRunning) {
...
val ready = selector.select(300)
...
if(ready > 0) {
...
}
...
}

This looks like an (always) "busy" while loop when selector.select 
returns 0. Could a sleep for a few milliseconds help in this case? 
Similar code is present in the Acceptor in that same file, which does 
this exact thing. Would adding a small sleep in there help with 
reducing the CPU usage when things are idle?
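
To make the suggestion concrete, here is a minimal, generic Java sketch (not the
actual broker code; the 10 ms value is only illustrative) of the loop pattern in
question and where such a sleep could go:

import java.io.IOException;
import java.nio.channels.Selector;

public class SelectLoopSketch {
    private volatile boolean running = true;

    public void pollLoop(Selector selector) throws IOException, InterruptedException {
        while (running) {
            // returns after at most 300 ms, or earlier if keys become ready
            int ready = selector.select(300);
            if (ready > 0) {
                // ... process the selected keys ...
                selector.selectedKeys().clear();
            } else {
                // select() returned 0: nothing to process. This is where the mail
                // above suggests a short sleep could be added before looping again.
                Thread.sleep(10);
            }
        }
    }
}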


-Jaikiran




Re: Undecipherable error in zookeeper on initial connection

2015-08-14 Thread Jaikiran Pai
Such errors are very typical in zookeeper logs - it's very noisy. I 
typically ignore those errors and try and debug the Kafka issue either 
via Kafka logs, Kafka thread dumps and/or zookeeper shell.


Anyway, how are you adding the topics (script, code?) and what exactly 
are you noticing? Running into exceptions or timing out?


-Jaikiran
On Thursday 13 August 2015 04:06 AM, Jason Kania wrote:

Hello,
I am wondering if someone can point me in the right direction. I am getting 
this error when kafka connects to zookeeper:
zookeeper2_1  | 2015-08-12 22:18:21,493 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /100.100.100.1:38178
zookeeper2_1  | 2015-08-12 22:18:21,498 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@868] - Client attempting to establish new session at /172.17.0.239:38178
zookeeper2_1  | 2015-08-12 22:18:21,499 [myid:] - INFO  [SyncThread:0:FileTxnLog@199] - Creating new log file: log.6a
zookeeper_1  | 2015-08-12 22:18:21,505 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@617] - Established session 0x14f23fe141a with negotiated timeout 6000 for client /100.100.100.1:38178
zookeeper_1  | 2015-08-12 22:18:21,755 [myid:] - INFO  [ProcessThread(sid:0 cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException when processing sessionid:0x14f23fe141a type:delete cxid:0x1b zxid:0x6d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
At this point Kafka remains running and I see nothing in the Kafka logs to 
indicate an error, but attempts to add topics indicate that no brokers are 
running. I have tried to look for a solution, but zookeeper seems to be a 
really poor application.
Any suggestions would be appreciated.
Thanks,
Jason





Re: Help with SocketTimeoutException while reading from Kafka cluster

2015-08-14 Thread Jaikiran Pai

On Wednesday 12 August 2015 04:59 AM, venkatesh kavuluri wrote:

83799 [c3-onboard_-2-9571-1439334326956-cfa8b46a-leader-finder-thread]
INFO  kafka.consumer.SimpleConsumer  - Reconnect due to socket error:
java.net.SocketTimeoutException

163931 [c3-onboard_-2-9571-1439334326956-cfa8b46a-leader-finder-thread]
INFO  kafka.consumer.SimpleConsumer  - Reconnect due to socket error:
java.net.SocketTimeoutException


There's a patch in the JIRA here which logs the exact reason why the 
exception was thrown https://issues.apache.org/jira/browse/KAFKA-2221. 
It hasn't been merged since SimpleConsumer was considered deprecated. 
But you might want to apply that and see if that helps to narrow down 
the issue.


-Jaikiran


Re: Got conflicted ephemeral node exception for several hours

2015-08-04 Thread Jaikiran Pai


I am on Kafka 0.8.2.1 (Java 8) and have happened to run into this same 
issue where the KafkaServer (broker) goes into an indefinite while loop 
writing out this message:


[2015-08-04 15:45:12,350] INFO conflict in /brokers/ids/0 data: 
{jmx_port:-1,timestamp:1438661432074,host:foo-bar,version:1,port:9092} 
stored data: 
{jmx_port:-1,timestamp:1438661429589,host:foo-bar,version:1,port:9092} 
(kafka.utils.ZkUtils$)
[2015-08-04 15:45:12,352] INFO I wrote this conflicted ephemeral node 
[{jmx_port:-1,timestamp:1438661432074,host:foo-bar,version:1,port:9092}] 
at /brokers/ids/0 a while back in a different session, hence I will 
backoff for this node to be deleted by Zookeeper and retry 
(kafka.utils.ZkUtils$)


These above 2 lines have been repeating continuously every few seconds 
for the past 20 odd hours on this broker and this broker has been 
rendered useless and is contributing to high CPU usage.


As a result the consumers have gone into a state where they no longer 
consume the messages. Furthermore, this continuous looping has put the Kafka 
process at the top of the CPU usage. I understand that bouncing the consumer 
is an option and probably will fix this, but in our real production 
environments, we won't be able to bounce the consumers. I currently have 
access to logs (some of which have been pasted here). Is there any chance 
these logs help in narrowing down the issue and fixing the root cause? 
Can we also please add a maximum retry limit to this Zookeeper 
node creation logic instead of going into an indefinite while loop?
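
A minimal, generic sketch (not the actual ZkUtils code; the names and limits are
only illustrative) of the bounded-retry idea being suggested here:

public final class BoundedRetry {

    /** Runs the given node-creation attempt at most maxRetries times, backing off
     *  between attempts, instead of retrying forever. */
    public static void retryEphemeralCreate(Runnable createNode, int maxRetries, long backoffMs)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                createNode.run();   // attempt to (re)create the ephemeral node
                return;
            } catch (RuntimeException conflict) {
                if (attempt == maxRetries) {
                    throw conflict; // give up and surface the error instead of looping indefinitely
                }
                Thread.sleep(backoffMs); // wait for the stale ephemeral node to expire
            }
        }
    }
}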


I have maintained the original timestamps in the logs so as to help 
narrow down the issue. The 1438661432074 (milliseconds) in the log 
translates to Aug 03 2015 21:10:32 (PDT) and 1438661429589 translates to 
Aug 03 2015 21:10:30 (PDT). I have included that part of the log snippet 
from the server.log of the broker (10.95.100.31).



[2015-08-03 21:10:29,805] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:29,938] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:30,045] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)

 a lot more similar exceptions 


[2015-08-03 21:10:31,304] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,397] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,399] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,445] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)


 bunch of similar logs as above 

[2015-08-03 21:10:31,784] INFO [ReplicaFetcherManager on broker 0] 
Removed fetcher for partitions [partition list] 
(kafka.server.ReplicaFetcherManager)
[2015-08-03 21:10:31,860] INFO Closing socket connection to 
/10.95.100.31. 

kafka-topics.sh - Include topic deletion information?

2015-07-20 Thread Jaikiran Pai
Would it be possible to enhance the kafka-topics.sh script so that it 
can show, against each topic it lists, whether that topic is 
marked for deletion? Right now, to figure out whether a topic has been 
marked for deletion, one has to use the zookeeper-shell script and list 
the topics under /admin/delete_topics (at least, that's the only way I 
know of). It would be far easier to just have this info via the 
Kafka scripts.
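
For completeness, here is a small sketch of doing that check programmatically
(the ZooKeeper connect string and timeouts are placeholders); it simply lists
the children of /admin/delete_topics via ZkClient:

import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class ListTopicsMarkedForDeletion {
    public static void main(String[] args) {
        // placeholder connect string, session timeout and connection timeout
        ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
        try {
            List<String> marked = zkClient.getChildren("/admin/delete_topics");
            System.out.println("Topics marked for deletion: " + marked);
        } finally {
            zkClient.close();
        }
    }
}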


-Jaikiran


Re: consumer memory footprint

2015-07-16 Thread Jaikiran Pai

On Friday 17 July 2015 10:14 AM, Jiangjie Qin wrote:

I think the rough calculation of max memory footprint for each high level
consumer would be:

(Number Of Partitions For All Topics) * fetch.message.max.bytes *
queued.max.message.chunks + (some decompression memory cost for a message)
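
To get a feel for the scale of that formula, here is a rough worked example
(every number below is assumed for illustration and is not taken from this thread):

public class ConsumerMemoryEstimate {
    public static void main(String[] args) {
        long partitions = 150;                          // assumed total partitions across subscribed topics
        long fetchMessageMaxBytes = 100L * 1024 * 1024; // fetch.message.max.bytes = 100 MB
        long queuedMaxMessageChunks = 2;                // assumed queued.max.message.chunks
        long bytes = partitions * fetchMessageMaxBytes * queuedMaxMessageChunks;
        // 150 * 100 MB * 2 = ~30,000 MB, i.e. roughly 29 GB before decompression overhead
        System.out.println("Approx. max footprint (GB): " + (bytes / (1024.0 * 1024 * 1024)));
    }
}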


Is this "Number of Partitions for All Topics" in the system, or the number 
of partitions for the topics that the consumer is requesting to consume 
from (for example via a regex on a topic name filter)?


-Jaikiran





In your case, it would be 10 times above.

Thanks,

Jiangjie (Becket) Qin


On 7/16/15, 1:40 PM, Kris K squareksc...@gmail.com wrote:


Hi All,

Is there a way to calculate the amount of memory used per thread in case
of
a high level consumer?

I am particularly interested in calculating the memory required by a
process running 10 high level consumer threads for 15 topics with max.
file
size set to 100 MB.

Thanks,
Kris




Re: retention.ms is not respected after upgrade to 0.8.2

2015-07-06 Thread Jaikiran Pai
I can't see anything obvious wrong in those configs or the code (after 
just a brief look). Are you sure the system on which you are running 
Kafka has its date/time correctly set?


-Jaikiran
On Monday 29 June 2015 12:06 PM, Krzysztof Zarzycki wrote:

Greetings!
I have a problem with Kafka. I had a cluster of 3 brokers in version 0.8.1. I
have a very important topic with raw events that had a config
retention.ms={365 days in ms}.
It all worked fine; data was not being deleted.
But now I have upgraded all brokers to 0.8.2 and suddenly the brokers delete the
data! They don't respect retention.ms.
I have no other settings around retention set: global log.retention.hours
is set to default 168, log.retention.bytes is not set.

Some more info:
1. I tried looking into ZK config and it looks fine:
$ get /kafka081/config/topics/my_topic
{version:1,config:{retention.ms:3153600}}
cZxid = 0xa0006412c
ctime = Tue Mar 31 15:02:20 CEST 2015
mZxid = 0x116bb7
mtime = Fri Jun 26 22:28:40 CEST 2015
pZxid = 0xa0006412c
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 53
numChildren = 0

2. I tried to overwrite retention.ms once again for my topic. Didn't help.

3. I looked into logs of the broker and found that it indeed deletes the
data, but it doesn't print *why* (based on what rule) it deletes the data:
[2015-06-29 07:35:40,861] INFO Deleting segment 89226232 from log
my_topic-1. (kafka.log.Log)
[2015-06-29 07:35:40,993] INFO Deleting index
/var/lib/kafka/kafka-logs-1/my_topic-1/89226232.index.deleted
(kafka.log.OffsetIndex)


Please help me, I have no idea what to do about it. Any hint on at least
how to debug a problem would be great!

Cheers,
Krzysztof Zarzycki





Re: NoSuchMethodError with Consumer Instantiation

2015-06-17 Thread Jaikiran Pai
You probably have the wrong version of the Kafka jar(s) on your 
classpath. Which version of Kafka are you using and how have you set up 
the classpath?


-Jaikiran
On Thursday 18 June 2015 08:11 AM, Srividhya Anantharamakrishnan wrote:

Hi,

I am trying to set up Kafka in our cluster and I am running into the
following error when Consumer is getting instantiated:

java.lang.NoSuchMethodError:
org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread;

 at
kafka.utils.KafkaScheduler$$anon$1.newThread(KafkaScheduler.scala:84)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.init(ThreadPoolExecutor.java:610)

 at
java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:924)

 at
java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1590)

 at
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:333)

 at
java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)

 at kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:116)

 at
kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:136)

 at
kafka.javaapi.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:65)

 at
kafka.javaapi.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:68)

 at
kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)

 at
kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)


I am guessing that it is missing certain classpath references. If that is
the reason, could someone tell me which jar it is?

If not, what is it that I am missing?


*KafkaConsumer:*


public KafkaConsumer(String topic)
{
    // line where the error is thrown
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    this.topic = topic;
}

private static ConsumerConfig createConsumerConfig()
{
    Properties props = new Properties();
    props.put("zookeeper.connect", "IP:PORT");
    props.put("group.id", "group1");
    props.put("zookeeper.session.timeout.ms", "6000");
    props.put("zookeeper.sync.time.ms", "2000");
    props.put("auto.commit.interval.ms", "6");

    return new ConsumerConfig(props);
}


TIA!





Re: Kafka ConsumerRebalanceFailedException, kafka.consumer.SimpleConsumer: Reconnect due to socket error: null, etc.

2015-05-26 Thread Jaikiran Pai
One way to narrow down the issue is to attach a debugger to the Kafka 
JVM and add a breakpoint in SimpleConsumer to see the real exception 
stacktrace which is causing the reconnect. I've filed a JIRA with a 
patch to improve this logging to include the entire cause stacktrace 
while logging this message https://issues.apache.org/jira/browse/KAFKA-2221


-Jaikiran
On Thursday 21 May 2015 03:36 PM, Kornel wrote:

Hi,

I'm having trouble with a spark (1.2.1) streaming job which is using apache
kafka (client: 0.8.1.1).

The job is consuming messages, and the observed behaviour is that after
processing some messages, the job stops processing, then restarts, then
consumes the lag which built up during the pause, and the cycle repeats.

I see plenty of logs, amongst other WARNs like:

kafka.consumer.SimpleConsumer: Reconnect due to socket error: null.

ERRORS of the form:

Deregistered receiver for stream 0: Error starting receiver 0 -
kafka.common.ConsumerRebalanceFailedException:
SparkStreamingJob-com.allegrogroup.reco.rtp.events.item.ItemChangedSparkStreaming_s41696.dc4.local-1432199965502-e860d377
can't rebalance after 4 retries

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)

at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)

at
kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)

at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)

at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)

kafka.consumer.ZookeeperConsumerConnector: my-streaimng-job can't rebalance
after 4 retries

kafka.common.ConsumerRebalanceFailedException: my-streaimng-job can't
rebalance after 4 retries

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)


kafka.producer.SyncProducer: Producer connection to xxx-broker:9092
unsuccessful

java.nio.channels.ClosedByInterruptException

at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)

at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650)

at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)

at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)

at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)

at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)

at kafka.producer.SyncProducer.send(SyncProducer.scala:112)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)

at
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Any ideas what the reason might be? The broker logs contain nothing
suspicious. The funny and most important part is, I have another job
reading from the same topic (with a different consumer group) and this job
has no trouble consuming the messages. This would indicate an error in the
streaming job, however, these logs from kafka are prevalent and I can't
find the cause for this.


The method kafka.client.ClientUtils#fetchTopicMetadata is called in the
failing job quite often; the working one does not call it (I base my
assumption on the presence of "Fetching metadata from broker..." messages).
What could be the reason for that?



Thanks for any hints,

  Kornel





Re: Log file of server start up error

2015-05-24 Thread Jaikiran Pai

Hi Sanjay,

Did you check that no other Kafka process is using the /tmp/kafka-logs 
folder? What command(s) did you use to verify that?


-Jaikiran
On Saturday 23 May 2015 12:19 PM, Sanjay Mistry wrote:

[2015-05-23 12:16:41,624] INFO Initiating client connection,
connectString=localhost:2181 sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@70808f4e
(org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:41,659] INFO Opening socket connection to server
localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,673] INFO Socket connection established to
localhost/0:0:0:0:0:0:0:1:2181, initiating session
(org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,740] INFO Session establishment complete on server
localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14d7f85f7a3, negotiated
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,743] INFO zookeeper state changed (SyncConnected)
(org.I0Itec.zkclient.ZkClient)
[2015-05-23 12:16:42,015] FATAL Fatal error during KafkaServerStable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Failed to acquire lock on file .lock in
/tmp/kafka-logs. A Kafka instance in another process or thread is using
this directory.
 at
kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)
 at
kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:92)
 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)
 at
scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
 at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:33)
 at kafka.log.LogManager.lockLogDirs(LogManager.scala:92)
 at kafka.log.LogManager.init(LogManager.scala:55)
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
 at kafka.Kafka$.main(Kafka.scala:46)
 at kafka.Kafka.main(Kafka.scala)
[2015-05-23 12:16:42,023] INFO [Kafka Server 0], shutting down
(kafka.server.KafkaServer)
[2015-05-23 12:16:42,030] INFO Terminate ZkClient event thread.
(org.I0Itec.zkclient.ZkEventThread)
[2015-05-23 12:16:42,036] INFO EventThread shut down
(org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:42,037] INFO Session: 0x14d7f85f7a3 closed
(org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:42,038] INFO [Kafka Server 0], shut down completed
(kafka.server.KafkaServer)
[2015-05-23 12:16:42,040] INFO [Kafka Server 0], shutting down
(kafka.server.KafkaServer)





Re: .deleted file descriptors

2015-03-01 Thread Jaikiran Pai
One thing to remember is that the .index files are memory-mapped [1] 
which in Java means that the file descriptors may not be released even 
when the program is done using it. A garbage collection is expected to 
close such resources, but forcing a System.gc() is only a hint and thus 
doesn't guarantee that it will trigger the garbage collection and close 
that resource. More details in [2]. In order to confirm that this is 
what you are running into, can you print the output of the following:


lsof -p broker-pid | grep .deleted

Before running that command do wait for at least the retention timeout 
and file deletion delay to have actually passed. The interesting part in 
that output would be the type column. For example, in my case it shows 
DEL as the value for that column:


java    7518  jaikiran  DEL  REG  8,6  1453306 
/tmp/kafka-logs/hey-0/.index.deleted


when the index file has really been deleted but the JVM hasn't yet let 
go of the file descriptor resource. I did try to run a gc() from the 
jconsole MBean for that process, but that too didn't free this resource, 
but I didn't really expect it to, since there's not much control on GC 
itself.
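
To illustrate the point, here is a small standalone sketch (not Kafka code; the
path is a placeholder) showing that a deleted file stays pinned to the process
while a mapping over it is still reachable:

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MmapDeleteDemo {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get("/tmp/demo.index"); // placeholder path
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            Files.delete(path);       // unlink the file, like Kafka's async delete does
            buffer.put(0, (byte) 1);  // the mapping is still usable after the unlink
            System.out.println("still mapped, first byte = " + buffer.get(0));
            // While 'buffer' is reachable (i.e. not yet garbage collected), lsof will
            // show the deleted file as a DEL entry against this JVM process.
        }
    }
}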


By the way which exact vendor and version of Java do you use? [3] 
suggests that an update in Oracle Java 1.6 has a workaround to this 
problem, but that workaround only comes into the picture when a subsequent 
FileChannel.map call results in an OOM.


[1] http://docs.oracle.com/javase/7/docs/api/java/nio/MappedByteBuffer.html
[2] http://bugs.java.com/view_bug.do?bug_id=4724038
[3] http://bugs.java.com/view_bug.do?bug_id=6417205

-Jaikiran

On Monday 02 March 2015 10:30 AM, Guangle Fan wrote:

Slightly different from what I observed.

The broker box has 800GB of disk space. With the appropriate log retention set,
that should be enough to hold the logs. But then disk usage hits 90%,
and by doing nothing but restarting the broker server it frees 40% of the disk space.
The traffic rate certainly could not have filled 40% of the disk space
within the one minute period (log.delete.delay.ms).

The obvious change before and after restarting the broker server is that the broker
frees tons of file descriptors for .index files. Most of those file descriptors
are very old.

lsof -p  broker_pid | grep .deleted

I don't know why Kafka didn't release those file descriptors, or what the
setting to control that is.








On Sun, Mar 1, 2015 at 12:50 PM, Mayuresh Gharat gharatmayures...@gmail.com

wrote:
Also I suppose when the broker starts up it will remove the files that are
marked with suffix .deleted and that's why you can see the free disk space
on restarting. Guozhang can correct me if I am wrong.

Thanks,

Mayuresh

On Sat, Feb 28, 2015 at 9:27 PM, Guozhang Wang wangg...@gmail.com wrote:


Guangle,

The deletion of the segment log / index files is async, i.e. when Kafka
decides to clean the logs, it only adds a .deleted suffix to the files
so that they will not be accessed any more by other Kafka threads. The
actual file deletion will be executed later, with the delay controlled by
file.delete.delay.ms (default 1 minute).

On Fri, Feb 27, 2015 at 9:49 PM, Guangle Fan fanguan...@gmail.com

wrote:

Hi,

After Kafka cleaned up .log / .index files based on topic retention, I can
still lsof a lot of .index.deleted files. And df shows that disk
space usage has accumulated until the disk is full.

When this happened, just restarting the broker immediately freed
that disk space. It seems to me that Kafka, after cleaning expired files,
still holds file descriptors, which leads to the disk space still being held.

How do you configure Kafka to make it release the file descriptors in this
case?

Using kafka 0.8.1.1

Regards,

Guangle




--
-- Guozhang




--
-Regards,
Mayuresh R. Gharat
(862) 250-7125





Re: Kafka producer perf script throw java.io.IOException

2015-02-04 Thread Jaikiran Pai
 java.io.IOException: Unable to create 
/tmp/PerfTopic22_1/ProducerRequestSize.csv


It looks like a file with that exact same name already exists which is 
causing that file creation request to fail. This indicates that probably 
the metric name (ProducerRequestSize) from which the file is created is 
duplicated for whatever reason.


-Jaikiran

On Thursday 05 February 2015 12:49 PM, Xinyi Su wrote:

Hi,

I need to get more metrics from the csv reporter. If I turn off the csv-reporter,
very little output is shown.

Thanks.
Xinyi

On 5 February 2015 at 13:09, tao xiao xiaotao...@gmail.com wrote:


Hi,

In order to get it work you can turn off csv-reporter.

On Thu, Feb 5, 2015 at 1:06 PM, Xinyi Su xiny...@gmail.com wrote:


Hi,

Today I updated the Kafka cluster from 0.8.2-beta to 0.8.2.0 and ran the Kafka
producer performance test.

The test cannot continue because of some exceptions being thrown which did not
occur with 0.8.2-beta. My perf library is kafka-perf_2.9.2-0.8.0.jar, which is
the latest version in the maven repository.

-bash-4.1$ bin/kafka-producer-perf-test.sh   --broker-list broker list
--topics PerfTopic22 --sync --initial-message-id 1 --messages 20
--csv-reporter-enabled --metrics-dir /tmp/PerfTopic22_1
--message-send-gap-ms 20 --request-num-acks -1 --batch-size 1

java.io.IOException: Unable to create /tmp/PerfTopic22_1/ProducerRequestSize.csv
at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:194)
at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:22)
at com.yammer.metrics.core.Histogram.processWith(Histogram.java:231)
at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)




--
Regards,
Tao





Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jaikiran Pai

On Monday 02 February 2015 11:03 PM, Jun Rao wrote:

Jaikiran,

The fix you provided is probably unnecessary. The channel that we use in
SimpleConsumer (BlockingChannel) is configured to be blocking. So even
though the read from the socket is in a loop, each read blocks if there are
no bytes received from the broker. So, that shouldn't cause extra CPU
consumption.

Hi Jun,

Of course, you are right! I forgot that while reading the thread dump in 
hprof output, one has to be aware that the thread state isn't shown and 
the thread need not necessarily be doing any CPU activity.


-Jaikiran




Thanks,

Jun

On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg 
mathias.soederb...@gmail.com wrote:


Hi Neha,

I sent an e-mail earlier today, but noticed now that it didn't actually go
through.

Anyhow, I've attached two files, one with output from a 10 minute run and
one with output from a 30 minute run. Realized that maybe I should've done
one or two runs with 0.8.1.1 as well, but nevertheless.

I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same CPU
usage as with the beta version (basically pegging all cores). If I manage
to find the time I'll do another run with hprof on the rc2 version later
today.

Best regards,
Mathias

On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede n...@confluent.io wrote:


The following should be sufficient

java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof classname

You would need to start the Kafka server with the settings above for
sometime until you observe the problem.

On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg 
mathias.soederb...@gmail.com wrote:


Hi Neha,

Yeah sure. I'm not familiar with hprof, so any particular options I

should

include or just run with defaults?

Best regards,
Mathias

On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede n...@confluent.io

wrote:

Thanks for reporting the issue. Would you mind running hprof and

sending

the output?

On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg 
mathias.soederb...@gmail.com wrote:


Good day,

I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
the CPU usage on the broker machines went up by roughly 40%, from ~60% to
~100%, and am wondering if anyone else has experienced something similar?
The load average also went up by 2x-3x.

We're running on EC2 and the cluster currently consists of four m1.xlarge,
with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to be
exact) and Scala 2.9.2. Configurations can be found over here:
https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.

I'm assuming that this is not expected behaviour for 0.8.2-beta?

Best regards,
Mathias




--
Thanks,
Neha




--
Thanks,
Neha





Re: question on the mailing list

2015-02-01 Thread Jaikiran Pai
I use a simple email client (Thunderbird) and have a filter setup so 
that mails to the Kafka user mailing list are moved to a specific 
folder. I then have thread view enabled so that the replies/discussion 
shows up in the right context. I have the same for some other mailing 
lists too and haven't felt the need for any other tool.


-Jaikiran
On Wednesday 28 January 2015 11:01 PM, Dillian Murphey wrote:

Hi all,

Sorry for asking, but is there some easier way to use the mailing list?
Maybe a tool which makes reading and replying to messages more like google
groups?  I like the hadoop searcher, but the UI on that is really bad.

tnx





Re: Increased CPU usage with 0.8.2-beta

2015-02-01 Thread Jaikiran Pai

Hi Mathias,

Looking at that thread dump, I think the potential culprit is this one:

TRACE 303545: (thread=200049)
sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
kafka.utils.Utils$.read(Utils.scala:380)
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
kafka.network.Receive$class.readCompletely(Transmission.scala:56)
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


I see many such threads all triggered through the SimpleConsumer and 
ending up polling. Looking at the code, in theory, I can see why there 
might be a busy CPU loop generated by that code path. If my guess is 
right, it could be because of an issue in the implementation of how data 
is read off a channel in a blocking manner and I think this patch might 
help overcome that problem:


diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..0bab9ed 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -54,8 +54,15 @@ trait Receive extends Transmission {
     var totalRead = 0
     while(!complete) {
       val read = readFrom(channel)
-      trace(read + " bytes read.")
-      totalRead += read
+      if (read > 0) {
+        trace(read + " bytes read.")
+        totalRead += read
+      } else if (read == 0) {
+        // it's possible that nothing was read (see javadoc of ReadableByteChannel#read) from the backing channel,
+        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
+        // TODO: For now, this 30 milli seconds is a random value.
+        Thread.sleep(30)
+      }
     }
     totalRead
   }

Is this something that you would be able to apply against the latest 
0.8.2 branch of Kafka, build the Kafka binary, try it out and see if it 
improves the situation?


-Jaikiran

On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:

Hi Neha,

I sent an e-mail earlier today, but noticed now that it didn't 
actually go through.


Anyhow, I've attached two files, one with output from a 10 minute run 
and one with output from a 30 minute run. Realized that maybe I 
should've done one or two runs with 0.8.1.1 as well, but nevertheless.


I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same 
CPU usage as with the beta version (basically pegging all cores). If I 
manage to find the time I'll do another run with hprof on the rc2 
version later today.


Best regards,
Mathias

On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede n...@confluent.io 
mailto:n...@confluent.io wrote:


The following should be sufficient

java

-agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof
classname

You would need to start the Kafka server with the settings above for
sometime until you observe the problem.

On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg 
mathias.soederb...@gmail.com
mailto:mathias.soederb...@gmail.com wrote:

 Hi Neha,

 Yeah sure. I'm not familiar with hprof, so any particular
options I should
 include or just run with defaults?

 Best regards,
 Mathias

 On Mon Dec 08 2014 at 7:41:32 

Re: kafka deleted old logs but not released

2015-01-31 Thread Jaikiran Pai


Hi Yonghui,

I have tried a few ways with different retention strategies to try and 
reproduce this issue, but haven't been able to do it. Since it looks 
like you can consistently reproduce this, would you be able to share a 
sample reproducible application (maybe as a github repo) for this issue?


-Jaikiran
On Monday 26 January 2015 09:27 AM, Yonghui Zhao wrote:

I have fixed this issue with a patch like this one:
https://reviews.apache.org/r/29755/diff/5/.

I find rename failure still happens:

server.log.2015-01-26-06:[2015-01-26 06:10:54,513] ERROR File rename
failed, forcefully deleting file (kafka.log.Log)
server.log.2015-01-26-06:[2015-01-26 06:10:54,600] ERROR File rename
failed, forcefully deleting file (kafka.log.Log)
server.log.2015-01-26-06:[2015-01-26 06:10:54,685] ERROR File rename
failed, forcefully deleting file (kafka.log.Log)
server.log.2015-01-26-06:[2015-01-26 06:10:54,797] ERROR File rename
failed, forcefully deleting file (kafka.log.Log)


And using lsof I can still find some files opened by Kafka but deleted, but
those files' sizes are 0.

java   3228 root   34uw REG  253,2 0   26084228
/home/work/data/soft/kafka-0.8/data/.lock (deleted)
java   3228 root   35u  REG  253,2 0   26084232
/home/work/data/soft/kafka-0.8/data/cube-0/.log
(deleted)
java   3228 root   36u  REG  253,2 0   26869778
/home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_misearch_appstore-search-0/3116.log
(deleted)
java   3228 root   37u  REG  253,2 0   26084234
/home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_mishop-search_mishop_v1-0/.log
(deleted)



Here is my configuration:

Binary: kafka_2.10-0.8.1.1
Retention config:

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log
as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new
log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be
deleted according
# to the retention policies
log.retention.check.interval.ms=6

# By default the log cleaner is disabled and the log retention policy will
default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and
individual logs can then be marked for log compaction.
log.cleaner.enable=false



OS:  CentOS release 6.4 (Final)
JDK:
*java version 1.6.0_37*
Java(TM) SE Runtime Environment (build 1.6.0_37-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.12-b01, mixed mode)

The JDK is too old, but I am not sure if this results in the rename failure.






2015-01-26 0:42 GMT+08:00 Jay Kreps jay.kr...@gmail.com:


Also, what is the configuration for the servers? In particular it would be
good to know the retention and/or log compaction settings as those delete
files.

-Jay

On Sun, Jan 25, 2015 at 4:34 AM, Jaikiran Pai jai.forums2...@gmail.com
wrote:


Hi Yonghui,

Do you still have this happening? If yes, can you tell us a bit more
about your setup? Is there something else that accesses or maybe deletes
these log files? For more context to this question, please read the
discussion related to this here http://mail-archives.apache.
org/mod_mbox/kafka-dev/201501.mbox/%3C54C47E9B.5060401%40gmail.com%3E


-Jaikiran



On Thursday 08 January 2015 11:19 AM, Yonghui Zhao wrote:


CentOS release 6.3 (Final)


2015-01-07 22:18 GMT+08:00 Harsha ka...@harsha.io:

  Yonghui,

 Which OS you are running.
-Harsha

On Wed, Jan 7, 2015, at 01:38 AM, Yonghui Zhao wrote:


Yes, and I found the reason: the rename during deletion failed.
During the rename the file is deleted, and then the exception prevents the file
from being closed in Kafka.
But I don't know how the rename failure can happen,

[2015-01-07 00:10:48,685] ERROR Uncaught exception in scheduled task
'kafka-log-retention' (kafka.utils.KafkaScheduler)
kafka.common.KafkaStorageException: Failed to change the log file suffix
from  to .deleted for log segment 70781650
        at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
        at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:636)
        at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:627)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:415)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:415)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.log.Log.deleteOldSegments(Log.scala:415)
        at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:325)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:356

Re: Potential socket leak in kafka sync producer

2015-01-30 Thread Jaikiran Pai

Hi Ankit,

Would you be able to share the trimmed down application code which 
reproduces this issue (maybe as a repo on github)? That way, some of us 
will get more context about the issue and probably be able to try it out 
ourselves and see what's wrong.


On a related note, have you tried this against a later version (like the 
0.8.2.0 CR3) and see if it's still reproducible there?


-Jaikiran
On Friday 30 January 2015 02:59 PM, ankit tyagi wrote:

  Jaikiran,

I have already investigated this, and it is Kafka related. I made a small
application which is used only for publishing messages to Kafka. If I use a
dynamic thread pool, where maxPoolSize is very large compared to corePoolSize,
and I publish each batch of messages only after all threads get destroyed
after keepAliveSeconds, then the FD leak problem occurs. I suspect that when
the threads get destroyed the file handles are somehow not getting cleared, so
when I trigger an explicit GC the descriptor count gets reduced by a
significant amount because of the cleanup of those destroyed threads.


We got this problem on our production box where the soft and hard limit of
file descriptors was 5, but for reproducing this issue on my local machine I
have reduced the hard limit to 6000 and used 1000 threads to send messages to
Kafka (the topic had 100 partitions with a replication factor of 1).





On Fri, Jan 30, 2015 at 2:14 PM, Jaikiran Pai jai.forums2...@gmail.com
wrote:


Looking at that heap dump, this is probably a database connection/resource
leak (298 connections?) rather than anything to do with Kafka. Have you
investigated whether there's any DB resource leak in the application and ruled
out that part?

-Jaikiran


On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:


I have shared object histogram after and before gc on gist
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai jai.forums2...@gmail.com
wrote:

  What kind of a (managed) component is that which has the @PreDestroy?

Looking at the previous snippet you added, it looks like you are creating
the Producer in some method? If  you are going to close the producer in a
@PreDestroy of the component, then you should be creating the producer in
the @PostConstruct of the same component, so that you have proper lifecycle
management of those resources.


-Jaikiran

On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:

  Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
public void stop()
{
    LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
    if (myProducer != null) {
        myProducer.close();
    }
}



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi 
ankittyagi.mn...@gmail.com
wrote:

   Hi Jaikiran,


I am using ubuntu and was able to reproduce on redhat too. Please find

  the

  more information below.


*DISTRIB_ID=Ubuntu*
*DISTRIB_RELEASE=12.04*
*DISTRIB_CODENAME=precise*
*DISTRIB_DESCRIPTION=Ubuntu 12.04.5 LTS*

*java version 1.7.0_72*

This is happening on client side. Output of lsof was showing that
maximum
fd were FIFO and anon. But after GC FD count was reduced
significantly.

Below is my client code which I am using for publishing messages.


private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

myProducer = new Producer<KafkaPartitionKey, KafkaEventWrapper>(new
ProducerConfig(myProducerProperties));

public void send(
    List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
{
    myProducer.send(msgs);
}


We are using the sync producer. I am attaching the object histogram before
GC (histo_1) and after GC (histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai 
jai.forums2...@gmail.com
wrote:

   Which operating system are you on and what Java version? Depending on the
OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

   Hi,


Currently we are using the sync producer client of version 0.8.1 on our
production box. We are getting the following exception while publishing a
Kafka message:


*[2015-01-29

13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]

  Fetching

topic metadata with correlation id 10808 for topics [Set(*


*kafka_topic_coms_FD_test1)] from broker

  [id:0,host:localhost,port:9092]

failed*


*java.net.ConnectException: Connection refused*

*at sun.nio.ch.Net.connect0(Native Method)*
*at sun.nio.ch.Net.connect(Net.java:465)*
*at sun.nio.ch.Net.connect(Net.java:457)*
*at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java

Re: Potential socket leak in kafka sync producer

2015-01-30 Thread Jaikiran Pai
Looking at that heap dump, this is probably a database
connection/resource leak (298 connections?) rather than anything to do with
Kafka. Have you investigated whether there's any DB resource leak in the
application and ruled out that part?


-Jaikiran

On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:

I have shared object histogram after and before gc on gist
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai jai.forums2...@gmail.com
wrote:


What kind of a (managed) component is that which has the @PreDestroy?
Looking at the previous snippet you added, it looks like you are creating
the Producer in some method? If  you are going to close the producer in a
@PreDestroy of the component, then you should be creating the producer in
the @PostConstruct of the same component, so that you have proper lifecycle
management of those resources.


-Jaikiran

On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:


Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
public void stop()
{
    LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
    if (myProducer != null) {
        myProducer.close();
    }
}



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi 
ankittyagi.mn...@gmail.com
wrote:

  Hi Jaikiran,

I am using ubuntu and was able to reproduce on redhat too. Please find


the


more information below.


*DISTRIB_ID=Ubuntu*
*DISTRIB_RELEASE=12.04*
*DISTRIB_CODENAME=precise*
*DISTRIB_DESCRIPTION=Ubuntu 12.04.5 LTS*

*java version 1.7.0_72*

This is happening on client side. Output of lsof was showing that
maximum
fd were FIFO and anon. But after GC FD count was reduced significantly.

Below is my client code which I am using for publishing messages.


private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

myProducer = new Producer<KafkaPartitionKey, KafkaEventWrapper>(new
ProducerConfig(myProducerProperties));

public void send(
    List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
{
    myProducer.send(msgs);
}


We are using the sync producer. I am attaching the object histogram before
GC (histo_1) and after GC (histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai jai.forums2...@gmail.com
wrote:

   Which operating system are you on and what Java version? Depending on the
OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

  Hi,

Currently we are using the sync producer client of version 0.8.1 on our
production box. We are getting the following exception while publishing a
Kafka message:

*[2015-01-29
13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]


Fetching
topic metadata with correlation id 10808 for topics [Set(*

*kafka_topic_coms_FD_test1)] from broker


[id:0,host:localhost,port:9092]

failed*

*java.net.ConnectException: Connection refused*
*at sun.nio.ch.Net.connect0(Native Method)*
*at sun.nio.ch.Net.connect(Net.java:465)*
*at sun.nio.ch.Net.connect(Net.java:457)*
*at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
   at


kafka.network.BlockingChannel.connect(BlockingChannel.scala:

57)

   at


kafka.producer.SyncProducer.connect(SyncProducer.scala:141)

   at

  kafka.producer.SyncProducer.getOrMakeConnection(

SyncProducer.scala:156)


   at

kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
doSend(SyncProducer.scala:68)
   at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
   at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
   at
kafka.producer.BrokerPartitionInfo.updateInfo(
BrokerPartitionInfo.scala:82)


We are using a dynamic thread pool to publish messages to Kafka. My
observation is that after the keep-alive time, when threads in my executor get
destroyed, somehow the file descriptors are not getting cleared, but when I
explicitly ran a full GC, the FD count got reduced by a significant amount.




Re: Potential socket leak in kafka sync producer

2015-01-29 Thread Jaikiran Pai


Which operating system are you on and what Java version? Depending on 
the OS, you could get tools (like lsof) to show which file descriptors 
are being held on to. Is it the client JVM which ends up with these leaks?
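
For example, on Linux one quick way to watch the JVM's own descriptor count (a small sketch on my part, assuming /proc is available) is to count the entries under /proc/self/fd from inside the application and log it alongside the lsof output:

import java.io.File;

public class FdCount {
    // Counts the file descriptors currently open in this JVM (Linux-specific).
    public static int openFileDescriptors() {
        String[] fds = new File("/proc/self/fd").list();
        return fds == null ? -1 : fds.length;
    }

    public static void main(String[] args) {
        System.out.println("open fds: " + openFileDescriptors());
    }
}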


Also, would it be possible to post a snippet of your application code 
which shows how you are using the Kafka APIs?


-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:

Hi,

Currently we are using the sync producer client of version 0.8.1 on our
production box. We are getting the following exception while publishing a
Kafka message:

*[2015-01-29
13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching
topic metadata with correlation id 10808 for topics [Set(*
*kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092]
failed*
*java.net.ConnectException: Connection refused*
*at sun.nio.ch.Net.connect0(Native Method)*
*at sun.nio.ch.Net.connect(Net.java:465)*
*at sun.nio.ch.Net.connect(Net.java:457)*
*at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
 at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
 at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
 at
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
 at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
 at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
 at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
 at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)


We are using a dynamic thread pool to publish messages to Kafka. My
observation is that after the keep-alive time, when threads in my executor get
destroyed, somehow the file descriptors are not getting cleared, but when I
explicitly ran a full GC, the FD count got reduced by a significant amount.





Re: Potential socket leak in kafka sync producer

2015-01-29 Thread Jaikiran Pai
What kind of a (managed) component is that which has the @PreDestroy? 
Looking at the previous snippet you added, it looks like you are 
creating the Producer in some method? If  you are going to close the 
producer in a @PreDestroy of the component, then you should be creating 
the producer in the @PostConstruct of the same component, so that you 
have proper lifecycle management of those resources.
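
For instance, a rough sketch of that lifecycle (reusing your type names KafkaPartitionKey and KafkaEventWrapper and an assumed myProducerProperties, so this is illustrative rather than a drop-in class):

import java.util.List;
import java.util.Properties;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaEventPublisher {

    // Assumed to be populated elsewhere (e.g. injected or loaded from a file).
    private final Properties myProducerProperties = new Properties();

    private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

    @PostConstruct
    public void start() {
        // Create the producer once, when the component is constructed.
        myProducer = new Producer<KafkaPartitionKey, KafkaEventWrapper>(
                new ProducerConfig(myProducerProperties));
    }

    public void send(List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs) {
        myProducer.send(msgs);
    }

    @PreDestroy
    public void stop() {
        // Close the same instance when the component is destroyed.
        if (myProducer != null) {
            myProducer.close();
        }
    }
}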



-Jaikiran
On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:

Hi,

I am closing my producer at the time of shutting down my application.

@PreDestroy
public void stop()
{
    LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
    if (myProducer != null) {
        myProducer.close();
    }
}



On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:


Hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi ankittyagi.mn...@gmail.com
wrote:


Hi Jaikiran,

I am using ubuntu and was able to reproduce on redhat too. Please find

the

more information below.


*DISTRIB_ID=Ubuntu*
*DISTRIB_RELEASE=12.04*
*DISTRIB_CODENAME=precise*
*DISTRIB_DESCRIPTION=Ubuntu 12.04.5 LTS*

*java version 1.7.0_72*

This is happening on client side. Output of lsof was showing that maximum
fd were FIFO and anon. But after GC FD count was reduced significantly.

Below is my client code which I am using for publishing messages.


private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

myProducer = new Producer<KafkaPartitionKey, KafkaEventWrapper>(new
ProducerConfig(myProducerProperties));

public void send(
    List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
{
    myProducer.send(msgs);
}


We are using the sync producer. I am attaching the object histogram before
GC (histo_1) and after GC (histo_2) in my application.

On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai jai.forums2...@gmail.com
wrote:


Which operating system are you on and what Java version? Depending on the
OS, you could get tools (like lsof) to show which file descriptors are
being held on to. Is it the client JVM which ends up with these leaks?

Also, would it be possible to post a snippet of your application code
which shows how you are using the Kafka APIs?

-Jaikiran
On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:


Hi,

Currently we are using the sync producer client of version 0.8.1 on our
production box. We are getting the following exception while publishing a
Kafka message:

*[2015-01-29
13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]

Fetching

topic metadata with correlation id 10808 for topics [Set(*
*kafka_topic_coms_FD_test1)] from broker

[id:0,host:localhost,port:9092]

failed*
*java.net.ConnectException: Connection refused*
*at sun.nio.ch.Net.connect0(Native Method)*
*at sun.nio.ch.Net.connect(Net.java:465)*
*at sun.nio.ch.Net.connect(Net.java:457)*
*at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
  at

kafka.network.BlockingChannel.connect(BlockingChannel.scala:

57)
  at

kafka.producer.SyncProducer.connect(SyncProducer.scala:141)

  at


kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)

  at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
doSend(SyncProducer.scala:68)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
  at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
  at
kafka.producer.BrokerPartitionInfo.updateInfo(
BrokerPartitionInfo.scala:82)


We are using a dynamic thread pool to publish messages to Kafka. My
observation is that after the keep-alive time, when threads in my executor get
destroyed, somehow the file descriptors are not getting cleared, but when I
explicitly ran a full GC, the FD count got reduced by a significant amount.






Re: kafka deleted old logs but not released

2015-01-25 Thread Jaikiran Pai

Hi Yonghui,

Do you still have this happening? If yes, can you tell us a bit more 
about your setup? Is there something else that accesses or maybe 
deleting these log files? For more context to this question, please read 
the discussion related to this here 
http://mail-archives.apache.org/mod_mbox/kafka-dev/201501.mbox/%3C54C47E9B.5060401%40gmail.com%3E


-Jaikiran



On Thursday 08 January 2015 11:19 AM, Yonghui Zhao wrote:

CentOS release 6.3 (Final)


2015-01-07 22:18 GMT+08:00 Harsha ka...@harsha.io:


Yonghui,
Which OS you are running.
-Harsha

On Wed, Jan 7, 2015, at 01:38 AM, Yonghui Zhao wrote:

Yes, and I found the reason: the rename during deletion failed.
During the rename the files were deleted, and then the exception prevented the
files from being closed in Kafka.
But I don't know how the rename failure can happen.

[2015-01-07 00:10:48,685] ERROR Uncaught exception in scheduled task
'kafka-log-retention' (kafka.utils.KafkaScheduler)
kafka.common.KafkaStorageException: Failed to change the log file suffix
from  to .deleted for log segment 70781650
        at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
        at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:636)
        at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:627)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:415)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:415)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.log.Log.deleteOldSegments(Log.scala:415)
        at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:325)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:356)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:354)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.log.LogManager.cleanupLogs(LogManager.scala:354)
        at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:141)
        at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)


2015-01-07 13:56 GMT+08:00 Jun Rao j...@confluent.io:


Do you mean that the Kafka broker still holds a file handler on a

deleted

file? Do you see those files being deleted in the Kafka log4j log?

Thanks,

Jun

On Tue, Jan 6, 2015 at 4:46 AM, Yonghui Zhao zhaoyong...@gmail.com
wrote:


Hi,

We use kafka_2.10-0.8.1.1 on our server. Today we found a disk space alert.
We find that many Kafka data files are deleted, but still opened by Kafka.


such as:

_yellowpageV2-0/68170670.log (deleted)
java   8446 root  724u  REG 253,2 536937911 26087362 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java   8446 root  725u  REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
java   8446 root  726u  REG 253,2 536917902 26087368 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/70104914.log (deleted)


Is there anything wrong, or is something configured incorrectly?







Re: dumping JMX data

2015-01-20 Thread Jaikiran Pai
Just had a quick look at this and it turns out the object name you are 
passing is incorrect. I had to change it to:


./kafka-run-class.sh kafka.tools.JmxTool --object-name 
'kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager' 
--jmx-url service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi


to get it to show the attribute values of that MBean.
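
For reference, the same attribute can also be read with the plain JMX client API; a small sketch on my part, assuming the broker's JMX port (9999 here is just a placeholder for whatever JMX_PORT was used) and that the gauge exposes its reading through a "Value" attribute, as the Yammer metrics JMX reporter usually does:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName(
                    "kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager");
            // Read the gauge's current value.
            System.out.println("UnderReplicatedPartitions = " + conn.getAttribute(name, "Value"));
        } finally {
            connector.close();
        }
    }
}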

I think, maybe, the JmxTool could be enhanced to print out a message if 
any of the passed object names don't have a corresponding registered 
MBean. That way, you know what the problem is.


-Jaikiran


On Sunday 18 January 2015 06:49 AM, Scott Chapman wrote:

So, related question.

If I query for a specific object name, I always seem to get UNIX time:
./bin/kafka-run-class.sh kafka.tools.JmxTool --object-name
'kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager'
--jmx-url service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi

always returns:
1421543777895
1421543779895
1421543781895
1421543783896
1421543785896

What am I missing?

On Sat Jan 17 2015 at 8:11:38 PM Scott Chapman sc...@woofplanet.com wrote:


Thanks, that second one might be material. I find that if I run without
any arguments I get no output and it just keeps running. *sigh*

On Sat Jan 17 2015 at 7:58:52 PM Manikumar Reddy ku...@nmsworks.co.in
wrote:


JIRAs related to the issue are

https://issues.apache.org/jira/browse/KAFKA-1680
https://issues.apache.org/jira/browse/KAFKA-1679

On Sun, Jan 18, 2015 at 3:12 AM, Scott Chapman sc...@woofplanet.com
wrote:


While I appreciate all the suggestions on other JMX related tools, my
question is really about the JmxTool included in and documented in Kafka and
how to use it to dump all the JMX data. I can get it to dump some mbeans, so I
know my config is working. But what I can't seem to do (which is described in
the documentation) is to dump all attributes of all objects.

Please, is there anyone using it with experience that might be able to help
me?

Thanks in advance!

On Sat Jan 17 2015 at 12:39:56 PM Albert Strasheim full...@gmail.com
wrote:


On Fri, Jan 16, 2015 at 5:52 PM, Joe Stein joe.st...@stealth.ly

wrote:

Here are some more tools for that
https://cwiki.apache.org/confluence/display/KAFKA/JMX+Reporters
depending on what you have in place and what you are trying to do, different
options exist.

A lot of folks like JMX Trans.

We tried JMX Trans for a while, but didn't like it very much.

Jolokia looks promising. Trying that now.

http://www.jolokia.org/





Re: java.nio.channels.ClosedChannelException...Firewall Issue?

2015-01-19 Thread Jaikiran Pai

Hi Su,

How exactly did you start the Kafka server on instance A? Are you sure 
the services on it are bound to non localhost IP? What does the 
following command return when run from instance B:


telnet public.ip.of.A 9092
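
As a point of reference (an assumption on my part about what to check, not something from your setup), these are the server.properties settings on A that control what the socket server binds to in 0.8.x:

# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces.
# If this is set to localhost/127.0.0.1, remote producers and consumers cannot connect.
#host.name=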


-Jaikiran
On Tuesday 20 January 2015 07:16 AM, Su She wrote:

Hello Everyone,

Thank you for the help!

Preface: I've created producers/consumers before and they have worked. I
have also made consumers/producers using Java programs, but they have all
been local.

1) I have a Zookeeper/Kafka Server running on an EC2 instance called A

2) I started the Zookeeper/Kafka Server on A and created a topic test
like it says on the kafka documentation

3) I then started a console consumer on A like the documentation states for
test.

4) I then downloaded Kafka on an EC2 instance called B and created a console
producer with broker-list public.ip.of.A:9092 and topic test.

5) Once I start publishing messages this is the message I get (what's worse
is that I can't Ctrl-C to stop and have to exit PuTTY):

WARN Fetching topic metadata with correlation id 16 for topics [Set(test)]
from broker [id:0,host:54.183.40.224,port:9092] failed
(kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

 at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)

 at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)

 at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)

 at kafka.producer.SyncProducer.send(SyncProducer.scala:114)

 at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

 at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)

 at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)

 at kafka.utils.Utils$.swallow(Utils.scala:172)

 at kafka.utils.Logging$class.swallowError(Logging.scala:106)

 at kafka.utils.Utils$.swallowError(Utils.scala:45)

 at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)

 at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)

 at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93)

 at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

[2015-01-20 01:29:28,289] ERROR fetching topic metadata for topics
[Set(test)] from broker [ArrayBuffer(id:0,host:

Meanwhile in ZK...not sure if this happened as soon as I started publishing
messages:

Error:KeeperErrorCode = NoNode for /consumers/console-consumer-2615/offsets
(org.apache.zookeeper.server.PrepRequestProcessor)

Any tips/suggestions are greatly appreciated!





Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-15 Thread Jaikiran Pai
I just downloaded the Kafka binary and am trying this on my 32 bit JVM 
(Java 7). Trying to start Zookeeper or the Kafka server keeps failing with 
"Unrecognized VM option 'UseCompressedOops'":


./zookeeper-server-start.sh ../config/zookeeper.properties
Unrecognized VM option 'UseCompressedOops'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Same with the Kafka server startup scripts. My Java version is:

java version 1.7.0_71
Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
Java HotSpot(TM) Server VM (build 24.71-b01, mixed mode)

Should there be a check in the script, before adding this option?

-Jaikiran

On Wednesday 14 January 2015 10:08 PM, Jun Rao wrote:

+ users mailing list. It would be great if people can test this out and
report any blocker issues.

Thanks,

Jun

On Tue, Jan 13, 2015 at 7:16 PM, Jun Rao j...@confluent.io wrote:


This is the first candidate for release of Apache Kafka 0.8.2.0. There
have been some changes since the 0.8.2 beta release, especially in the new
java producer api and jmx mbean names. It would be great if people can test
this out thoroughly. We are giving people 10 days for testing and voting.

Release Notes for the 0.8.2.0 release
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html

*** Please download, test and vote by Friday, Jan 23rd, 7pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/KEYS
in addition to the md5, sha1 and sha2 (SHA256) checksums.

* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/

* Maven artifacts to be voted upon prior to release:
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/maven_staging/

* scala-doc
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package

* java-doc
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=b0c7d579f8aeb5750573008040a42b7377a651d5

/***

Thanks,

Jun





Re: kafka deleted old logs but not released

2015-01-07 Thread Jaikiran Pai
Apart from the fact that the file rename is failing (the API notes that 
there are chances of the rename failing), it looks like the 
implementation in FileMessageSet's rename can cause a couple of issues, 
one of them being a leak.


The implementation looks like this 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/FileMessageSet.scala#L268. 
Notice that the reference to the original file member variable is 
switched with a new one but the (old) FileChannel held in that 
FileMessageSet isn't closed. That I think explains the leak. 
Furthermore, a new fileChannel for the new File instance isn't being 
created either and that's a different issue.
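
To illustrate the effect (a sketch on my part, not the actual FileMessageSet code): any FileChannel that is left open keeps the file's blocks allocated even after the file is unlinked, which is why lsof shows the segment files as "(deleted)" while the disk stays full.

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class PinnedDeletedFile {
    public static void main(String[] args) throws Exception {
        File segment = File.createTempFile("segment", ".log");
        FileChannel channel = new RandomAccessFile(segment, "rw").getChannel();

        // Simulate the segment being deleted while a channel is still open on it.
        segment.delete();

        // Until this close() runs, the kernel keeps the file's space allocated
        // and "lsof -p <pid>" lists the file with a "(deleted)" suffix.
        channel.close();
    }
}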


P.S: Not much familiar with Kafka code yet. The above explanation is 
just based on a quick look at that piece of code and doesn't take into 
account any other context there might be to this.


-Jaikiran


On Thursday 08 January 2015 11:19 AM, Yonghui Zhao wrote:

CentOS release 6.3 (Final)


2015-01-07 22:18 GMT+08:00 Harsha ka...@harsha.io:


Yonghui,
Which OS you are running.
-Harsha

On Wed, Jan 7, 2015, at 01:38 AM, Yonghui Zhao wrote:

Yes, and I found the reason: the rename during deletion failed.
During the rename the files were deleted, and then the exception prevented the
files from being closed in Kafka.
But I don't know how the rename failure can happen.

[2015-01-07 00:10:48,685] ERROR Uncaught exception in scheduled task
'kafka-log-retention' (kafka.utils.KafkaScheduler)
kafka.common.KafkaStorageException: Failed to change the log file suffix
from  to .deleted for log segment 70781650
        at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
        at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:636)
        at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:627)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:415)
        at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:415)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.log.Log.deleteOldSegments(Log.scala:415)
        at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:325)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:356)
        at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:354)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.log.LogManager.cleanupLogs(LogManager.scala:354)
        at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:141)
        at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)


2015-01-07 13:56 GMT+08:00 Jun Rao j...@confluent.io:


Do you mean that the Kafka broker still holds a file handler on a

deleted

file? Do you see those files being deleted in the Kafka log4j log?

Thanks,

Jun

On Tue, Jan 6, 2015 at 4:46 AM, Yonghui Zhao zhaoyong...@gmail.com
wrote:


Hi,

We use kafka_2.10-0.8.1.1 on our server. Today we found a disk space alert.

We find that many Kafka data files are deleted, but still opened by Kafka.

such as:

_yellowpageV2-0/68170670.log (deleted)
java   8446 root  724u  REG  253,2 536937911 26087362 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java   8446 root  725u  REG  253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
java   8446 root  726u  REG  253,2 536917902


Re: NotLeaderForPartitionException while doing performance test

2015-01-07 Thread Jaikiran Pai

On Thursday 08 January 2015 01:51 AM, Sa Li wrote:

see this type of error again, back to normal in few secs

[2015-01-07 20:19:49,744] WARN Error in I/O with harmful-jar.master/
10.100.98.102


That's a really weird hostname, the harmful-jar.master. Is that really 
your hostname? You mention that this happens during performance testing. 
Have you taken a note of how many connections are open to that 
10.100.98.102 IP when this "Connection refused" exception happens?


-Jaikiran



  (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
 at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:745)
[2015-01-07 20:19:49,754] WARN Error in I/O with harmful-jar.master/
10.100.98.102 (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
 at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:745)
[2015-01-07 20:19:49,764] WARN Error in I/O with harmful-jar.master/
10.100.98.102 (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
 at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
 at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:745)
160403 records sent, 32080.6 records/sec (91.78 MB/sec), 507.0 ms avg
latency, 2418.0 max latency.
109882 records sent, 21976.4 records/sec (62.87 MB/sec), 672.7 ms avg
latency, 3529.0 max latency.
100315 records sent, 19995.0 records/sec (57.21 MB/sec), 774.8 ms avg
latency, 3858.0 max latency.

On Wed, Jan 7, 2015 at 12:07 PM, Sa Li sal...@gmail.com wrote:


Hi, All

I am doing performance test by

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test-rep-three 5 100 -1 acks=1 bootstrap.servers=
10.100.98.100:9092,10.100.98.101:9092,10.100.98.102:9092
buffer.memory=67108864 batch.size=8196

where the topic test-rep-three is described as follows:

bin/kafka-topics.sh --describe --zookeeper 10.100.98.101:2181 --topic
test-rep-three
Topic:test-rep-three    PartitionCount:8        ReplicationFactor:3     Configs:
        Topic: test-rep-three   Partition: 0    Leader: 100     Replicas: 100,102,101   Isr: 102,101,100
        Topic: test-rep-three   Partition: 1    Leader: 101     Replicas: 101,100,102   Isr: 102,101,100
        Topic: test-rep-three   Partition: 2    Leader: 102     Replicas: 102,101,100   Isr: 101,102,100
        Topic: test-rep-three   Partition: 3    Leader: 100     Replicas: 100,101,102   Isr: 101,100,102
        Topic: test-rep-three   Partition: 4    Leader: 101     Replicas: 101,102,100   Isr: 102,100,101
        Topic: test-rep-three   Partition: 5    Leader: 102     Replicas: 102,100,101   Isr: 100,102,101
        Topic: test-rep-three   Partition: 6    Leader: 102     Replicas: 100,102,101   Isr: 102,101,100
        Topic: test-rep-three   Partition: 7    Leader: 101     Replicas: 101,100,102   Isr: 101,100,102

Apparently, it produces the messages and runs for a while, but it
periodically gets such exceptions:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.

Re: question about jmxtrans to get kafka metrics

2015-01-07 Thread Jaikiran Pai

Hi Sa,

Are you really sure w2 is a real hostname, something that is 
resolvable from the system where you are running this? The JSON output 
you posted seems very close to the example from the jmxtrans project 
page https://code.google.com/p/jmxtrans/wiki/GraphiteWriter, so I 
suspect you aren't using the right hostname.


-Jaikiran
On Thursday 08 January 2015 07:29 AM, Sa Li wrote:

Hi, All

I installed jmxtrans and graphite and wish to be able to graph stuff from
Kafka, but when I first start jmxtrans I get the following errors (I am using
the example graphite json).

./jmxtrans.sh start graphite.json

[07 Jan 2015 17:55:58] [ServerScheduler_Worker-4] 180214 DEBUG
(com.googlecode.jmxtrans.jobs.ServerJob:31) - + Started server job:
Server [host=w2, port=1099,
url=service:jmx:rmi:///jndi/rmi://w2:1099/jmxrmi, cronExpression=null,
numQueryThreads=null]
[07 Jan 2015 17:55:58] [ServerScheduler_Worker-4] 180217 ERROR
(com.googlecode.jmxtrans.jobs.ServerJob:39) - Error
java.io.IOException: Failed to retrieve RMIServer stub:
javax.naming.ConfigurationException [Root exception is
java.rmi.UnknownHostException: Unknown host: w2; nested exception is:
 java.net.UnknownHostException: w2]
 at
javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:369)
 at
javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:268)
 at
com.googlecode.jmxtrans.util.JmxUtils.getServerConnection(JmxUtils.java:351)
 at
com.googlecode.jmxtrans.util.JmxConnectionFactory.makeObject(JmxConnectionFactory.java:31)
 at
org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1212)
 at com.googlecode.jmxtrans.jobs.ServerJob.execute(ServerJob.java:36)
 at org.quartz.core.JobRunShell.run(JobRunShell.java:216)
 at
org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:549)
Caused by: javax.naming.ConfigurationException [Root exception is
java.rmi.UnknownHostException: Unknown host: w2; nested exception is:
 java.net.UnknownHostException: w2]
 at
com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:118)
 at
com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:203)
 at javax.naming.InitialContext.lookup(InitialContext.java:411)
 at
javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1929)
 at
javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1896)
 at
javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:286)
 ... 7 more
Caused by: java.rmi.UnknownHostException: Unknown host: w2; nested
exception is:
 java.net.UnknownHostException: w2
 at sun.rmi.transport.tcp.TCPEndpoint.newSocket(TCPEndpoint.java:616)
 at
sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:216)
 at
sun.rmi.transport.tcp.TCPChannel.newConnection(TCPChannel.java:202)
 at sun.rmi.server.UnicastRef.newCall(UnicastRef.java:341)
 at sun.rmi.registry.RegistryImpl_Stub.lookup(Unknown Source)
 at
com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:114)
 ... 12 more
Caused by: java.net.UnknownHostException: w2
 at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 at java.net.Socket.connect(Socket.java:579)
 at java.net.Socket.connect(Socket.java:528)
 at java.net.Socket.init(Socket.java:425)
 at java.net.Socket.init(Socket.java:208)
 at
sun.rmi.transport.proxy.RMIDirectSocketFactory.createSocket(RMIDirectSocketFactory.java:40)
 at
sun.rmi.transport.proxy.RMIMasterSocketFactory.createSocket(RMIMasterSocketFactory.java:147)
 at sun.rmi.transport.tcp.TCPEndpoint.newSocket(TCPEndpoint.java:613)
 ... 17 more

The graphite.json

{
  "servers" : [ {
    "port" : 1099,
    "host" : "w2",
    "queries" : [ {
      "obj" : "java.lang:type=Memory",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ],
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GraphiteWriter",
        "settings" : {
          "port" : 2003,
          "host" : "10.100.70.128"
        }
      } ]
    } ]
  } ]
}


Anyone help me to diagnose what this problem is?

thanks






Re: NotLeaderForPartitionException while doing performance test

2015-01-07 Thread Jaikiran Pai
There are different ways to find the connection count and each one 
depends on the operating system that's being used. lsof -i is one 
option, for example, on *nix systems.


-Jaikiran
On Thursday 08 January 2015 11:40 AM, Sa Li wrote:

Yes, it is a weird hostname ;) that is what our system guys named it. How do I
measure the connections open to 10.100.98.102?

Thanks

AL
On Jan 7, 2015 9:42 PM, Jaikiran Pai jai.forums2...@gmail.com wrote:


On Thursday 08 January 2015 01:51 AM, Sa Li wrote:


see this type of error again, back to normal in few secs

[2015-01-07 20:19:49,744] WARN Error in I/O with harmful-jar.master/
10.100.98.102


That's a really weird hostname, the harmful-jar.master. Is that really
your hostname? You mention that this happens during performance testing.
Have you taken a note of how many connections are open to that 10.100.98.102
IP when this "Connection refused" exception happens?

-Jaikiran


(org.apache.kafka.common.network.Selector)

java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
  at org.apache.kafka.common.network.Selector.poll(
Selector.java:232)
  at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
  at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
  at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
  at java.lang.Thread.run(Thread.java:745)
[2015-01-07 20:19:49,754] WARN Error in I/O with harmful-jar.master/
10.100.98.102 (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
  at org.apache.kafka.common.network.Selector.poll(
Selector.java:232)
  at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
  at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
  at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
  at java.lang.Thread.run(Thread.java:745)
[2015-01-07 20:19:49,764] WARN Error in I/O with harmful-jar.master/
10.100.98.102 (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
  at org.apache.kafka.common.network.Selector.poll(
Selector.java:232)
  at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
  at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
  at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
  at java.lang.Thread.run(Thread.java:745)
160403 records sent, 32080.6 records/sec (91.78 MB/sec), 507.0 ms avg
latency, 2418.0 max latency.
109882 records sent, 21976.4 records/sec (62.87 MB/sec), 672.7 ms avg
latency, 3529.0 max latency.
100315 records sent, 19995.0 records/sec (57.21 MB/sec), 774.8 ms avg
latency, 3858.0 max latency.

On Wed, Jan 7, 2015 at 12:07 PM, Sa Li sal...@gmail.com wrote:

  Hi, All

I am doing performance test by

bin/kafka-run-class.sh org.apache.kafka.clients.
tools.ProducerPerformance
test-rep-three 5 100 -1 acks=1 bootstrap.servers=
10.100.98.100:9092,10.100.98.101:9092,10.100.98.102:9092
buffer.memory=67108864 batch.size=8196

where the topic test-rep-three is described as follows:

bin/kafka-topics.sh --describe --zookeeper 10.100.98.101:2181 --topic
test-rep-three
Topic:test-rep-three    PartitionCount:8        ReplicationFactor:3     Configs:
        Topic: test-rep-three   Partition: 0    Leader: 100     Replicas: 100,102,101   Isr: 102,101,100
        Topic: test-rep-three   Partition: 1    Leader: 101     Replicas: 101,100,102   Isr: 102,101,100
        Topic: test-rep-three   Partition: 2    Leader: 102     Replicas: 102,101,100   Isr: 101,102,100
        Topic: test-rep-three   Partition: 3    Leader: 100     Replicas: 100,101,102   Isr: 101,100,102
        Topic: test-rep-three   Partition: 4    Leader: 101     Replicas: 101,102,100   Isr: 102,100,101
        Topic: test-rep-three   Partition: 5    Leader: 102     Replicas: 102,100,101   Isr: 100,102,101
        Topic: test-rep-three   Partition: 6    Leader: 102     Replicas: 100,102,101   Isr: 102,101,100
        Topic: test-rep-three   Partition: 7    Leader: 101     Replicas: 101,100,102   Isr: 101,100,102

Apparently, it produces the messages and runs for a while, but it
periodically gets such exceptions:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This
server
is not the leader for that topic-partition.
org.apache.kafka.common.errors.NotLeaderForPartitionException: This
server
is not the leader