Re: ZooKeeper connect/disconnect pattern

2014-03-27 Thread Neha Narkhede
[2014-03-25 12:21:58,848] INFO Client session timed out, have not heard from server in 4001ms for sessionid 0x54441e4dae0350a, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

This is a problem. It indicates a GC pause or something similar on either
the Kafka broker or your ZooKeeper server; that is why the client keeps
dropping its session and establishing new connections to ZooKeeper. You can
enable GC logging or monitoring to confirm whether that is the issue.
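
For example, on a HotSpot JVM you can turn on GC logging for the broker by passing the standard flags at startup. This is only a sketch: the use of the KAFKA_OPTS environment variable (read by kafka-run-class.sh in the stock scripts) and the log path are assumptions about your setup, so adapt them to however you launch the broker:

    export KAFKA_OPTS="-verbose:gc -Xloggc:/var/log/kafka/kafkaServer-gc.log \
      -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime"
    bin/kafka-server-start.sh config/server.properties

Pauses logged as "Total time for which application threads were stopped" that approach the ZooKeeper session timeout would confirm the GC theory. The same flags can be applied to the ZooKeeper server's JVM.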

Thanks,
Neha


On Thu, Mar 27, 2014 at 3:07 PM, Tom Amon  wrote:

>  From the controller log 
>
>
> [2014-03-25 12:22:39,487] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 11 and zk version 10 (kafka.controller.ControllerEpochListener)
>
> [2014-03-25 12:22:39,519] INFO [Controller 2]: Controller starting up (kafka.controller.KafkaController)
>
> [2014-03-25 12:22:39,777] INFO [Controller 2]: Controller startup complete (kafka.controller.KafkaController)
>
> [2014-03-25 19:47:59,198] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 11 and zk version 10 (kafka.controller.ControllerEpochListener)
>
> [2014-03-25 19:47:59,230] INFO [Controller 2]: Controller starting up (kafka.controller.KafkaController)
>
> [2014-03-25 19:47:59,379] INFO [Controller 2]: Controller startup complete (kafka.controller.KafkaController)
>
> [2014-03-25 21:39:50,115] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 12 and zk version 11 (kafka.controller.ControllerEpochListener)
>
> [2014-03-25 21:59:18,020] INFO [Controller 2]: Broker 2 starting become controller state transition (kafka.controller.KafkaController)
>
> [2014-03-25 21:59:18,032] INFO [Controller 2]: Controller 2 incremented
> epoch to 13 (kafka.controller.KafkaController)
>
> [2014-03-25 21:59:21,888] INFO [Controller-2-to-broker-2-send-thread],
> Starting  (kafka.controller.RequestSendThread)
>
> [2014-03-25 21:59:21,895] INFO [Controller-2-to-broker-3-send-thread],
> Starting  (kafka.controller.RequestSendThread)
>
> [2014-03-25 21:59:21,895] INFO [Controller-2-to-broker-4-send-thread],
> Starting  (kafka.controller.RequestSendThread)
>
> [2014-03-25 21:59:21,895] INFO [Controller-2-to-broker-5-send-thread],
> Starting  (kafka.controller.RequestSendThread)
>
> [2014-03-25 21:59:21,898] INFO [Controller 2]: Currently active brokers in
> the cluster: Set(2,3, 4, 5) (kafka.controller.KafkaController)
>
> [2014-03-25 21:59:21,899] INFO [Controller 2]: Currently shutting brokers
> in the cluster: Set() (kafka.controller.KafkaController)
>
> [2014-03-25 21:59:21,899] INFO [Controller 2]: Current list of topics in
> the cluster: Set(optimizer-error-topic, optimizer-default-topic,
> metrics-tracker-audit) (kafka.controller.KafkaController)
>
>  From the root log 
>
> [2014-03-25 12:21:58,848] INFO Client session timed out, have not heard from server in 4001ms for sessionid 0x54441e4dae0350a, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-03-25 12:21:58,948] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
> [2014-03-25 12:21:59,136] INFO Opening socket connection to server slc5b01c-e48f.stratus.slc.com/10.120.104.37:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-03-25 12:21:59,136] ERROR Unable to open socket to slc5b01c-e48f.stratus.slc.com/10.120.104.37:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-03-25 12:21:59,137] WARN Session 0x54441e4dae0350a for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> java.net.SocketException: Network is unreachable
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1068)
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1101)
> [2014-03-25 12:21:59,450] INFO Opening socket connection to server slc5b01c-ef64.stratus.slc.com/10.120.108.36:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-03-25 12:21:59,450] ERROR Unable to open socket to slc5b01c-ef64.stratus.slc.com/10.120.108.36:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-03-25 12:21:59,450] WARN Session 0x54441e4dae0350a for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> java.net.SocketException: Network is unreachable
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1068)
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1101)
> [2014-03-25 12:21:59,827] INFO Opening socket connection to server slc5b01c-4593.stratus.slc.com/10.120.132.36:2181 (org.apache.zook

Re: How are rebalance failures raised to consumers?

2014-03-27 Thread Neha Narkhede
That's correct. To recover, you will have to bounce one of the consumer
processes so the group will attempt a rebalance operation.
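
In the meantime, lag is the signal to watch. As a rough sketch (ConsumerOffsetChecker ships with 0.8, but the exact flag names can vary slightly between releases, so treat this invocation as illustrative), you can run it periodically from cron or a monitoring agent and alert when lag keeps growing:

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
      --zkconnect zk1:2181 --group my-consumer-group

A group whose lag grows while its members still own their partitions is the usual signature of consumers stuck after a failed rebalance.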

Thanks,
Neha


On Thu, Mar 27, 2014 at 10:09 AM, Tom Amon  wrote:

> This means that code waiting on the iterator will continue to wait forever
> if rebalance fails? No exception will be thrown from the iterator?
>
> I assume from your message that the only way to tell if a rebalance has
> failed and consumers have stopped consuming is by monitoring the lag and
> restarting the consumer from "outside" the code?
>
>
> -
>
> What Jun described is correct, currently the consumer asynchronously fails
> rebalance. But you can monitor the consumer to detect when a rebalance
> operation fails since that will manifest as lag on the consumer. Please see
> this -
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
> - to learn about consumer lag monitoring.
>
>
>
> Thanks,
>
> Neha
>
>
>
>
>
> On Wed, Mar 26, 2014 at 9:06 PM, Jun Rao  wrote:
>
>
>
> > Currently, there is no callback on rebalance failure. The consumer
>
> > will retry failed rebalances. If all retries fail, we just log the error.
>
> >
>
> > Thanks,
>
> >
>
> > Jun
>
> >
>
> >
>
> > On Wed, Mar 26, 2014 at 5:01 PM, Tom Amon  wrote:
>
> >
>
> > > The pattern for creating and operating consumers that we use is to
>
> > > create the consumer connector, create the streams and then consume
>
> > > each stream
>
> > by
>
> > > waiting on the iterator.
>
> > >
>
> > > If a rebalance occurs and fails, how is the error raised to the
> consumer?
>
> > > Will I get an exception while waiting on the iterator? Is it
>
> > > swallowed
>
> > and
>
> > > the consumer is dead?
>
> > >
>
> > > Thanks.
>
> > >
>
> >
>


Re: Please add Perl client on your wiki

2014-03-27 Thread Neha Narkhede
That's great. Thanks for contributing! Added this to our clients wiki -
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Perl

Thanks,
Neha


On Thu, Mar 27, 2014 at 11:24 AM, Sergiy Zuban  wrote:

> Kafka 0.8.x
>
> Pure Perl, Consumer and Producer implementations included. Zookeeper
> integration. GZIP and Snappy compression not supported.
>
> Maintainer: Sergey Gladkov, 
> License: Artistic License
>
> https://github.com/TrackingSoft/Kafka
> http://search.cpan.org/~sgladkov/Kafka/lib/Kafka.pm
>
> --
> Sergiy Zuban
>


Re: ZK session kill makes high level consumer replay

2014-03-27 Thread Neha Narkhede
When I call consumer.commitOffsets(); before killing session, unit test
succeeded. This problem would happen only with autoCommit enabled

That seems expected. If you call commitOffsets() explicitly before
simulating a GC pause on the consumer, there will be no duplicates since
the next consumer instance that picks up the same partitions will start
reading from the last checkpointed offset. On the other hand, autoCommit
only commits at a fixed interval. So if the consumer pauses between two
commit intervals, it will replay the data consumed since the last commit.
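
If you need a tighter bound on duplicates than the auto-commit interval gives you, the usual workaround is to disable auto-commit and checkpoint explicitly after processing. A minimal sketch with the 0.8 high-level consumer follows; the ZooKeeper connect string, group, topic and batch size are illustrative only:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ExplicitCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // illustrative
            props.put("group.id", "my-group");                // illustrative
            props.put("auto.offset.reset", "smallest");
            // Disable periodic auto-commit so offsets only advance when we commit.
            props.put("auto.commit.enable", "false");

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it =
                streams.get("my-topic").get(0).iterator();

            // hasNext() blocks until a message arrives (consumer.timeout.ms is -1 by default).
            for (int i = 0; i < 100 && it.hasNext(); i++) {
                byte[] payload = it.next().message();
                // ... process the message ...
            }
            // Checkpoint only after the batch is fully processed. A ZK session kill
            // or rebalance after this point replays at most the messages consumed
            // since this call, rather than everything since the last auto-commit tick.
            connector.commitOffsets();
            connector.shutdown();
        }
    }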

Thanks,
Neha


On Thu, Mar 27, 2014 at 4:21 PM, Bae, Jae Hyeon  wrote:

> When I call consumer.commitOffsets(); before killing session, unit test
> succeeded. This problem would happen only with autoCommit enabled.
>
> Could you fix this problem before releasing 0.8.1.1?
>
> Thank you
> Best, Jae
>
>
> On Thu, Mar 27, 2014 at 3:57 PM, Bae, Jae Hyeon 
> wrote:
>
> > Hi
> >
> > While testing the Kafka 0.8 consumer's ZK resilience, I found that when the ZK
> > session is killed and handleNewSession() is called, the high-level consumer
> > replays messages.
> >
> > Is this a known issue? I am attaching the unit test source code.
> >
> > package com.netflix.nfkafka.zktest;
> >
> > import com.fasterxml.jackson.core.JsonProcessingException;
> > import com.google.common.collect.ImmutableMap;
> > import com.google.common.collect.Lists;
> > import com.netflix.logging.chukwa.JsonMapper;
> > import kafka.consumer.Consumer;
> > import kafka.consumer.ConsumerConfig;
> > import kafka.consumer.KafkaStream;
> > import kafka.consumer.ZookeeperConsumerConnector;
> > import kafka.javaapi.consumer.ConsumerConnector;
> > import kafka.javaapi.producer.Producer;
> > import kafka.producer.KeyedMessage;
> > import kafka.producer.ProducerConfig;
> > import kafka.server.KafkaConfig;
> > import kafka.server.KafkaServer;
> > import kafka.utils.ZkUtils;
> > import org.I0Itec.zkclient.IDefaultNameSpace;
> > import org.I0Itec.zkclient.ZkClient;
> > import org.I0Itec.zkclient.ZkConnection;
> > import org.I0Itec.zkclient.ZkServer;
> > import org.apache.commons.io.FileUtils;
> > import org.apache.commons.lang.StringUtils;
> > import org.apache.curator.test.KillSession;
> > import org.apache.curator.test.TestingServer;
> > import org.apache.zookeeper.ZKUtil;
> > import org.apache.zookeeper.ZooKeeper;
> > import org.codehaus.jackson.type.TypeReference;
> > import org.junit.AfterClass;
> > import org.junit.BeforeClass;
> > import org.junit.Test;
> >
> > import java.io.File;
> > import java.io.IOException;
> > import java.lang.reflect.Field;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.Properties;
> > import java.util.Random;
> >
> > import static org.junit.Assert.assertEquals;
> > import static org.junit.Assert.assertTrue;
> > import static org.junit.Assert.fail;
> >
> > public class TestZkSessionKill {
> > private static TestingServer zkServer;
> > private static KafkaServer server1;
> > private static KafkaServer server2;
> > private static KafkaConfig config1;
> > private static KafkaConfig config2;
> > private static ZooKeeper zk1;
> > private static ZooKeeper zk2;
> >
> > private static ConsumerConnector consumer;
> > private static ZooKeeper zkConsumer;
> > private static KafkaStream stream;
> >
> > private static final int BROKER_ID1 = 0;
> > private static final int BROKER_ID2 = 1;
> > private static final int KAFKA_PORT1 = 2200;
> > private static final int KAFKA_PORT2 = 2201;
> >
> > public static String feed = "testzksessionkill";
> >
> > @BeforeClass
> > public static void setup() throws Exception {
> > zkServer = new TestingServer(-1, tempDir());
> > config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
> > KAFKA_PORT1));
> > server1 = createServer(config1);
> > zk1 = getZk(server1.zkClient());
> >
> > config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
> > KAFKA_PORT2));
> > server2 = createServer(config2);
> > zk2 = getZk(server2.zkClient());
> >
> > Properties props = new Properties();
> > props.setProperty("zookeeper.connect",
> > zkServer.getConnectString());
> > props.setProperty("group.id", feed);
> > props.setProperty("auto.offset.reset", "smallest");
> >
> > generateDataToKafka(0); // initially we have to create the topic
> >
> > createConsumer(props);
> > }
> >
> > public static Properties createBrokerConfig(int nodeId, int port) {
> > Properties props = new Properties();
> > props.put("broker.id",
> > Integer.toString(nodeId));
> > props.put("brokerId",
>  Integer.toString(nodeId));
> > props.put("host.name",   "localhost");
> > props.put("port",Integer.toString(port));
> > props.put("log.dir",
> > tempDir().getAbsolutePath());
> > props.put("log.flush.in

Please add Perl client on your wiki

2014-03-27 Thread Sergiy Zuban
Kafka 0.8.x

Pure Perl, Consumer and Producer implementations included. Zookeeper
integration. GZIP and Snappy compression not supported.

Maintainer: Sergey Gladkov, 
License: Artistic License

https://github.com/TrackingSoft/Kafka
http://search.cpan.org/~sgladkov/Kafka/lib/Kafka.pm

-- 
Sergiy Zuban


Re: data loss on replicated topic

2014-03-27 Thread Jun Rao
We don't expect to lose data in that case, so this sounds like a bug. Do
you see any other error/warn entries in the broker log around the time the
data was lost?

Thanks,

Jun


On Thu, Mar 27, 2014 at 10:52 AM, Oliver Dain wrote:

> Hi Neha,
>
> Thanks for the reply. I do not see the "No broker in ISR" message. If my
> original diagnosis was correct (that there were at least 2 replicas alive
> for the topic at all times) then I believe this is expected, right? I
> gather this makes it more likely that we've hit KAFKA-1193?? If so, is
> there any workaround and/or an ETA for a fix?
>
> Thanks,
> Oliver
>
>
>
>
> On 3/27/14, 5:18 AM, "Neha Narkhede"  wrote:
>
> >It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you
> >see the following log line when you observe data loss -
> >
> >"No broker in ISR is alive for ... There's potential data loss."
> >
> >Thanks,
> >Neha
> >
> >
> >On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain
> >wrote:
> >
> >> I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems
> >> like it could be the cause of this. Does that sound right? Is there a
> >>patch
> >> we can test? Any date/time when this is expected to be fixed?
> >>
> >> From: New User <od...@3cinteractive.com>
> >> Date: Wednesday, March 26, 2014 at 11:59 AM
> >> To: "users@kafka.apache.org" <users@kafka.apache.org>
> >> Subject: data loss on replicated topic
> >>
> >> My company currently testing Kafka for throughput and fault tolerance.
> >> We've set up a cluster of 5 Kafka brokers and are publishing to a topic
> >> with replication factor 3 and 100 partitions. We are publishing with
> >> request.required.acks == -1 (i.e. all ISR replicas must ACK before the
> >> message is considered sent). If a publication fails, we retry it
> >> indefinitely until it succeeds. We ran a test over a weekend in which we
> >> published messages as fast as we could (from a single publisher). Each
> >> message has a unique ID so we can ensure that all messages are saved by
> >> Kafka at least once at the end of the test. We have a simple script, run
> >> via cron, that kills one broker (chosen at random) once every other hour
> >> (killed via "kill -9"). The broker is then revived 16 minutes after it
> >>was
> >> killed. At the end of the weekend we ran a script to pull all data from
> >>all
> >> partitions and then verify that all messages were persisted by Kafka.
> >>For
> >> the most part, the results are very good. We can sustain about 3k
> >> message/second with almost no data loss.
> >>
> >> Of the roughly 460 million records we produced over 48 hours we lost
> >>only
> >> 7 records. But, I don't think we should have lost any record. All of the
> >> lost records were produced at almost exactly the time one of the brokers
> >> was killed (down to the second which is the granularity of our logs).
> >>Note
> >> that we're producing around 3k messages/second and we killed brokers
> >>many
> >> times over the 48 hour period. Only twice did we see data loss: once we
> >> lost 4 records and once we lost 3. I have checked the Kafka logs and
> >>there
> >> are some expected error messages from the surviving brokers that look
> >>like:
> >>
> >>
> >> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in
> >>fetch
> >> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
> >> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1
> >>bytes;
> >> RequestInfo: [load_test,20] ->
> >> PartitionFetchInfo(521319,1048576),[load_test,74] ->
> >> PartitionFetchInfo(559017,1048576),[load_test,14] ->
> >> PartitionFetchInfo(420539,1048576),[load_test,0] ->
> >> PartitionFetchInfo(776869,1048576),[load_test,34] ->
> >> PartitionFetchInfo(446435,1048576),[load_test,94] ->
> >> PartitionFetchInfo(849943,1048576),[load_test,40] ->
> >> PartitionFetchInfo(241876,1048576),[load_test,80] ->
> >> PartitionFetchInfo(508778,1048576),[load_test,60] ->
> >> PartitionFetchInfo(81314,1048576),[load_test,54] ->
> >> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
> >>
> >> java.net.ConnectException: Connection refused
> >>
> >> at sun.nio.ch.Net.connect0(Native Method)
> >>
> >> at sun.nio.ch.Net.connect(Net.java:465)
> >>
> >> at sun.nio.ch.Net.connect(Net.java:457)
> >>
> >> at
> >>sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> >>
> >> at
> >>kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >>
> >> at
> >>kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >>
> >> at
> >>kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >>
> >> at
> >> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >>
> >> at
> >>
> >>kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(
> >>SimpleConsumer.scala:71)
> >>
> >> at
> >>
> >>kafka.consum

Re: ZK session kill makes high level consumer replay

2014-03-27 Thread Bae, Jae Hyeon
When I call consumer.commitOffsets(); before killing session, unit test
succeeded. This problem would happen only with autoCommit enabled.

Could you fix this problem before releasing 0.8.1.1?

Thank you
Best, Jae


On Thu, Mar 27, 2014 at 3:57 PM, Bae, Jae Hyeon  wrote:

> Hi
>
> While testing the Kafka 0.8 consumer's ZK resilience, I found that when the ZK
> session is killed and handleNewSession() is called, the high-level consumer
> replays messages.
>
> Is this a known issue? I am attaching the unit test source code.
>
> package com.netflix.nfkafka.zktest;
>
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.google.common.collect.ImmutableMap;
> import com.google.common.collect.Lists;
> import com.netflix.logging.chukwa.JsonMapper;
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ZookeeperConsumerConnector;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> import kafka.server.KafkaConfig;
> import kafka.server.KafkaServer;
> import kafka.utils.ZkUtils;
> import org.I0Itec.zkclient.IDefaultNameSpace;
> import org.I0Itec.zkclient.ZkClient;
> import org.I0Itec.zkclient.ZkConnection;
> import org.I0Itec.zkclient.ZkServer;
> import org.apache.commons.io.FileUtils;
> import org.apache.commons.lang.StringUtils;
> import org.apache.curator.test.KillSession;
> import org.apache.curator.test.TestingServer;
> import org.apache.zookeeper.ZKUtil;
> import org.apache.zookeeper.ZooKeeper;
> import org.codehaus.jackson.type.TypeReference;
> import org.junit.AfterClass;
> import org.junit.BeforeClass;
> import org.junit.Test;
>
> import java.io.File;
> import java.io.IOException;
> import java.lang.reflect.Field;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.Random;
>
> import static org.junit.Assert.assertEquals;
> import static org.junit.Assert.assertTrue;
> import static org.junit.Assert.fail;
>
> public class TestZkSessionKill {
> private static TestingServer zkServer;
> private static KafkaServer server1;
> private static KafkaServer server2;
> private static KafkaConfig config1;
> private static KafkaConfig config2;
> private static ZooKeeper zk1;
> private static ZooKeeper zk2;
>
> private static ConsumerConnector consumer;
> private static ZooKeeper zkConsumer;
> private static KafkaStream stream;
>
> private static final int BROKER_ID1 = 0;
> private static final int BROKER_ID2 = 1;
> private static final int KAFKA_PORT1 = 2200;
> private static final int KAFKA_PORT2 = 2201;
>
> public static String feed = "testzksessionkill";
>
> @BeforeClass
> public static void setup() throws Exception {
> zkServer = new TestingServer(-1, tempDir());
> config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
> KAFKA_PORT1));
> server1 = createServer(config1);
> zk1 = getZk(server1.zkClient());
>
> config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
> KAFKA_PORT2));
> server2 = createServer(config2);
> zk2 = getZk(server2.zkClient());
>
> Properties props = new Properties();
> props.setProperty("zookeeper.connect",
> zkServer.getConnectString());
> props.setProperty("group.id", feed);
> props.setProperty("auto.offset.reset", "smallest");
>
> generateDataToKafka(0); // initially we have to create the topic
>
> createConsumer(props);
> }
>
> public static Properties createBrokerConfig(int nodeId, int port) {
> Properties props = new Properties();
> props.put("broker.id",
> Integer.toString(nodeId));
> props.put("brokerId",Integer.toString(nodeId));
> props.put("host.name",   "localhost");
> props.put("port",Integer.toString(port));
> props.put("log.dir",
> tempDir().getAbsolutePath());
> props.put("log.flush.interval.messages", "1");
> props.put("zookeeper.connect",
> zkServer.getConnectString());
> props.put("replica.socket.timeout.ms",   "1500");
> props.put("hostName","localhost");
> props.put("numPartitions",   "1");
>
> return props;
> }
>
> public static File tempDir() {
> File f = new File("./build/test", "kafka-" + new
> Random().nextInt(100));
> f.mkdirs();
> System.out.println(f);
> f.deleteOnExit();
> return f;
> }
>
> @AfterClass
> public static void shutdown() throws IOException {
> if (server1 != null) {
> server1.shutdown();
> server1.awaitShutdown();
> }
>
> if (server2 != null) {
> server2.shutdown();
> server2.awaitShutdown();
>  

ZK session kill makes high level consumer replay

2014-03-27 Thread Bae, Jae Hyeon
Hi

While testing the Kafka 0.8 consumer's ZK resilience, I found that when the ZK
session is killed and handleNewSession() is called, the high-level consumer
replays messages.

Is this a known issue? I am attaching the unit test source code.

package com.netflix.nfkafka.zktest;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.netflix.logging.chukwa.JsonMapper;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.ZkServer;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.codehaus.jackson.type.TypeReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class TestZkSessionKill {
private static TestingServer zkServer;
private static KafkaServer server1;
private static KafkaServer server2;
private static KafkaConfig config1;
private static KafkaConfig config2;
private static ZooKeeper zk1;
private static ZooKeeper zk2;

private static ConsumerConnector consumer;
private static ZooKeeper zkConsumer;
private static KafkaStream stream;

private static final int BROKER_ID1 = 0;
private static final int BROKER_ID2 = 1;
private static final int KAFKA_PORT1 = 2200;
private static final int KAFKA_PORT2 = 2201;

public static String feed = "testzksessionkill";

@BeforeClass
public static void setup() throws Exception {
zkServer = new TestingServer(-1, tempDir());
config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
KAFKA_PORT1));
server1 = createServer(config1);
zk1 = getZk(server1.zkClient());

config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
KAFKA_PORT2));
server2 = createServer(config2);
zk2 = getZk(server2.zkClient());

Properties props = new Properties();
props.setProperty("zookeeper.connect", zkServer.getConnectString());
props.setProperty("group.id", feed);
props.setProperty("auto.offset.reset", "smallest");

generateDataToKafka(0); // initially we have to create the topic

createConsumer(props);
}

public static Properties createBrokerConfig(int nodeId, int port) {
Properties props = new Properties();
props.put("broker.id",   Integer.toString(nodeId));
props.put("brokerId",Integer.toString(nodeId));
props.put("host.name",   "localhost");
props.put("port",Integer.toString(port));
props.put("log.dir",
tempDir().getAbsolutePath());
props.put("log.flush.interval.messages", "1");
props.put("zookeeper.connect",
zkServer.getConnectString());
props.put("replica.socket.timeout.ms",   "1500");
props.put("hostName","localhost");
props.put("numPartitions",   "1");

return props;
}

public static File tempDir() {
File f = new File("./build/test", "kafka-" + new
Random().nextInt(100));
f.mkdirs();
System.out.println(f);
f.deleteOnExit();
return f;
}

@AfterClass
public static void shutdown() throws IOException {
if (server1 != null) {
server1.shutdown();
server1.awaitShutdown();
}

if (server2 != null) {
server2.shutdown();
server2.awaitShutdown();
}

zkServer.close();
}

public static KafkaServer createServer(KafkaConfig config) throws
NoSuchFieldException, IllegalAccessException {
KafkaServer server = new KafkaServer(config,
kafka.utils.SystemTime$.MODULE$);
server.startup();
return server;
}

public static ZooKeeper getZk(ZkClient zkClient) throws
NoSuchFieldException, IllegalAccessException {
Field f = zkClient.getClass().getDeclaredField("_connection");
f.setAccessible(true

Re: ZooKeeper connect/disconnect pattern

2014-03-27 Thread Tom Amon
 From the controller log 


[2014-03-25 12:22:39,487] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 11 and zk version 10 (kafka.controller.ControllerEpochListener)

[2014-03-25 12:22:39,519] INFO [Controller 2]: Controller starting up (kafka.controller.KafkaController)

[2014-03-25 12:22:39,777] INFO [Controller 2]: Controller startup complete (kafka.controller.KafkaController)

[2014-03-25 19:47:59,198] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 11 and zk version 10 (kafka.controller.ControllerEpochListener)

[2014-03-25 19:47:59,230] INFO [Controller 2]: Controller starting up (kafka.controller.KafkaController)

[2014-03-25 19:47:59,379] INFO [Controller 2]: Controller startup complete (kafka.controller.KafkaController)

[2014-03-25 21:39:50,115] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 12 and zk version 11 (kafka.controller.ControllerEpochListener)

[2014-03-25 21:59:18,020] INFO [Controller 2]: Broker 2 starting become controller state transition (kafka.controller.KafkaController)

[2014-03-25 21:59:18,032] INFO [Controller 2]: Controller 2 incremented
epoch to 13 (kafka.controller.KafkaController)

[2014-03-25 21:59:21,888] INFO [Controller-2-to-broker-2-send-thread],
Starting  (kafka.controller.RequestSendThread)

[2014-03-25 21:59:21,895] INFO [Controller-2-to-broker-3-send-thread],
Starting  (kafka.controller.RequestSendThread)

[2014-03-25 21:59:21,895] INFO [Controller-2-to-broker-4-send-thread],
Starting  (kafka.controller.RequestSendThread)

[2014-03-25 21:59:21,895] INFO [Controller-2-to-broker-5-send-thread],
Starting  (kafka.controller.RequestSendThread)

[2014-03-25 21:59:21,898] INFO [Controller 2]: Currently active brokers in
the cluster: Set(2,3, 4, 5) (kafka.controller.KafkaController)

[2014-03-25 21:59:21,899] INFO [Controller 2]: Currently shutting brokers
in the cluster: Set() (kafka.controller.KafkaController)

[2014-03-25 21:59:21,899] INFO [Controller 2]: Current list of topics in
the cluster: Set(optimizer-error-topic, optimizer-default-topic,
metrics-tracker-audit) (kafka.controller.KafkaController)

 From the root log 

[2014-03-25 12:21:58,848] INFO Client session timed out, have not heard from server in 4001ms for sessionid 0x54441e4dae0350a, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:58,948] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
[2014-03-25 12:21:59,136] INFO Opening socket connection to server slc5b01c-e48f.stratus.slc.com/10.120.104.37:2181 (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:59,136] ERROR Unable to open socket to slc5b01c-e48f.stratus.slc.com/10.120.104.37:2181 (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:59,137] WARN Session 0x54441e4dae0350a for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.SocketException: Network is unreachable
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1068)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1101)
[2014-03-25 12:21:59,450] INFO Opening socket connection to server slc5b01c-ef64.stratus.slc.com/10.120.108.36:2181 (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:59,450] ERROR Unable to open socket to slc5b01c-ef64.stratus.slc.com/10.120.108.36:2181 (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:59,450] WARN Session 0x54441e4dae0350a for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.SocketException: Network is unreachable
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1068)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1101)
[2014-03-25 12:21:59,827] INFO Opening socket connection to server slc5b01c-4593.stratus.slc.com/10.120.132.36:2181 (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:59,828] ERROR Unable to open socket to slc5b01c-4593.stratus.slc.com/10.120.132.36:2181 (org.apache.zookeeper.ClientCnxn)
[2014-03-25 12:21:59,828] WARN Session 0x54441e4dae0350a for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.SocketException: Network is unreachable
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1068)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1101)
[2014-03-25 12:22:38,950] INFO Starting ZkClient event thread.

Error in fetch name

2014-03-27 Thread Tom Amon
We see the following messages in the broker logs whenever we reboot a
broker. These messages filled up 200MB of log files in less than 1 minute.
Are these normal? For reference we have enabled controlled shutdown on each
broker.


[2014-03-25 22:52:45,558] INFO Reconnect due to socket error: null
(kafka.consumer.SimpleConsum
er)
[2014-03-25 22:52:45,558] WARN [ReplicaFetcherThread-0-5], Error in fetch
Name: FetchRequest; V
ersion: 0; CorrelationId: 10981; ClientId: ReplicaFetcherThread-0-5;
ReplicaId: 2; MaxWait: 500
 ms; MinBytes: 1 bytes; RequestInfo: [optimizer-default-topic,283] ->
PartitionFetchInfo(127503
,1048576),[optimizer-default-topic,923] ->
PartitionFetchInfo(132955,1048576),[optimizer-defaul
t-topic,1263] ->
PartitionFetchInfo(136090,1048576),[optimizer-default-topic,1143] ->
Partition
FetchInfo(135009,1048576),[optimizer-default-topic,1343] ->
PartitionFetchInfo(131686,1048576),
[optimizer-default-topic,463] ->
PartitionFetchInfo(131294,1048576),[optimizer-default-topic,63
] -> PartitionFetchInfo(137998,1048576),[optimizer-default-topic,983] ->
PartitionFetchInfo(135
344,1048576),[optimizer-default-topic,3] ->
PartitionFetchInfo(133825,1048576),[optimizer-defau
lt-topic,623] ->
PartitionFetchInfo(135370,1048576),[optimizer-default-topic,1163] ->
Partition
FetchInfo(131605,1048576),[optimizer-default-topic,663] ->
PartitionFetchInfo(129512,1048576),[
optimizer-default-topic,603] ->
PartitionFetchInfo(135542,1048576),[optimizer-default-topic,883
] -> PartitionFetchInfo(133556,1048576),[optimizer-default-topic,303] ->
PartitionFetchInfo(136
799,1048576),[optimizer-default-topic,1383] ->
PartitionFetchInfo(132109,1048576),[optimizer-de
fault-topic,723] ->
PartitionFetchInfo(133904,1048576),[optimizer-default-topic,103] -> Partiti
onFetchInfo(132492,1048576),[optimizer-default-topic,943] ->
PartitionFetchInfo(133505,1048576)
,[optimizer-default-topic,1323] ->
PartitionFetchInfo(132679,1048576),[optimizer-default-topic,
1283] -> PartitionFetchInfo(138022,1048576),[optimizer-default-topic,1123]
-> PartitionFetchInf
o(130080,1048576),[optimizer-default-topic,823] ->
PartitionFetchInfo(135960,1048576),[optimize
r-default-topic,563] ->
PartitionFetchInfo(130459,1048576),[optimizer-default-topic,743] -> Par
titionFetchInfo(133563,1048576),[optimizer-default-topic,843] ->
PartitionFetchInfo(130122,1048
576),[optimizer-default-topic,1043] ->
PartitionFetchInfo(135857,1048576),[optimizer-default-to
pic,243] ->
PartitionFetchInfo(133344,1048576),[optimizer-default-topic,1203] ->
PartitionFetch
Info(136861,1048576),[optimizer-default-topic,143] ->
PartitionFetchInfo(138480,1048576),[optim
izer-default-topic,43] ->
PartitionFetchInfo(135978,1048576),[optimizer-default-topic,643] -> P
artitionFetchInfo(139799,1048576),[optimizer-default-topic,863] ->
PartitionFetchInfo(136473,10
48576),[optimizer-default-topic,323] ->
PartitionFetchInfo(136294,1048576),[optimizer-default-t
opic,363] ->
PartitionFetchInfo(132374,1048576),[optimizer-default-topic,903] ->
PartitionFetch
Info(132833,1048576),[optimizer-default-topic,23] ->
PartitionFetchInfo(138537,1048576),[optimi
zer-default-topic,83] ->
PartitionFetchInfo(134931,1048576),[optimizer-default-topic,1363] -> P
artitionFetchInfo(133590,1048576),[optimizer-default-topic,343] ->
PartitionFetchInfo(131837,10
48576),[optimizer-default-topic,263] ->
PartitionFetchInfo(138943,1048576),[optimizer-default-t
opic,1243] ->
PartitionFetchInfo(138177,1048576),[optimizer-default-topic,1183] ->
PartitionFet
chInfo(129200,1048576),[optimizer-default-topic,383] ->
PartitionFetchInfo(131492,1048576),[opt
imizer-default-topic,443] ->
PartitionFetchInfo(131666,1048576),[optimizer-default-topic,203] -
> PartitionFetchInfo(135036,1048576),[optimizer-default-topic,583] ->
PartitionFetchInfo(130203
,1048576),[optimizer-default-topic,783] ->
PartitionFetchInfo(133470,1048576),[optimizer-defaul
t-topic,1003] ->
PartitionFetchInfo(134826,1048576),[optimizer-default-topic,123] ->
PartitionF
etchInfo(134044,1048576),[optimizer-default-topic,1223] ->
PartitionFetchInfo(133005,1048576),[
optimizer-default-topic,223] ->
PartitionFetchInfo(133248,1048576),[optimizer-default-topic,803
] -> PartitionFetchInfo(135492,1048576),[optimizer-default-topic,963] ->
PartitionFetchInfo(132
238,1048576),[optimizer-default-topic,543] ->
PartitionFetchInfo(132958,1048576),[optimizer-def
ault-topic,763] ->
PartitionFetchInfo(129710,1048576),[optimizer-default-topic,163] -> Partitio
nFetchInfo(130520,1048576),[optimizer-default-topic,503] ->
PartitionFetchInfo(132099,1048576),
[optimizer-default-topic,1103] ->
PartitionFetchInfo(138307,1048576),[optimizer-default-topic,6
83] -> PartitionFetchInfo(135641,1048576),[optimizer-default-topic,1303] ->
PartitionFetchInfo(
133974,1048576),[optimizer-default-topic,483] ->
PartitionFetchInfo(136166,1048576),[optimizer-
default-topic,1023] ->
PartitionFetchInfo(134111,1048576),[optimizer-default-topic,183] -> Part
itionFetchInfo(134412,1048576),[optimizer-default-topic,703]

Re: data loss on replicated topic

2014-03-27 Thread Oliver Dain
Hi Neha,

Thanks for the reply. I do not see the "No broker in ISR" message. If my
original diagnosis was correct (that there were at least 2 replicas alive
for the topic at all times) then I believe this is expected, right? I
gather this makes it more likely that we've hit KAFKA-1193?? If so, is
there any workaround and/or an ETA for a fix?

Thanks,
Oliver




On 3/27/14, 5:18 AM, "Neha Narkhede"  wrote:

>It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you
>see the following log line when you observe data loss -
>
>"No broker in ISR is alive for ... There's potential data loss."
>
>Thanks,
>Neha
>
>
>On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain
>wrote:
>
>> I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems
>> like it could be the cause of this. Does that sound right? Is there a
>>patch
>> we can test? Any date/time when this is expected to be fixed?
>>
>> From: New User <od...@3cinteractive.com>
>> Date: Wednesday, March 26, 2014 at 11:59 AM
>> To: "users@kafka.apache.org" <users@kafka.apache.org>
>> Subject: data loss on replicated topic
>>
>> My company currently testing Kafka for throughput and fault tolerance.
>> We've set up a cluster of 5 Kafka brokers and are publishing to a topic
>> with replication factor 3 and 100 partitions. We are publishing with
>> request.required.acks == -1 (i.e. all ISR replicas must ACK before the
>> message is considered sent). If a publication fails, we retry it
>> indefinitely until it succeeds. We ran a test over a weekend in which we
>> published messages as fast as we could (from a single publisher). Each
>> message has a unique ID so we can ensure that all messages are saved by
>> Kafka at least once at the end of the test. We have a simple script, run
>> via cron, that kills one broker (chosen at random) once every other hour
>> (killed via "kill -9"). The broker is then revived 16 minutes after it
>>was
>> killed. At the end of the weekend we ran a script to pull all data from
>>all
>> partitions and then verify that all messages were persisted by Kafka.
>>For
>> the most part, the results are very good. We can sustain about 3k
>> message/second with almost no data loss.
>>
>> Of the roughly 460 million records we produced over 48 hours we lost
>>only
>> 7 records. But, I don't think we should have lost any record. All of the
>> lost records were produced at almost exactly the time one of the brokers
>> was killed (down to the second which is the granularity of our logs).
>>Note
>> that we're producing around 3k messages/second and we killed brokers
>>many
>> times over the 48 hour period. Only twice did we see data loss: once we
>> lost 4 records and once we lost 3. I have checked the Kafka logs and
>>there
>> are some expected error messages from the surviving brokers that look
>>like:
>>
>>
>> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in
>>fetch
>> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
>> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1
>>bytes;
>> RequestInfo: [load_test,20] ->
>> PartitionFetchInfo(521319,1048576),[load_test,74] ->
>> PartitionFetchInfo(559017,1048576),[load_test,14] ->
>> PartitionFetchInfo(420539,1048576),[load_test,0] ->
>> PartitionFetchInfo(776869,1048576),[load_test,34] ->
>> PartitionFetchInfo(446435,1048576),[load_test,94] ->
>> PartitionFetchInfo(849943,1048576),[load_test,40] ->
>> PartitionFetchInfo(241876,1048576),[load_test,80] ->
>> PartitionFetchInfo(508778,1048576),[load_test,60] ->
>> PartitionFetchInfo(81314,1048576),[load_test,54] ->
>> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
>>
>> java.net.ConnectException: Connection refused
>>
>> at sun.nio.ch.Net.connect0(Native Method)
>>
>> at sun.nio.ch.Net.connect(Net.java:465)
>>
>> at sun.nio.ch.Net.connect(Net.java:457)
>>
>> at 
>>sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>>
>> at 
>>kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>
>> at 
>>kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>>
>> at 
>>kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>>
>> at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>>
>> at
>> 
>>kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(
>>SimpleConsumer.scala:71)
>>
>> at
>> 
>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.ap
>>ply$mcV$sp(SimpleConsumer.scala:109)
>>
>> at
>> 
>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.ap
>>ply(SimpleConsumer.scala:109)
>>
>> at
>> 
>>kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.ap
>>ply(SimpleConsumer.scala:109)
>>
>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>
>> at
>> 
>>kaf

Re: How are rebalance failures raised to consumers?

2014-03-27 Thread Tom Amon
This means that code waiting on the iterator will continue to wait forever
if rebalance fails? No exception will be thrown from the iterator?

I assume from your message that the only way to tell if a rebalance has
failed and consumers have stopped consuming is by monitoring the lag and
restarting the consumer from "outside" the code?


-

What Jun described is correct, currently the consumer asynchronously fails
rebalance. But you can monitor the consumer to detect when a rebalance
operation fails since that will manifest as lag on the consumer. Please see
this -
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
- to learn about consumer lag monitoring.



Thanks,

Neha





On Wed, Mar 26, 2014 at 9:06 PM, Jun Rao  wrote:



> Currently, there is no callback on rebalance failure. The consumer

> will retry failed rebalances. If all retries fail, we just log the error.

>

> Thanks,

>

> Jun

>

>

> On Wed, Mar 26, 2014 at 5:01 PM, Tom Amon  wrote:

>

> > The pattern for creating and operating consumers that we use is to

> > create the consumer connector, create the streams and then consume

> > each stream

> by

> > waiting on the iterator.

> >

> > If a rebalance occurs and fails, how is the error raised to the
consumer?

> > Will I get an exception while waiting on the iterator? Is it

> > swallowed

> and

> > the consumer is dead?

> >

> > Thanks.

> >

>


Re: New Consumer API discussion

2014-03-27 Thread Neha Narkhede
If people don't have any more thoughts on this, I will go ahead and submit
a reviewboard to https://issues.apache.org/jira/browse/KAFKA-1328.

Thanks,
Neha


On Mon, Mar 24, 2014 at 5:39 PM, Neha Narkhede wrote:

> I took some time to write some example code using the new consumer APIs to
> cover a range of use cases. This exercise was very useful (thanks for the
> suggestion, Jay!) since I found several improvements to the APIs to make
> them more usable. Here are some of the changes I made -
>
> 1. Added usage examples to the KafkaConsumer 
> javadoc.
> I find it useful for the examples to be in the javadoc vs some wiki. Please
> go through these examples and suggest improvements. The goal would be to
> document a limited set of examples that cover every major use case.
> 2. All APIs that either accept or return offsets are changed to
> Map instead of TopicPartitionOffset... In all the
> examples that I wrote, it was much easier to deal with offsets and pass
> them around in the consumer APIs if they were maps instead of lists
> 3. Due to the above change, I had to introduce commit() and commitAsync() APIs
> explicitly, in addition to commit(Map offsets) and commitAsync(Map offsets),
> since the no-argument case would not be covered automatically with Map as the
> input parameter to the commit APIs
> 4. Offset rewind logic is funky with group management. I took a stab and
> it and wrote examples to cover the various offset rewind uses cases I could
> think of. I'm not so sure I like it, so I encourage people to take a look
> at the examples and provide feedback. This feedback is very critical in
> finalizing the consumer APIs as we might have to add/change APIs to make
> offset rewind intuitive and easy to use. (Please see the 3rd and 4th
> examples here)
>
> Once I have feedback on the above, I will go ahead and submit a review
> board for the new APIs and javadoc.
>
> Thanks
> Neha
>
>
> On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede wrote:
>
>> Hey Chris,
>>
>> Really sorry for the late reply, wonder how this fell through the cracks.
>> Anyhow, thanks for the great feedback! Here are my comments -
>>
>>
>> 1. Why is the config String->Object instead of String->String?
>>
>> This is probably more of a feedback about the new config management that
>> we adopted in the new clients. I think it is more convenient to write
>> configs.put("a", 42);
>> instead of
>> configs.put("a", Integer.toString(42));
>>
>> 2. Are these Java docs correct?
>>
>>   KafkaConsumer(java.util.Map<
>> java.lang.String,java.lang.Object> configs)
>>   A consumer is instantiated by providing a set of key-value pairs as
>> configuration and a ConsumerRebalanceCallback implementation
>>
>> There is no ConsumerRebalanceCallback parameter.
>>
>> Fixed.
>>
>>
>> 3. Would like to have a method:
>>
>>   poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>> TopicPartition... topicAndPartitionsToPoll)
>>
>> I see I can effectively do this by just fiddling with subscribe and
>> unsubscribe before each poll. Is this a low-overhead operation? Can I just
>> unsubscribe from everything after each poll, then re-subscribe to a topic
>> the next iteration. I would probably be doing this in a fairly tight loop.
>>
>> The subscribe and unsubscribe will be very lightweight in-memory
>> operations,
>> so it shouldn't be a problem to just use those APIs directly.
>> Let me know if you think otherwise.
>>
>> 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
>> are use cases for decoupling "what to do when no offset exists" from "what
>> to do when I'm out of range". I might want to start from smallest the
>> first time I run, but fail if I ever get offset out of range.
>>
>> How about adding a third option "disable" to "auto.offset.reset"?
>> What this says is that never automatically reset the offset, either if
>> one is not found or if the offset
>> falls out of range. Presumably, you would want to turn this off when you
>> want to control the offsets
>> yourself and use custom rewind/replay logic to reset the consumer's
>> offset. In this case, you would
>> want to turn this feature off so Kafka does not accidentally reset the
>> offset to something else.
>>
>> I'm not so sure when you would want to make the distinction regarding
>> startup and offset falling out
>> of range. Presumably, if you don't trust Kafka to reset the offset, then
>> you can always turn this off
>> and use commit/commitAsync and seek() to set the consumer to the right
>> offset on startup and every
>> time your co

Re: Kafka snappy compression

2014-03-27 Thread 小宇
Hi, here is the issue. https://issues.apache.org/jira/browse/KAFKA-1344


2014-03-27 20:31 GMT+08:00 Neha Narkhede :

> Currently the console producer only supports GZIP. Could you please file a
> JIRA for the snappy support?
>
>
>
>
> On Thu, Mar 27, 2014 at 5:21 AM, 小宇  wrote:
>
> > Yes,maybe I missed something, and how can I send message in snappy
> > compression with Kafka-console-producer?
> > Thanks.
> >
> > Neha Narkhede wrote on Thursday, 27 March 2014:
> >
> > > Have you followed the wire protocol documented here -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >
> > >
> > >
> > >
> > > On Thu, Mar 27, 2014 at 3:22 AM, 小宇  >
> > > wrote:
> > >
> > > > Hi, all
> > > > I'm working on snappy compression in our Node.js client (
> > > > https://github.com/SOHU-Co/kafka-node),but I found the offset of
> > message
> > > > always 0 after snappy compression, for example:  00
> > 00
> > > > 00 00 00 00 00 19 3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f
> 6e
> > > 65
> > > > 22 3a 22 31 22 7d>, I try to test it in kafka-console-producer.sh, but
> > it
> > > > seems not support snappy. So I can't figure out what's wrong with my
> > > > client. Did anyone encounter the same problem?
> > > > Thanks.
> > > >
> > >
> >
>


Re: Kafka snappy compression

2014-03-27 Thread 小宇
Thanks, I will try it.


2014-03-27 20:36 GMT+08:00 Magnus Edenhill :

> Some notes on this:
>  - remember that Messages are wrapped in a compressed MessageSet that is
> wrapped in an outer Message which is wrapped in an outer MessageSet (the
> mind boggles).
>  - the Kafka snappy compression has a non-standard framing - snappy-java
> (prefix "\x82SNAPPY\x0") - this framing is specific to the java
> implementation and is not in the official snappy spec or implementation.
>
> You can use the rdkafka_example client from librdkafka to send
> snappy-compressed messages:
> ./rdkafka_example -P -t topic -p 0 -b mybroker -z snappy
>
>
>
> 2014-03-27 19:21 GMT+07:00 小宇 :
>
> > Yes,maybe I missed something, and how can I send message in snappy
> > compression with Kafka-console-producer?
> > Thanks.
> >
> > Neha Narkhede wrote on Thursday, 27 March 2014:
> >
> > > Have you followed the wire protocol documented here -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >
> > >
> > >
> > >
> > > On Thu, Mar 27, 2014 at 3:22 AM, 小宇  >
> > > wrote:
> > >
> > > > Hi, all
> > > > I'm working on snappy compression in our Node.js client (
> > > > https://github.com/SOHU-Co/kafka-node),but I found the offset of
> > message
> > > > always 0 after snappy compression, for example:  00
> > 00
> > > > 00 00 00 00 00 19 3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f
> 6e
> > > 65
> > > > 22 3a 22 31 22 7d>, I try to test it in kafka-console-producer.sh, but
> > it
> > > > seems not support snappy. So I can't figure out what's wrong with my
> > > > client. Did anyone encounter the same problem?
> > > > Thanks.
> > > >
> > >
> >
>


Re: Kafka snappy compression

2014-03-27 Thread Magnus Edenhill
Some notes on this:
 - remember that Messages are wrapped in a compressed MessageSet that is
wrapped in an outer Message which is wrapped in an outer MessageSet (the
mind boggles).
 - the Kafka snappy compression has a non-standard framing - snappy-java
(prefix "\x82SNAPPY\x0") - this framing is specific to the java
implementation and is not in the official snappy spec or implementation.
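
To see that framing from the Java side, here is a small sketch. It assumes the org.xerial.snappy snappy-java library (the implementation used on the Java side) is on the classpath, and it just compresses the {"one":"1"} payload from your example; the first eight bytes printed should be the 82 53 4e 41 50 50 59 00 magic header described above:

    import java.io.ByteArrayOutputStream;
    import org.xerial.snappy.SnappyOutputStream;

    public class SnappyFramingProbe {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            SnappyOutputStream snappy = new SnappyOutputStream(buf);
            snappy.write("{\"one\":\"1\"}".getBytes("UTF-8"));
            snappy.close();
            byte[] framed = buf.toByteArray();
            // Print the snappy-java stream header; a non-Java client has to
            // produce (and strip) this same prefix to interoperate with it.
            for (int i = 0; i < 8 && i < framed.length; i++) {
                System.out.printf("%02x ", framed[i] & 0xff);
            }
            System.out.println();
        }
    }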

You can use the rdkafka_example client from librdkafka to send
snappy-compressed messages:
./rdkafka_example -P -t topic -p 0 -b mybroker -z snappy



2014-03-27 19:21 GMT+07:00 小宇 :

> Yes,maybe I missed something, and how can I send message in snappy
> compression with Kafka-console-producer?
> Thanks.
>
> Neha Narkhede wrote on Thursday, 27 March 2014:
>
> > Have you followed the wire protocol documented here -
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >
> >
> >
> >
> > On Thu, Mar 27, 2014 at 3:22 AM, 小宇 >
> > wrote:
> >
> > > Hi, all
> > > I'm working on snappy compression in our Node.js client (
> > > https://github.com/SOHU-Co/kafka-node),but I found the offset of
> message
> > > always 0 after snappy compression, for example:  00
> > > 00 00 00 00 00 19 3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f 6e
> > 65
> > > 22 3a 22 31 22 7d>, I try to test it in kafka-console-producer.sh, but
> it
> > > seems not support snappy. So I can't figure out what's wrong with my
> > > client. Did anyone encounter the same problem?
> > > Thanks.
> > >
> >
>


Re: Kafka snappy compression

2014-03-27 Thread Neha Narkhede
Currently the console producer only supports GZIP. Could you please file a
JIRA for the snappy support?




On Thu, Mar 27, 2014 at 5:21 AM, 小宇  wrote:

> Yes,maybe I missed something, and how can I send message in snappy
> compression with Kafka-console-producer?
> Thanks.
>
> Neha Narkhede wrote on Thursday, 27 March 2014:
>
> > Have you followed the wire protocol documented here -
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >
> >
> >
> >
> > On Thu, Mar 27, 2014 at 3:22 AM, 小宇 >
> > wrote:
> >
> > > Hi, all
> > > I'm working on snappy compression in our Node.js client (
> > > https://github.com/SOHU-Co/kafka-node),but I found the offset of
> message
> > > always 0 after snappy compression, for example:  00
> > > 00 00 00 00 00 19 3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f 6e
> > 65
> > > 22 3a 22 31 22 7d>, I try to test it in kafka-console-producer.sh, but
> it
> > > seems not support snappy. So I can't figure out what's wrong with my
> > > client. Did anyone encounter the same problem?
> > > Thanks.
> > >
> >
>


Re: producers limit

2014-03-27 Thread Neha Narkhede
 What is the performance
overhead if we don't use batching?

It can be very significant. I'm not sure if we have published performance
numbers for the new producer yet, but you can see the 0.8 producer
throughput numbers here -
https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing#Performancetesting-Producerthroughput.
The performance difference between batch size 1 and 50 can be up to 18x.
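
For reference, a minimal sketch of turning on batching with the 0.8 producer; the broker list, topic and numbers are illustrative, not tuned recommendations:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class BatchingProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // illustrative
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("producer.type", "async");        // use the async/batching path
            props.put("batch.num.messages", "50");      // messages per batch
            props.put("queue.buffering.max.ms", "100"); // max wait to fill a batch

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("my-topic", "hello"));
            producer.close();
        }
    }

The flip side Kane mentions is real: with producer.type=async, anything still sitting in the client-side queue is lost if the producer process dies before the batch is flushed, so batch size and buffering time are a latency/durability trade-off as much as a throughput one.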

Thanks,
Neha


On Wed, Mar 26, 2014 at 7:24 PM, Kane Kane  wrote:

> I have a possibly related question:
>
> In a batch sending mode it's possible that messages are lost if
> producer dies while accumulating the batch. What is the performance
> overhead if we don't use batching?
>
> Thanks.
>
>
> On Wed, Mar 26, 2014 at 9:24 AM, Jagbir  wrote:
> > Hi Neha,
> >
> > Thank you for the  numbers as this information helped me to size my
> project as well.  While we are on this topic, do you mind commenting on the
> maximum number of topics that can be hosted on a 8-10 node cluster. We have
> a need for about 100k topics and similar number of producers and consumers
> and I'm looking for some rough estimates.
> >
> > Thanks,
> > Jagbir
> >
> > On March 25, 2014 8:59:21 PM PDT, Neha Narkhede 
> wrote:
> >>You shouldn't have any problem with that. We frequently have 10s of
> >>thousands of producer connections to a 8-10 node cluster at all times.
> >>You
> >>might have to bump up the limit for the number of open file handles per
> >>broker though.
> >>
> >>Thanks,
> >>Neha
> >>
> >>
> >>On Tue, Mar 25, 2014 at 3:41 PM, Kane Kane 
> >>wrote:
> >>
> >>> Is there a recommended cap for the concurrent producers threads?
> >>> We plan to have around 4000 connections across cluster writing to
> >>> kafka, i assume there shouldn't be any performance implications
> >>> related to that?
> >>>
> >>> Thanks.
> >>>
> >
> > --
> > Sent from my Android phone with K-9 Mail. Please excuse my brevity.
>


Re: Kafka snappy compression

2014-03-27 Thread 小宇
Yes, maybe I missed something - how can I send a message with snappy
compression using kafka-console-producer?
Thanks.

Neha Narkhede wrote on Thursday, 27 March 2014:

> Have you followed the wire protocol documented here -
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>
>
>
>
> On Thu, Mar 27, 2014 at 3:22 AM, 小宇 >
> wrote:
>
> > Hi, all
> > I'm working on snappy compression in our Node.js client (
> > https://github.com/SOHU-Co/kafka-node),but I found the offset of message
> > always 0 after snappy compression, for example:  > 00 00 00 00 00 19 3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f 6e
> 65
> > 22 3a 22 31 22 7d>, I try to test it in kafka-console-producer.sh, but it
> > seems not support snappy. So I can't figure out what's wrong with my
> > client. Did anyone encounter the same problem?
> > Thanks.
> >
>


Re: How are rebalance failures raised to consumers?

2014-03-27 Thread Neha Narkhede
What Jun described is correct, currently the consumer asynchronously fails
rebalance. But you can monitor the consumer to detect when a rebalance
operation fails since that will manifest as lag on the consumer. Please see
this -
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
- to learn about consumer lag monitoring.

Thanks,
Neha


On Wed, Mar 26, 2014 at 9:06 PM, Jun Rao  wrote:

> Currently, there is no callback on rebalance failure. The consumer will
> retry failed rebalances. If all retries fail, we just log the error.
>
> Thanks,
>
> Jun
>
>
> On Wed, Mar 26, 2014 at 5:01 PM, Tom Amon  wrote:
>
> > The pattern for creating and operating consumers that we use is to create
> > the consumer connector, create the streams and then consume each stream
> by
> > waiting on the iterator.
> >
> > If a rebalance occurs and fails, how is the error raised to the consumer?
> > Will I get an exception while waiting on the iterator? Is it swallowed
> and
> > the consumer is dead?
> >
> > Thanks.
> >
>
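
For reference, a minimal sketch of the connector/stream/iterator pattern
described in this thread, using the 0.8 high-level consumer (the ZooKeeper
address, group id, and topic are placeholders); note that, per Jun's reply
above, a failed rebalance is only logged and does not surface on the iterator:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class IteratorConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("rebalance.max.retries", "4");          // rebalance is retried this many times

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 1); // one stream for the topic

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streams.get("my-topic").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                byte[] payload = it.next().message();
                System.out.println("got " + payload.length + " bytes");
            }
        } catch (Exception e) {
            // Rebalance failures are not delivered here (they are only logged),
            // but other consumer errors can surface as exceptions on the iterator.
            e.printStackTrace();
        } finally {
            connector.shutdown();
        }
    }
}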


Re: data loss on replicated topic

2014-03-27 Thread Neha Narkhede
It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you
see the following log line when you observe data loss -

"No broker in ISR is alive for ... There's potential data loss."

Thanks,
Neha


On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain wrote:

> I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems
> like it could be the cause of this. Does that sound right? Is there a patch
> we can test? Any date/time when this is expected to be fixed?
>
> From: New User <od...@3cinteractive.com>
> Date: Wednesday, March 26, 2014 at 11:59 AM
> To: "users@kafka.apache.org" <users@kafka.apache.org>
> Subject: data loss on replicated topic
>
> My company is currently testing Kafka for throughput and fault tolerance.
> We've set up a cluster of 5 Kafka brokers and are publishing to a topic
> with replication factor 3 and 100 partitions. We are publishing with
> request.required.acks == -1 (i.e. all ISR replicas must ACK before the
> message is considered sent). If a publication fails, we retry it
> indefinitely until it succeeds. We ran a test over a weekend in which we
> published messages as fast as we could (from a single publisher). Each
> message has a unique ID so we can ensure that all messages are saved by
> Kafka at least once at the end of the test. We have a simple script, run
> via cron, that kills one broker (chosen at random) once every other hour
> (killed via "kill -9"). The broker is then revived 16 minutes after it was
> killed. At the end of the weekend we ran a script to pull all data from all
> partitions and then verify that all messages were persisted by Kafka. For
> the most part, the results are very good. We can sustain about 3k
> message/second with almost no data loss.
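
A minimal sketch of the producer side of the setup described above (acks from
all in-sync replicas, retry until success); the broker list is a placeholder
and the 0.8 Java producer API is assumed:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AckAllProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // -1 means the leader waits for all in-sync replicas to ack.
        props.put("request.required.acks", "-1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Retry indefinitely until the send succeeds, as in the test above.
        String msg = "unique-id-0001"; // placeholder payload
        boolean sent = false;
        while (!sent) {
            try {
                producer.send(new KeyedMessage<String, String>("load_test", msg));
                sent = true;
            } catch (Exception e) {
                // e.g. FailedToSendMessageException; back off briefly and retry.
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            }
        }
        producer.close();
    }
}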
>
> Of the roughly 460 million records we produced over 48 hours, we lost only
> 7 records. But I don't think we should have lost any records. All of the
> lost records were produced at almost exactly the time one of the brokers
> was killed (down to the second which is the granularity of our logs). Note
> that we're producing around 3k messages/second and we killed brokers many
> times over the 48 hour period. Only twice did we see data loss: once we
> lost 4 records and once we lost 3. I have checked the Kafka logs and there
> are some expected error messages from the surviving brokers that look like:
>
>
> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [load_test,20] ->
> PartitionFetchInfo(521319,1048576),[load_test,74] ->
> PartitionFetchInfo(559017,1048576),[load_test,14] ->
> PartitionFetchInfo(420539,1048576),[load_test,0] ->
> PartitionFetchInfo(776869,1048576),[load_test,34] ->
> PartitionFetchInfo(446435,1048576),[load_test,94] ->
> PartitionFetchInfo(849943,1048576),[load_test,40] ->
> PartitionFetchInfo(241876,1048576),[load_test,80] ->
> PartitionFetchInfo(508778,1048576),[load_test,60] ->
> PartitionFetchInfo(81314,1048576),[load_test,54] ->
> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
>
> java.net.ConnectException: Connection refused
>
> at sun.nio.ch.Net.connect0(Native Method)
>
> at sun.nio.ch.Net.connect(Net.java:465)
>
> at sun.nio.ch.Net.connect(Net.java:457)
>
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>
> at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>
> at
> kafka.server.AbstractFetcherTh

Re: Kafka snappy compression

2014-03-27 Thread Neha Narkhede
Have you followed the wire protocol documented here -
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol




On Thu, Mar 27, 2014 at 3:22 AM, 小宇  wrote:

> Hi, all
> I'm working on snappy compression in our Node.js client (
> https://github.com/SOHU-Co/kafka-node), but I found the offset of the message
> is always 0 after snappy compression, for example: <Buffer 00 00 00 00 00 19
> 3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f 6e 65 22 3a 22 31 22 7d>.
> I tried to test it in kafka-console-producer.sh, but it seems to not support
> snappy. So I can't figure out what's wrong with my
> client. Did anyone encounter the same problem?
> Thanks.
>


Re: Log retention policy - offset ?

2014-03-27 Thread Neha Narkhede
Suppose old log messages are getting deleted, what happens to the offset?

The offset of a message will never change.

Suppose we have 10 messages, of which 3 are older than 24 hrs. My consumer
has consumed up to message 5 (offset 4); 5 more messages are yet to be
consumed.

In this case, your consumption is unaffected since only offsets 0 through 2
are garbage collected.
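
For illustration, a sketch (0.8 SimpleConsumer API assumed; host, port, topic,
and partition are placeholders) of asking the broker for the earliest offset
that is still available, which is the usual way to resume after an
OffsetOutOfRange error once old segments have been deleted:

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class EarliestOffsetSketch {
    // Returns the earliest offset still available for the given partition.
    static long earliestOffset(SimpleConsumer consumer, String topic, int partition,
                               String clientId) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(tp, new PartitionOffsetRequestInfo(
                kafka.api.OffsetRequest.EarliestTime(), 1));
        OffsetRequest request = new OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        // Placeholder connection details.
        SimpleConsumer consumer =
                new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-check");
        long earliest = earliestOffset(consumer, "my-topic", 0, "offset-check");
        System.out.println("Earliest available offset: " + earliest);
        // If a fetch returns OffsetOutOfRange, resume consuming from 'earliest'.
        consumer.close();
    }
}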

Thanks,
Neha



On Thu, Mar 27, 2014 at 3:44 AM, Ranjith Venkatesan
wrote:

> Hi,
>
> We are about to use kafka-0.8 in our production. Our config deletes
> messages older than a day, i.e. 24 hrs. We have a simple consumer which
> keeps its offset in memory and will handle the OffsetOutOfRange exception
> if it occurs.
>
> Suppose old log messages are getting deleted, what happens to the
> offset?
>
>
> Let me give an example,
>
> Suppose we have 10 messages, of which 3 are older than 24 hrs. My
> consumer has consumed up to message 5 (offset 4); 5 more messages are
> yet to be consumed.
>
> After log deletion, what will the offset be?
> 1. If the offset of message 4 becomes 0, then our consumer will consume
> only one message (4 msgs lost?).
> 2. If the offset of message 4 stays 3, then it's fine. But what happens if
> the offset keeps on increasing and reaches its maximum? (just curious)
>
>
> Correct me if I am wrong.
>
>
> Thanks in advance
>
>
> Ranjith Venkatesan
>
>


Log retention policy - offset ?

2014-03-27 Thread Ranjith Venkatesan
Hi,

We are about to use kafka-0.8 in our production. Our config deletes messages
older than a day, i.e. 24 hrs. We have a simple consumer which keeps its
offset in memory and will handle the OffsetOutOfRange exception if it occurs.

Suppose old log messages are getting deleted, what happens to the offset?

Let me give an example. Suppose we have 10 messages, of which 3 are older
than 24 hrs. My consumer has consumed up to message 5 (offset 4); 5 more
messages are yet to be consumed.

After log deletion, what will the offset be?
1. If the offset of message 4 becomes 0, then our consumer will consume only
one message (4 msgs lost?).
2. If the offset of message 4 stays 3, then it's fine. But what happens if
the offset keeps on increasing and reaches its maximum? (just curious)

Correct me if I am wrong.

Thanks in advance

Ranjith Venkatesan

Kafka snappy compression

2014-03-27 Thread 小宇
Hi, all
I'm working on snappy compression in our Node.js client (
https://github.com/SOHU-Co/kafka-node), but I found the offset of the message
is always 0 after snappy compression, for example: <Buffer 00 00 00 00 00 19
3e 69 5f c8 00 00 00 00 00 00 00 00 00 0b 7b 22 6f 6e 65 22 3a 22 31 22 7d>.
I tried to test it in kafka-console-producer.sh, but it seems to not support
snappy. So I can't figure out what's wrong with my
client. Did anyone encounter the same problem?
Thanks.