Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-29 Thread Patricio Echagüe
It's String. I also tried with the generic type String.

The CallbackHandler interface I implement is the one in
kafka.javaapi.producer.async package. Is that the right one?
I'm a bit confused because the exception mentions
kafka.producer.async.CallbackHandler.
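
For what it's worth, a minimal, Kafka-free illustration of why the cast fails:
a class that implements one interface cannot be cast to a different interface,
even if the two declare identical-looking methods. The interface and class
names below are made up for the example.

interface JavaApiHandler { void handle(String event); }
interface InternalHandler { void handle(String event); }

class MyHandler implements JavaApiHandler {
    public void handle(String event) { /* no-op */ }
}

public class CastDemo {
    public static void main(String[] args) {
        Object h = new MyHandler();
        // Throws ClassCastException at runtime, analogous to implementing
        // kafka.javaapi.producer.async.CallbackHandler while the producer
        // internally casts to kafka.producer.async.CallbackHandler.
        InternalHandler internal = (InternalHandler) h;
        internal.handle("event");
    }
}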


On Wed, Jan 29, 2014 at 9:05 PM, Jun Rao  wrote:

> Is your producer instantiated with type byte[]?
>
> Thanks,
>
> Jun
>
>
> On Wed, Jan 29, 2014 at 7:25 PM, Patricio Echagüe  >wrote:
>
> > I'm trying to set a callback handler from java.
> >
> > For that, I created a Callback Handler this way:
> >
> > public class KafkaAsyncCallbackHandler implements
> CallbackHandler {
> > }
> >
> > and set the property
> >
> > callback.handler=com.lucid.kafka.KafkaAsyncCallbackHandler
> >
> >
> > But on runtime I get this exception coming from kafka:
> >
> > Caused by: java.lang.ClassCastException:
> > com.lucid.kafka.KafkaAsyncCallbackHandler cannot be cast to
> > kafka.producer.async.CallbackHandler
> >  at kafka.producer.ProducerPool.(ProducerPool.scala:62)
> > at kafka.javaapi.producer.Producer.(Producer.scala:41)
> >  at
> >
> com.lucid.dao.queue.impl.kafka.KafkaProducer.(KafkaProducer.java:29)
> > at com.lucid.dao.guice.DAOModule.provideProducer(DAOModule.java:448)
> >
> > We are on kafka 0.7.1
> >
> > Am I doing anything wrong here?
> >
> > Thanks
> > Patricio
> >
>


Re: Cannot cast to kafka.producer.async.CallbackHandler

2014-01-29 Thread Jun Rao
Is your producer instantiated with type byte[]?

Thanks,

Jun


On Wed, Jan 29, 2014 at 7:25 PM, Patricio Echagüe wrote:

> I'm trying to set a callback handler from java.
>
> For that, I created a Callback Handler this way:
>
> public class KafkaAsyncCallbackHandler implements CallbackHandler {
> }
>
> and set the property
>
> callback.handler=com.lucid.kafka.KafkaAsyncCallbackHandler
>
>
> But on runtime I get this exception coming from kafka:
>
> Caused by: java.lang.ClassCastException:
> com.lucid.kafka.KafkaAsyncCallbackHandler cannot be cast to
> kafka.producer.async.CallbackHandler
>  at kafka.producer.ProducerPool.(ProducerPool.scala:62)
> at kafka.javaapi.producer.Producer.(Producer.scala:41)
>  at
> com.lucid.dao.queue.impl.kafka.KafkaProducer.(KafkaProducer.java:29)
> at com.lucid.dao.guice.DAOModule.provideProducer(DAOModule.java:448)
>
> We are on kafka 0.7.1
>
> Am I doing anything wrong here?
>
> Thanks
> Patricio
>


Re: Kafka performance test: "--request-num-acks -1" kills throughput

2014-01-29 Thread Jun Rao
Does the result change with just 1 partition?

Thanks,

Jun


On Wed, Jan 29, 2014 at 4:06 PM, Michael Popov  wrote:

> Hi,
>
> We need a reliable low-latency message queue that can scale. Kafka looks
> like a right system for this role.
>
> I am running performance tests on multiple platforms: Linux and Windows.
> For test purposes I create topics with 2 replicas and multiple partitions.
> In all deployments running test producers that wait for both replicas' acks
> practically kills Kafka throughput. For example, on the following
> deployment on Linux machines: 2 Kafka brokers, 1 Zookeeper node, 4 client
> hosts to create load, 4 topics with 10 partitions each and 2 replicas
>
> -  running tests with "--request-num-acks 1" produces ~3,600
> msgs/sec
>
> -  running tests with "--request-num-acks -1" produces ~348
> msgs/sec
>
>
> Here is output of one of the four concurrent processes:
>
> [User@Client2 kafka_2.8.0-0.8.0]$  bin/kafka-producer-perf-test.sh
> --broker-list 10.0.0.8:9092,10.0.0.10:9092 --compression-codec 0
> --message-size 1024 --request-num-acks -1 --sync --messages 10 -threads
> 10 --show-detailed-stats --reporting-interval 1000 --topics c12 | grep -v
> "at "
> start.time, end.time, compression, message.size, batch.size,
> total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
> [2014-01-29 23:21:16,720] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,825] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,830] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,831] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,839] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,841] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,847] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,858] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,862] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,867] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:32:03,830] WARN Produce request with correlation id 11467
> failed due to [c12,2]: kafka.common.RequestTimedOutException
> (kafka.producer.async.DefaultEventHandler)
> [2014-01-29 23:32:03,831] WARN Produce request with correlation id 11859
> failed due to [c12,8]: kafka.common.RequestTimedOutException
> (kafka.producer.async.DefaultEventHandler)
> [2014-01-29 23:32:03,831] WARN Failed to send producer request with
> correlation id 11819 to broker 0 with data for partitions [c12,8]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11315 to broker 0 with data for partitions [c12,6]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11191 to broker 0 with data for partitions [c12,4]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11791 to broker 0 with data for partitions [c12,4]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11395 to broker 0 with data for partitions [c12,6]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11631 to broker 0 with data for partitions [c12,4]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 10563 to broker 0 with data for partitions [c12,0]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 10907 to broker 0 with data for partitions [c12,2]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> 2014-01-29 23:21:16:562, 2014-01-29 23:40:15:886, 0, 1024, 200, 97.66,
> 0.0857, 10, 87.7713
>
> The test result is consistent and reproducible in all deployments: numbers
> can vary but changing acks setting consistently reduces Kafka throughput
> 4-10 times.
>
> Is it expected system behavior? Any tuning options to resolve the problem?
>
> Thank you,
> Michael Popov
>
>


Re: Problems encountered during the consumer shutdown.

2014-01-29 Thread Jun Rao
Normally you will just call shutdown on the connector. Is there a
particular reason that you need to interrupt it?

Thanks,

Jun


On Wed, Jan 29, 2014 at 2:07 PM, paresh shah wrote:

> We are interrupting the thread that uses the consumer connector. The
> question I had was if the LeaderFinderThread is interruptible then it is
> one that is generating the exception due to the await() call( highlighted )
>
>   def shutdown(): Unit = {
> info("Shutting down")
> isRunning.set(false)
> if (isInterruptible)
>   interrupt()
> shutdownLatch.await()
> info("Shutdown completed")
>   }
>
> The above is invoked as a part of the
> ConsumerFetcherManager.stopConnections() in the shutdown path. In this case
> should the shutdown() method not explicitly catch the exception around the
> call to stopConnections().
>
> Please confirm if my understanding is correct.
>
> thanks
> Paresh
>
>


Re: Problems encountered during the consumer shutdown.

2014-01-29 Thread Jun Rao
Which version of Kafka are you using? This doesn't seem to be the 0.8.0
release.

Thanks,

Jun


On Wed, Jan 29, 2014 at 11:38 AM, paresh shah wrote:

> That is exactly what we are doing. But in the shutdown path we see the
> following exception.
>
> 2013-12-19 23:21:54,249 FATAL [kafka.consumer.ZookeeperConsumerConnector]
> (EFKafkaMessageFetcher-retryLevelThree)
> [TestretryLevelThreeKafkaGroup_pshah-MacBook-Pro.local-1387494830478-efdbacea],
> error during consumer connector shutdown
> java.lang.InterruptedException
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1279)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:132)
> at
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:165)
> at
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:105)
> at
> com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.disconnect(AbstractEFKafkaThreadPooledConsumer.java:106)
> at
> com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer.shutdown(AbstractEFKafkaThreadPooledConsumer.java:399)
> at
> com.ecofactor.kafka.consumer.AbstractRetryTopicConsumer.handleTimeouts(AbstractRetryTopicConsumer.java:127)
> at
> com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.run(AbstractEFKafkaThreadPooledConsumer.java:141)
>
> --
>
> The result of the above exception is that the rest of shutdown logic is
> not getting executed. This involves closing the zookeeper connections.


Cannot cast to kafka.producer.async.CallbackHandler

2014-01-29 Thread Patricio Echagüe
I'm trying to set a callback handler from java.

For that, I created a Callback Handler this way:

public class KafkaAsyncCallbackHandler implements CallbackHandler {
}

and set the property

callback.handler=com.lucid.kafka.KafkaAsyncCallbackHandler


But on runtime I get this exception coming from kafka:

Caused by: java.lang.ClassCastException:
com.lucid.kafka.KafkaAsyncCallbackHandler cannot be cast to
kafka.producer.async.CallbackHandler
 at kafka.producer.ProducerPool.<init>(ProducerPool.scala:62)
 at kafka.javaapi.producer.Producer.<init>(Producer.scala:41)
 at com.lucid.dao.queue.impl.kafka.KafkaProducer.<init>(KafkaProducer.java:29)
 at com.lucid.dao.guice.DAOModule.provideProducer(DAOModule.java:448)

We are on kafka 0.7.1

Am I doing anything wrong here?

Thanks
Patricio


Re: High Level Consumer delivery semantics

2014-01-29 Thread Clark Breyman
Guozhang,

Thanks. I'm thinking not of threads crashing but processes/vms/networks
disappearing. Lights-out design so that if any of the computers/routers
catch fire I can still sleep.

I think I can get what I want by spinning up N ZookeeperConsumerConnectors
on the topic each with one thread rather than multiple threads on a single
consumer. That eliminates the need for synchronization and gives me
parallelism. Am I correct?
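
For reference, a minimal sketch of that pattern against the 0.8 high-level
consumer API: one connector, one stream, and processing before committing, so
commitOffsets() only ever covers this connector's own fully-processed
messages. Class, group, and topic names are illustrative, not from the thread.

import java.util.Collections;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class SingleStreamWorker implements Runnable {
    private final ConsumerConnector connector;
    private final KafkaStream<byte[], byte[]> stream;

    SingleStreamWorker(String zkConnect, String group, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", group);
        props.put("auto.commit.enable", "false");  // commit manually, only after processing
        connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        stream = connector.createMessageStreams(Collections.singletonMap(topic, 1))
                          .get(topic).get(0);
    }

    public void run() {
        for (MessageAndMetadata<byte[], byte[]> msg : stream) {
            process(msg.message());     // at-least-once: finish the work first...
            connector.commitOffsets();  // ...then record progress for this connector only
        }
    }

    private void process(byte[] payload) { /* application logic goes here */ }
}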

Thanks again,
C


On Wed, Jan 29, 2014 at 4:00 PM, Guozhang Wang  wrote:

> Hi Clark,
>
> 1. This is true, you need to synchronize these consumer threads when
> calling commitOffsets();
>
> 2. If you are asking what if the consumer thread crashed after
>
> currentTopicInfo.resetConsumeOffset(consumedOffset)
>
> within the next() call, then on its startup, it will lose all these
> in-memory offsets, and read from the ZK which will be smaller than the
> current value, still leading to duplicates but not data losses.
>
> Guozhang
>
>
> On Wed, Jan 29, 2014 at 12:31 PM, Clark Breyman  wrote:
>
> > Guozhang,
> >
> > Thank make sense except for the following:
> >
> > - the ZookeeperConsumerConnector.commitOffsets() method commits the
> current
> > value of PartitionTopicInfo.consumeOffset  for all of the active streams.
> >
> > - the ConsumerIterator in the streams advances the value of
> > PartitionTopicInfo.consumeOffset *before* next() returns, not after the
> > processing on that message is complete.
> >
> > If you have multiple threads consuming, thread A calling commitOffsets()
> > may commit thread B's retrieved but unprocessed message, no?
> >
> >
> > On Wed, Jan 29, 2014 at 12:20 PM, Guozhang Wang 
> > wrote:
> >
> > > Hi Clark,
> > >
> > > In practice, the client app code need to always commit offset after it
> > has
> > > processed the messages, and hence only the second case may happen,
> > leading
> > > to "at least once".
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jan 29, 2014 at 11:51 AM, Clark Breyman 
> > wrote:
> > >
> > > > Wrestling through the at-least/most-once semantics of my application
> > and
> > > I
> > > > was hoping for some confirmation of the semantics. I'm not sure I can
> > > > classify the high level consumer as either  type.
> > > >
> > > > False ack scenario:
> > > > - Thread A: call next() on the ConsumerIterator, advancing the
> > > > PartitionTopicInfo offset
> > > > - Thread B: commitOffsets() flushed offset of incomplete message to
> ZK
> > > > - Thread A: fail processing (e.g. kill -9)
> > > >
> > > > False retry scenario:
> > > > - Thread A: call next() & successfully process, kill -9 before
> > > > commitOffsets either in thread or in parallel.
> > > >
> > > > Is this right or am I missing something (likely)? Seems like the
> > > semantics
> > > > are essentially approximately once.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka performance test: "--request-num-acks -1" kills throughput

2014-01-29 Thread Neha Narkhede
Michael,

The producer client in 0.8 has a single thread that does blocking data
sends one broker at a time. However, we are working on a rewrite of the
producer with an improved design that can support higher throughput
compared to the current one. We don't have any performance numbers to share
yet, but I think we can send something around as soon as the new producer
client is somewhat stable. The new producer client will be released as part
of 0.9.
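
For the tuning question in the original post, a minimal sketch of the existing
0.8 producer settings that govern this trade-off: request.required.acks=-1
keeps the durability guarantee, while producer.type=async batches sends so
each blocking round trip is amortized over many messages. Broker addresses are
taken from the test above; the other values are assumed for illustration.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "10.0.0.8:9092,10.0.0.10:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1");    // wait for all in-sync replicas
        props.put("producer.type", "async");         // batch sends to amortize the wait
        props.put("batch.num.messages", "200");      // illustrative batch size
        props.put("queue.buffering.max.ms", "100");  // illustrative linger time

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("c12", "key", "value"));
        producer.close();
    }
}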

Thanks,
Neha


On Wed, Jan 29, 2014 at 4:06 PM, Michael Popov  wrote:

> Hi,
>
> We need a reliable low-latency message queue that can scale. Kafka looks
> like a right system for this role.
>
> I am running performance tests on multiple platforms: Linux and Windows.
> For test purposes I create topics with 2 replicas and multiple partitions.
> In all deployments running test producers that wait for both replicas' acks
> practically kills Kafka throughput. For example, on the following
> deployment on Linux machines: 2 Kafka brokers, 1 Zookeeper node, 4 client
> hosts to create load, 4 topics with 10 partitions each and 2 replicas
>
> -  running tests with "--request-num-acks 1" produces ~3,600
> msgs/sec
>
> -  running tests with "--request-num-acks -1" produces ~348
> msgs/sec
>
>
> Here is output of one of the four concurrent processes:
>
> [User@Client2 kafka_2.8.0-0.8.0]$  bin/kafka-producer-perf-test.sh
> --broker-list 10.0.0.8:9092,10.0.0.10:9092 --compression-codec 0
> --message-size 1024 --request-num-acks -1 --sync --messages 10 -threads
> 10 --show-detailed-stats --reporting-interval 1000 --topics c12 | grep -v
> "at "
> start.time, end.time, compression, message.size, batch.size,
> total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
> [2014-01-29 23:21:16,720] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,825] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,830] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,831] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,839] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,841] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,847] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,858] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,862] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:21:16,867] WARN Property reconnect.interval is not valid
> (kafka.utils.VerifiableProperties)
> [2014-01-29 23:32:03,830] WARN Produce request with correlation id 11467
> failed due to [c12,2]: kafka.common.RequestTimedOutException
> (kafka.producer.async.DefaultEventHandler)
> [2014-01-29 23:32:03,831] WARN Produce request with correlation id 11859
> failed due to [c12,8]: kafka.common.RequestTimedOutException
> (kafka.producer.async.DefaultEventHandler)
> [2014-01-29 23:32:03,831] WARN Failed to send producer request with
> correlation id 11819 to broker 0 with data for partitions [c12,8]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11315 to broker 0 with data for partitions [c12,6]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11191 to broker 0 with data for partitions [c12,4]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11791 to broker 0 with data for partitions [c12,4]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11395 to broker 0 with data for partitions [c12,6]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 11631 to broker 0 with data for partitions [c12,4]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 10563 to broker 0 with data for partitions [c12,0]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> [2014-01-29 23:32:03,834] WARN Failed to send producer request with
> correlation id 10907 to broker 0 with data for partitions [c12,2]
> (kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
> 2014-01-29 23:21:16:562, 2014-01-29 23:40:15:88

Kafka performance test: "--request-num-acks -1" kills throughput

2014-01-29 Thread Michael Popov
Hi,

We need a reliable low-latency message queue that can scale. Kafka looks like
the right system for this role.

I am running performance tests on multiple platforms: Linux and Windows. For 
test purposes I create topics with 2 replicas and multiple partitions. In all 
deployments running test producers that wait for both replicas' acks 
practically kills Kafka throughput. For example, on the following deployment on 
Linux machines: 2 Kafka brokers, 1 Zookeeper node, 4 client hosts to create 
load, 4 topics with 10 partitions each and 2 replicas

-  running tests with "--request-num-acks 1" produces ~3,600 msgs/sec

-  running tests with "--request-num-acks -1" produces ~348 msgs/sec


Here is output of one of the four concurrent processes:

[User@Client2 kafka_2.8.0-0.8.0]$  bin/kafka-producer-perf-test.sh 
--broker-list 10.0.0.8:9092,10.0.0.10:9092 --compression-codec 0 --message-size 
1024 --request-num-acks -1 --sync --messages 10 -threads 10 
--show-detailed-stats --reporting-interval 1000 --topics c12 | grep -v "at "
start.time, end.time, compression, message.size, batch.size, 
total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
[2014-01-29 23:21:16,720] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,825] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,830] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,831] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,839] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,841] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,847] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,858] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,862] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:21:16,867] WARN Property reconnect.interval is not valid 
(kafka.utils.VerifiableProperties)
[2014-01-29 23:32:03,830] WARN Produce request with correlation id 11467 failed 
due to [c12,2]: kafka.common.RequestTimedOutException 
(kafka.producer.async.DefaultEventHandler)
[2014-01-29 23:32:03,831] WARN Produce request with correlation id 11859 failed 
due to [c12,8]: kafka.common.RequestTimedOutException 
(kafka.producer.async.DefaultEventHandler)
[2014-01-29 23:32:03,831] WARN Failed to send producer request with correlation 
id 11819 to broker 0 with data for partitions [c12,8] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 11315 to broker 0 with data for partitions [c12,6] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 11191 to broker 0 with data for partitions [c12,4] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 11791 to broker 0 with data for partitions [c12,4] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 11395 to broker 0 with data for partitions [c12,6] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 11631 to broker 0 with data for partitions [c12,4] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 10563 to broker 0 with data for partitions [c12,0] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
[2014-01-29 23:32:03,834] WARN Failed to send producer request with correlation 
id 10907 to broker 0 with data for partitions [c12,2] 
(kafka.producer.async.DefaultEventHandler) java.net.SocketTimeoutException
2014-01-29 23:21:16:562, 2014-01-29 23:40:15:886, 0, 1024, 200, 97.66, 0.0857, 
10, 87.7713

The test result is consistent and reproducible in all deployments: numbers can
vary, but changing the acks setting consistently reduces Kafka throughput 4-10 times.

Is it expected system behavior? Any tuning options to resolve the problem?

Thank you,
Michael Popov



Re: High Level Consumer delivery semantics

2014-01-29 Thread Guozhang Wang
Hi Clark,

1. This is true, you need to synchronize these consumer threads when
calling commitOffsets();

2. If you are asking what if the consumer thread crashed after

currentTopicInfo.resetConsumeOffset(consumedOffset)

within the next() call, then on restart it will lose all these in-memory
offsets and read the offset from ZK, which will be smaller than the current
value, still leading to duplicates but not data loss.

Guozhang


On Wed, Jan 29, 2014 at 12:31 PM, Clark Breyman  wrote:

> Guozhang,
>
> Thank make sense except for the following:
>
> - the ZookeeperConsumerConnector.commitOffsets() method commits the current
> value of PartitionTopicInfo.consumeOffset  for all of the active streams.
>
> - the ConsumerIterator in the streams advances the value of
> PartitionTopicInfo.consumeOffset *before* next() returns, not after the
> processing on that message is complete.
>
> If you have multiple threads consuming, thread A calling commitOffsets()
> may commit thread B's retrieved but unprocessed message, no?
>
>
> On Wed, Jan 29, 2014 at 12:20 PM, Guozhang Wang 
> wrote:
>
> > Hi Clark,
> >
> > In practice, the client app code need to always commit offset after it
> has
> > processed the messages, and hence only the second case may happen,
> leading
> > to "at least once".
> >
> > Guozhang
> >
> >
> > On Wed, Jan 29, 2014 at 11:51 AM, Clark Breyman 
> wrote:
> >
> > > Wrestling through the at-least/most-once semantics of my application
> and
> > I
> > > was hoping for some confirmation of the semantics. I'm not sure I can
> > > classify the high level consumer as either  type.
> > >
> > > False ack scenario:
> > > - Thread A: call next() on the ConsumerIterator, advancing the
> > > PartitionTopicInfo offset
> > > - Thread B: commitOffsets() flushed offset of incomplete message to ZK
> > > - Thread A: fail processing (e.g. kill -9)
> > >
> > > False retry scenario:
> > > - Thread A: call next() & successfully process, kill -9 before
> > > commitOffsets either in thread or in parallel.
> > >
> > > Is this right or am I missing something (likely)? Seems like the
> > semantics
> > > are essentially approximately once.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: New Producer Public API

2014-01-29 Thread Chris Riccomini
Hey Guys,

My 2c.

1. RecordSend is a confusing name to me. Shouldn't it be
RecordSendResponse?
2. Random nit: it's annoying to have the Javadoc info for the constants on
http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html,
but the string constant values on
http://empathybox.com/kafka-javadoc/constant-values.html#kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG.
Find myself toggling between the two a lot. Not sure if this can be fixed easily.
3. MAX_PARTITION_SIZE_CONFIG - this name is kind of confusing. Specifically,
the use of the term "partition": I thought it was related to Kafka topic
partitions, not grouping together/batching.
4. METADATA_FETCH_TIMEOUT_CONFIG - what happens if this timeout is
exceeded? Do we get an exception on send()?
5. METADATA_REFRESH_MS_CONFIG - why is this useful? Has to do with acks=0,
right? Worth documenting, I think.
6. PARTITIONER_CLASS_CONFIG - link to partitioner interface in javadocs.
Also, missing a period.
7. KafkaProducer.html - send() documentation says "archive" when you mean
achieve, I think.
8. No javadoc for ProduceRequestResult.
9. In ProduceRequestResult, I understand baseOffset to be the first offset
of the set. Is it possible to get the last offset, as well? If I send
messages A, B, C, D, I'm most interested in D's offset.
10. In ProduceRequestResult, prefer Java-bean style (getError,
isCompleted).
11. At first glance, I like option 1A in your serialization list.
12. We should definitely not introduce a ZK dependency for bootstrapping
broker host/ports.
13. No favor on the Future discussion. I really^Int.Max hate checked
exceptions, but I also like standard interfaces. It's a wash in my book.


Cheers,
Chris

On 1/29/14 10:34 AM, "Neha Narkhede"  wrote:

>>> The challenge of directly exposing ProduceRequestResult is that the
>offset
>provided is just the base offset and there is no way to know for a
>particular message where it was in relation to that base offset because
>the
>batching is transparent and non-deterministic.
>
>That's a good point. I need to look into the code more closely to see if
>it
>is possible to expose
>something like Future send(...) where RequestResult has the
>right metadata
>as well as helper APIs that the user would want. For example
>
>Future messageResponse;
>try {
>  messageResponse = send(...)
>} catch(InterruptedException ie) {
>} catch(ExecutionException ee) {
>}
>
>if(messageResponse.hasError())
>  // handle error
>else {
>   String topic = messageResponse.topic();
>   int partition = messageResponse.partition();
>   long offset = messageResponse.offset();   // can this offset return the
>absolute offset instead of just the relative offset?
>   ...
>}
>
>I could've missed some reasons why we can't do the above. I just think
>that
>separating the future-like functionality of RecordSend
>from the actual response metadata could be useful while supporting Future
>at the same time.
>
>Thanks,
>Neha
>
>
>
>On Wed, Jan 29, 2014 at 10:23 AM, Tom Brown  wrote:
>
>> I strongly support the user of Future. In fact, the cancel method may
>>not
>> be useless. Since the producer is meant to be used by N threads, it
>>could
>> easily get overloaded such that a produce request could not be sent
>> immediately and had to be queued. In that case, cancelling should cause
>>it
>> to not actually get sent.
>>
>> --Tom
>>
>>
>> On Wed, Jan 29, 2014 at 11:06 AM, Jay Kreps  wrote:
>>
>> > Hey Neha,
>> >
>> > Error handling in RecordSend works as in Future you will get the
>> exception
>> > if there is one from any of the accessor methods or await().
>> >
>> > The purpose of hasError was that you can write things slightly more
>> simply
>> > (which some people expressed preference for):
>> >   if(send.hasError())
>> > // do something
>> >   long offset = send.offset();
>> >
>> > Instead of the more the slightly longer:
>> > try {
>> >long offset = send.offset();
>> > } catch (KafkaException e) {
>> >// do something
>> > }
>> >
>> >
>> > On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede
>>> > >wrote:
>> >
>> > > Regarding the use of Futures -
>> > >
>> > > Agree that there are some downsides to using Futures but both
>> approaches
>> > > have some tradeoffs.
>> > >
>> > > - Standardization and usability
>> > > Future is a widely used and understood Java API and given that the
>> > > functionality that RecordSend hopes to provide is essentially that
>>of
>> > > Future, I think it makes sense to expose a widely understood public
>>API
>> > for
>> > > our clients. RecordSend, on the other hand, seems to provide some
>>APIs
>> > that
>> > > are very similar to that of Future, in addition to exposing a bunch
>>of
>> > APIs
>> > > that belong to ProduceRequestResult. As a user, I would've really
>> > preferred
>> > > to deal with ProduceRequestResult directly -
>> > > Future send(...)
>> > >
>> > > - Error handling
>> > > RecordSend's error handling is quite unintuitive where the user has
>>to
>> > > remember 

Problems encountered during the consumer shutdown.

2014-01-29 Thread paresh shah
We are interrupting the thread that uses the consumer connector. The question I
had was: if the LeaderFinderThread is interruptible, then it is the one
generating the exception due to the await() call (highlighted below).

  def shutdown(): Unit = {
    info("Shutting down")
    isRunning.set(false)
    if (isInterruptible)
      interrupt()
    shutdownLatch.await()
    info("Shutdown completed")
  }

The above is invoked as part of ConsumerFetcherManager.stopConnections() in
the shutdown path. In this case, should the shutdown() method not explicitly
catch the exception around the call to stopConnections()?
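
For comparison, a minimal application-side workaround (illustrative only, not
Kafka code): clear any pending interrupt on the calling thread before invoking
shutdown(), so the latch await inside ShutdownableThread is not aborted and
the rest of the shutdown, including closing the ZooKeeper connection, still runs.

import kafka.javaapi.consumer.ConsumerConnector;

public class SafeShutdown {
    public static void shutdownQuietly(ConsumerConnector connector) {
        Thread.interrupted();      // clears this thread's interrupt flag, if set
        try {
            connector.shutdown();  // fetchers stop and the ZK connection closes normally
        } catch (RuntimeException e) {
            // log and continue so the rest of the application shutdown still runs
        }
    }
}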

Please confirm if my understanding is correct.

thanks
Paresh



Re: Kafka performance testing

2014-01-29 Thread Joe Stein
Download the source and build from source

wget https://archive.apache.org/dist/kafka/0.8.0/kafka-0.8.0-src.tgz
tar -xvf kafka-0.8.0-src.tgz
cd kafka-0.8.0-src
./sbt update
./sbt package
./sbt assembly-package-dependency

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


On Wed, Jan 29, 2014 at 4:34 PM, Michael Popov  wrote:

> Hi,
>
> I try to run Kafka performance tests on my hosts. I get this error message:
>
> [User@Client1 kafka_2.8.0-0.8.0]$ ./bin/kafka-producer-perf-test.sh
> Exception in thread "main" java.lang.NoClassDefFoundError:
> kafka/perf/ProducerPerformance
> Caused by: java.lang.ClassNotFoundException: kafka.perf.ProducerPerformance
> at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
> Could not find the main class: kafka.perf.ProducerPerformance. Program
> will exit.
>
> The command line generated by the script looks like this:
>
> java -Xmx512M -server -XX:+UseCompressedOops -XX:+UseParNewGC
> -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
> -Djava.awt.headless=true -Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false
> -Dlog4j.configuration=file:/home/User/work/kafka_2.8.0-0.8.0/bin/../config/tools-log4j.properties
> -cp
> :/home/User/work/kafka_2.8.0-0.8.0/bin/../core/target/scala-2.8.0/*.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../perf/target/scala-2.8.0/kafka*.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/jopt-simple-3.2.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/log4j-1.2.15.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/metrics-annotation-2.2.0.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/metrics-core-2.2.0.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/scala-compiler.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/scala-library.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/slf4j-api-1.7.2.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/slf4j-simple-1.6.4.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/snappy-java-1.0.4.1.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/zkclient-0.3.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/zookeeper-3.3.4.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../kafka_2.8.0-0.8.0.jar
> kafka.perf.ProducerPerformance
>
> I unzipped kafka_2.8.0-0.8.0.jar and verified that there is no perf
> subdirectory.
> Kafka deployment package was downloaded this morning from the official
> Apache mirror.
>
> How this problem can be solved?
>
> Thank you,
> Michael Popov
>
>


Kafka performance testing

2014-01-29 Thread Michael Popov
Hi,

I am trying to run Kafka performance tests on my hosts. I get this error message:

[User@Client1 kafka_2.8.0-0.8.0]$ ./bin/kafka-producer-perf-test.sh
Exception in thread "main" java.lang.NoClassDefFoundError: 
kafka/perf/ProducerPerformance
Caused by: java.lang.ClassNotFoundException: kafka.perf.ProducerPerformance
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
Could not find the main class: kafka.perf.ProducerPerformance. Program will 
exit.

The command line generated by the script looks like this:

java -Xmx512M -server -XX:+UseCompressedOops -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled 
-XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false 
-Dcom.sun.management.jmxremote.ssl=false 
-Dlog4j.configuration=file:/home/User/work/kafka_2.8.0-0.8.0/bin/../config/tools-log4j.properties
 -cp 
:/home/User/work/kafka_2.8.0-0.8.0/bin/../core/target/scala-2.8.0/*.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../perf/target/scala-2.8.0/kafka*.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/jopt-simple-3.2.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/log4j-1.2.15.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/metrics-annotation-2.2.0.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/metrics-core-2.2.0.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/scala-compiler.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/scala-library.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/slf4j-api-1.7.2.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/slf4j-simple-1.6.4.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/snappy-java-1.0.4.1.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/zkclient-0.3.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../libs/zookeeper-3.3.4.jar:/home/User/work/kafka_2.8.0-0.8.0/bin/../kafka_2.8.0-0.8.0.jar
 kafka.perf.ProducerPerformance

I unzipped kafka_2.8.0-0.8.0.jar and verified that there is no perf 
subdirectory.
Kafka deployment package was downloaded this morning from the official Apache 
mirror.

How can this problem be solved?

Thank you,
Michael Popov



Re: Problems encountered during the consumer shutdown.

2014-01-29 Thread Neha Narkhede
Can you check why the consumer thread is interrupted during the shutdown at
all?


On Wed, Jan 29, 2014 at 11:38 AM, paresh shah wrote:

> That is exactly what we are doing. But in the shutdown path we see the
> following exception.
>
> 2013-12-19 23:21:54,249 FATAL [kafka.consumer.ZookeeperConsumerConnector]
> (EFKafkaMessageFetcher-retryLevelThree)
> [TestretryLevelThreeKafkaGroup_pshah-MacBook-Pro.local-1387494830478-efdbacea],
> error during consumer connector shutdown
> java.lang.InterruptedException
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1279)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:132)
> at
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:165)
> at
> kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:105)
> at
> com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.disconnect(AbstractEFKafkaThreadPooledConsumer.java:106)
> at
> com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer.shutdown(AbstractEFKafkaThreadPooledConsumer.java:399)
> at
> com.ecofactor.kafka.consumer.AbstractRetryTopicConsumer.handleTimeouts(AbstractRetryTopicConsumer.java:127)
> at
> com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.run(AbstractEFKafkaThreadPooledConsumer.java:141)
>
> --
>
> The result of the above exception is that the rest of shutdown logic is
> not getting executed. This involves closing the zookeeper connections.


Re: High Level Consumer delivery semantics

2014-01-29 Thread Clark Breyman
Guozhang,

That makes sense except for the following:

- the ZookeeperConsumerConnector.commitOffsets() method commits the current
value of PartitionTopicInfo.consumeOffset  for all of the active streams.

- the ConsumerIterator in the streams advances the value of
PartitionTopicInfo.consumeOffset *before* next() returns, not after the
processing on that message is complete.

If you have multiple threads consuming, thread A calling commitOffsets()
may commit thread B's retrieved but unprocessed message, no?


On Wed, Jan 29, 2014 at 12:20 PM, Guozhang Wang  wrote:

> Hi Clark,
>
> In practice, the client app code need to always commit offset after it has
> processed the messages, and hence only the second case may happen, leading
> to "at least once".
>
> Guozhang
>
>
> On Wed, Jan 29, 2014 at 11:51 AM, Clark Breyman  wrote:
>
> > Wrestling through the at-least/most-once semantics of my application and
> I
> > was hoping for some confirmation of the semantics. I'm not sure I can
> > classify the high level consumer as either  type.
> >
> > False ack scenario:
> > - Thread A: call next() on the ConsumerIterator, advancing the
> > PartitionTopicInfo offset
> > - Thread B: commitOffsets() flushed offset of incomplete message to ZK
> > - Thread A: fail processing (e.g. kill -9)
> >
> > False retry scenario:
> > - Thread A: call next() & successfully process, kill -9 before
> > commitOffsets either in thread or in parallel.
> >
> > Is this right or am I missing something (likely)? Seems like the
> semantics
> > are essentially approximately once.
> >
>
>
>
> --
> -- Guozhang
>


Re: high-level consumer design

2014-01-29 Thread Guozhang Wang
To avoid some consumers not consuming anything, one small trick might be
that if a consumer finds itself not getting any partition, it can force a
rebalance by deleting its own registration path in ZK and re-registering.
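
A rough sketch of that trick using the zkclient library Kafka already ships
with. The znode path and the raw-bytes handling of the registration payload
are assumptions; real code should reuse whatever registration data and
serializer the consumer actually wrote.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class ForceRebalanceSketch {
    public static void main(String[] args) {
        // Pass bytes through untouched so the original registration payload is preserved.
        ZkClient zk = new ZkClient("localhost:2181", 6000, 6000,
                                   new BytesPushThroughSerializer());
        String idPath = "/consumers/my-group/ids/my-consumer-id";  // assumed znode layout
        byte[] registration = zk.readData(idPath);  // keep the original subscription payload
        zk.delete(idPath);                          // the group's child watches fire...
        zk.createEphemeral(idPath, registration);   // ...and re-registering forces another rebalance
        zk.close();
    }
}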


On Mon, Jan 27, 2014 at 4:32 PM, David Birdsong wrote:

> On Mon, Jan 27, 2014 at 4:19 PM, Guozhang Wang  wrote:
>
> > Hello David,
> >
> > One thing about using ZK locks to "own" a partition is load balancing. If
> > you are unlucky some consumer may get all the locks and some may get
> none,
> > hence have no partitions to consume.
> >
>
> I've considered this and even encountered it in testing. For our current
> load levels, we won't hurt us, but if there's a good solution, I'd rather
> codify smooth consumer balance.
>
> Got any suggestions?
>
> My thinking thus far is to establish some sort of identity on the consumer
> and derive an evenness or oddness or some modulo value that induces a small
> delay when encountering particular partition numbers. It's a hacky idea,
> but is pretty simple and might be good enough for smoothing consumers.
>
>
> > Also you may need some synchronization between the consumer thread with
> the
> > offset thread. For example, when an event is fired and the consumers need
> > to re-try grabbing the locks, it needs to first stop current fetchers,
> > commit offsets, and then start owning new partitions.
> >
>
> This is current design and what I have implemented so far. The last thread
> to exit is the offset thread and it has a direct communication channel to
> the consumer threads so it waits for those channels to be closed before
> it's last flush and exit.
>
>
> > Guozhang
> >
> >
> Thanks for the input!
>
>
> >
> > On Mon, Jan 27, 2014 at 3:03 PM, David Birdsong <
> david.birds...@gmail.com
> > >wrote:
> >
> > > Hey All, I've been cobbling together a high-level consumer for golang
> > > building on top of Shopify's Sarama package and wanted to run the basic
> > > design by the list and get some feedback or pointers on things I've
> > missed
> > > or will eventually encounter on my own.
> > >
> > > I'm using zookeeper to coordinate topic-partition owners for consumer
> > > members in each consumer group. I followed the znode layout that's
> > apparent
> > > from watching the console consumer.
> > >
> > > //{offsets,owners,ids}.
> > >
> > > The consumer uses an outer loop to discover the partition list for a
> > given
> > > topic, attempts to grab a zookeeper lock on each (topic,partition)
> tuple,
> > > and then for each (topic, partition) it successfully locks, launches a
> > > thread (goroutine) for each partition to read the partition stream.
> > >
> > > The outer loop continues to watch for children events either of:
> > >
> > >
> >
> //owners//brokers/topics//partitions
> > >
> > > ...any watch event that fires causes all offset data and consumer
> handles
> > > to be flushed and closed, goroutines watching topic-partitions exit.
> The
> > > loop is restarted.
> > >
> > > Another thread reads topic-partition-offset data and flushes the offset
> > >
> >
> to://offsets/
> > >
> > > Have I oversimplified or missed any critical steps?
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: High Level Consumer delivery semantics

2014-01-29 Thread Guozhang Wang
Hi Clark,

In practice, the client app code needs to always commit offsets after it has
processed the messages, and hence only the second case may happen, leading
to "at least once".

Guozhang


On Wed, Jan 29, 2014 at 11:51 AM, Clark Breyman  wrote:

> Wrestling through the at-least/most-once semantics of my application and I
> was hoping for some confirmation of the semantics. I'm not sure I can
> classify the high level consumer as either  type.
>
> False ack scenario:
> - Thread A: call next() on the ConsumerIterator, advancing the
> PartitionTopicInfo offset
> - Thread B: commitOffsets() flushed offset of incomplete message to ZK
> - Thread A: fail processing (e.g. kill -9)
>
> False retry scenario:
> - Thread A: call next() & successfully process, kill -9 before
> commitOffsets either in thread or in parallel.
>
> Is this right or am I missing something (likely)? Seems like the semantics
> are essentially approximately once.
>



-- 
-- Guozhang


Re: Is there a way to delete partition at runtime?

2014-01-29 Thread Guozhang Wang
Hi Marc,

Having a different set of metadata stored on the brokers to feed metadata
requests from producers and consumers would be very tricky, I think. For
your use case, one thing you could try is using a customized partitioning
function in the producer to produce only to a subset of the partitions
depending on the traffic; partitions that no longer receive any data will
eventually delete all of their log segments on that broker, effectively
reducing the file handles for both the logs and the sockets.
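
A rough sketch of such a partitioner for the 0.8 producer, wired in via the
partitioner.class property. The constructor taking VerifiableProperties and
the partition(key, numPartitions) method reflect the 0.8 partitioner contract
as I understand it, and the active.partitions property is an assumption of
this example; verify both against your Kafka version.

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SubsetPartitioner implements Partitioner {
    private final int activePartitions;

    public SubsetPartitioner(VerifiableProperties props) {
        // Assumed example property: how many low-numbered partitions still take traffic.
        this.activePartitions = props.getInt("active.partitions", 4);
    }

    public int partition(Object key, int numPartitions) {
        int limit = Math.min(activePartitions, numPartitions);
        // Route everything to partitions [0, limit); the rest receive no new data,
        // so their log segments eventually age out on the brokers.
        return (key.hashCode() & 0x7fffffff) % limit;
    }
}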

Guozhang


On Tue, Jan 28, 2014 at 12:53 PM, Marc Labbe  wrote:

> Hi Guozhang,
>
> thinking out loud... delete then recreate works if it is acceptable to have
> a topic specific downtime during which Kafka can't accept requests for that
> topic. This downtime would last for the duration while the topic gets
> deleted and then recreated. I am assuming here that a producer sending data
> for a topic, while it is being deleted and before it is recreated, will
> receive an error. The error will be pushed to clients if this process lasts
> longer than the time allowed for retries and number of retries configured
> on the producer. In our case, the producer is a web service.
>
> It may be acceptable if we do this maintenance during low use periods and
> the process is rapid enough (guessing within 30s). Our clients have means
> to resend messages when an error occurs but we may still lose messages if
> it lasts too long. E.g. the client may be shutdown with pending messages. I
> would like to avoid buffering in the web service as much as possible.
>
> What if you don't merge partitions and simply keep shrunk partitions until
> log segments are rolled out and deleted? The only thing you have to worry
> about is to prevent producers from sending data to those partitions by
> having a producer specific metadata which doesn't contain the partitions to
> be deleted? This has the impact of having a different set of metadata for
> topics depending on if you are producer or consumer, which isn't so nice
> though.
>
> I admit this is probably way more simplistic than it really is...
>
> marc
>
>
>
> On Mon, Jan 27, 2014 at 7:24 PM, Guozhang Wang  wrote:
>
> > Siyuan, Marc:
> >
> > We are currently working on topic-deletion supports
> > (KAFKA-330),
> > would first-delete-then-recreate-with-fewer-partitions work for your
> cases?
> > The reason why we are trying to avoid shrinking partition is that it
> would
> > make the logic very complicated. For example, we need to think about
> > within-partition ordering guarantee with partition merging and
> > producing-in-progress simultaneously.
> >
> > Guozhang
> >
> >
> > On Mon, Jan 27, 2014 at 12:35 PM, Marc Labbe  wrote:
> >
> > > I have the same need, and I've just created a Jira:
> > > https://issues.apache.org/jira/browse/KAFKA-1231
> > >
> > > The reasoning behind it is because our topics are created on a per
> > product
> > > basis and each of them usually starts big during the initial weeks and
> > > gradually reduces in time (1-2 years).
> > >
> > > thanks
> > > marc
> > >
> > >
> > > On Thu, Dec 5, 2013 at 7:45 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hi Siyuan,
> > > >
> > > > We do not have a tool to shrink the number of partitions (if that is
> > what
> > > > you want) for a topic at runtime yet. Could you file a JIRA for this?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Dec 5, 2013 at 2:16 PM, hsy...@gmail.com 
> > > wrote:
> > > >
> > > > > Hi guys,
> > > > >
> > > > > I found there is a tool to add partition on the fly. My question
> is,
> > is
> > > > > there a way to delete a partition at runtime? Thanks!
> > > > >
> > > > > Best,
> > > > > Siyuan
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Java 8 influence on next-generation Java producer and consumer APIs

2014-01-29 Thread Clark Breyman
Jay et al,

What are your current thoughts on ensuring that the next-generation APIs
play nicely with both lambdas and the extensions to the standard runtime in
Java 8?

My thoughts are that if folks are doing the work to reimplement/redesign
the API, it should be as compatible as possible with the latest stuff
provided it doesn't break 1.6 compatibility (as long as that's required).

Areas where this might influence the design:
- Using single-method interfaces that could be supplied by lambdas when
running in J8
- Consideration of Spliterator, Supplier, Stream, ... when designing their
Kafka counterparts.

Regards,
Clark
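
A tiny sketch of the first point: if the new API's callbacks are single-method
interfaces, Java 8 callers can pass lambdas while Java 6/7 callers use
anonymous classes, with no API change. The interface and method names below
are hypothetical, not proposed Kafka types.

public class CallbackStyles {
    // Hypothetical single-method callback; any one-method interface behaves the same way.
    interface SendCallback {
        void onCompletion(long offset, Exception error);
    }

    static void register(SendCallback cb) { /* stand-in for a producer API taking a callback */ }

    public static void main(String[] args) {
        // Java 6/7 style: anonymous inner class
        register(new SendCallback() {
            public void onCompletion(long offset, Exception error) { }
        });
        // Java 8 style: the very same interface accepts a lambda
        register((offset, error) -> { });
    }
}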


High Level Consumer delivery semantics

2014-01-29 Thread Clark Breyman
I'm wrestling through the at-least-once/at-most-once semantics of my
application and was hoping for some confirmation of the semantics. I'm not
sure I can classify the high-level consumer as either type.

False ack scenario:
- Thread A: call next() on the ConsumerIterator, advancing the
PartitionTopicInfo offset
- Thread B: commitOffsets() flushed offset of incomplete message to ZK
- Thread A: fail processing (e.g. kill -9)

False retry scenario:
- Thread A: call next() & successfully process, kill -9 before
commitOffsets either in thread or in parallel.

Is this right or am I missing something (likely)? Seems like the semantics
are essentially approximately once.


Problems encountered during the consumer shutdown.

2014-01-29 Thread paresh shah
That is exactly what we are doing. But in the shutdown path we see the 
following exception.

2013-12-19 23:21:54,249 FATAL [kafka.consumer.ZookeeperConsumerConnector] 
(EFKafkaMessageFetcher-retryLevelThree) 
[TestretryLevelThreeKafkaGroup_pshah-MacBook-Pro.local-1387494830478-efdbacea], 
error during consumer connector shutdown
java.lang.InterruptedException
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1279)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:132)
at 
kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:165)
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:105)
at 
com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.disconnect(AbstractEFKafkaThreadPooledConsumer.java:106)
at 
com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer.shutdown(AbstractEFKafkaThreadPooledConsumer.java:399)
at 
com.ecofactor.kafka.consumer.AbstractRetryTopicConsumer.handleTimeouts(AbstractRetryTopicConsumer.java:127)
at 
com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.run(AbstractEFKafkaThreadPooledConsumer.java:141)

--

The result of the above exception is that the rest of the shutdown logic is
not executed. This includes closing the ZooKeeper connections.

Re: New Producer Public API

2014-01-29 Thread Neha Narkhede
>> The challenge of directly exposing ProduceRequestResult is that the
offset
provided is just the base offset and there is no way to know for a
particular message where it was in relation to that base offset because the
batching is transparent and non-deterministic.

That's a good point. I need to look into the code more closely to see if it
is possible to expose something like Future<RequestResult> send(...) where
RequestResult has the right metadata as well as helper APIs that the user
would want. For example

Future<RequestResult> messageResponse;
try {
  messageResponse = send(...)
} catch(InterruptedException ie) {
} catch(ExecutionException ee) {
}

if(messageResponse.hasError())
  // handle error
else {
   String topic = messageResponse.topic();
   int partition = messageResponse.partition();
   long offset = messageResponse.offset();   // can this offset return the
absolute offset instead of just the relative offset?
   ...
}

I could've missed some reasons why we can't do the above. I just think that
separating the future-like functionality of RecordSend
from the actual response metadata could be useful while supporting Future
at the same time.
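
To make the comparison concrete, a sketch of what the plain-Future usage would
look like from the caller's side; RequestResult here is the hypothetical
response type from the example above, not an existing Kafka class.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureUsageSketch {
    // Hypothetical response type mirroring the fields discussed in this thread.
    interface RequestResult {
        String topic();
        int partition();
        long offset();
    }

    static void handle(Future<RequestResult> response) {
        try {
            RequestResult r = response.get();    // blocks until the send completes
            System.out.println(r.topic() + "-" + r.partition() + "@" + r.offset());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // the checked exception under debate
        } catch (ExecutionException e) {
            // the broker-side error surfaces here instead of via hasError()/error()
        }
    }
}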

Thanks,
Neha



On Wed, Jan 29, 2014 at 10:23 AM, Tom Brown  wrote:

> I strongly support the user of Future. In fact, the cancel method may not
> be useless. Since the producer is meant to be used by N threads, it could
> easily get overloaded such that a produce request could not be sent
> immediately and had to be queued. In that case, cancelling should cause it
> to not actually get sent.
>
> --Tom
>
>
> On Wed, Jan 29, 2014 at 11:06 AM, Jay Kreps  wrote:
>
> > Hey Neha,
> >
> > Error handling in RecordSend works as in Future you will get the
> exception
> > if there is one from any of the accessor methods or await().
> >
> > The purpose of hasError was that you can write things slightly more
> simply
> > (which some people expressed preference for):
> >   if(send.hasError())
> > // do something
> >   long offset = send.offset();
> >
> > Instead of the more the slightly longer:
> > try {
> >long offset = send.offset();
> > } catch (KafkaException e) {
> >// do something
> > }
> >
> >
> > On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede  > >wrote:
> >
> > > Regarding the use of Futures -
> > >
> > > Agree that there are some downsides to using Futures but both
> approaches
> > > have some tradeoffs.
> > >
> > > - Standardization and usability
> > > Future is a widely used and understood Java API and given that the
> > > functionality that RecordSend hopes to provide is essentially that of
> > > Future, I think it makes sense to expose a widely understood public API
> > for
> > > our clients. RecordSend, on the other hand, seems to provide some APIs
> > that
> > > are very similar to that of Future, in addition to exposing a bunch of
> > APIs
> > > that belong to ProduceRequestResult. As a user, I would've really
> > preferred
> > > to deal with ProduceRequestResult directly -
> > > Future send(...)
> > >
> > > - Error handling
> > > RecordSend's error handling is quite unintuitive where the user has to
> > > remember to invoke hasError and error, instead of just throwing the
> > > exception. Now there are
> > > some downsides regarding error handling with the Future as well, where
> > the
> > > user has to catch InterruptedException when we would never run into it.
> > > However, it seems like a price worth paying for supporting a standard
> API
> > > and error handling
> > >
> > > - Unused APIs
> > > This is a downside of using Future, where the cancel() operation would
> > > always return false and mean nothing. But we can mention that caveat in
> > our
> > > Java docs.
> > >
> > > To summarize, I would prefer to expose a well understood and widely
> > adopted
> > > Java API and put up with the overhead of catching one unnecessary
> checked
> > > exception, rather than wrap the useful ProduceRequestResult in a custom
> > > async object (RecordSend) and explain that to our many users.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > >
> > >
> > > On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps 
> wrote:
> > >
> > > > Hey Neha,
> > > >
> > > > Can you elaborate on why you prefer using Java's Future? The downside
> > in
> > > my
> > > > mind is the use of the checked InterruptedException and
> > > ExecutionException.
> > > > ExecutionException is arguable, but forcing you to catch
> > > > InterruptedException, often in code that can't be interrupted, seems
> > > > perverse. It also leaves us with the cancel() method which I don't
> > think
> > > we
> > > > really can implement.
> > > >
> > > > Option 1A, to recap/elaborate, was the following. There is no
> > Serializer
> > > or
> > > > Partitioner api. We take a byte[] key and value and an optional
> integer
> > > > partition. If you specify the integer partition it will be used. If
> you
> > > do
> > > > not specify a key or a partition the partition will be chosen in a
> > round
> > > > robin fashion. If you specify a key but 

Re: New Producer Public API

2014-01-29 Thread Tom Brown
I strongly support the use of Future. In fact, the cancel method may not
be useless. Since the producer is meant to be used by N threads, it could
easily get overloaded such that a produce request could not be sent
immediately and had to be queued. In that case, cancelling should cause it
to not actually get sent.
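
As a throwaway illustration of that semantics (a sketch, not a proposed
implementation): if pending sends sit in a queue until the I/O thread picks
them up, a cancelled Future can simply be skipped before it ever hits the
wire.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class CancellableSendQueue {
    private final BlockingQueue<FutureTask<Long>> queue = new LinkedBlockingQueue<FutureTask<Long>>();

    // callers get back a Future they can cancel while the request is still queued
    public Future<Long> enqueue(final byte[] payload) {
        FutureTask<Long> task = new FutureTask<Long>(() -> (long) payload.length);   // stand-in for "send and return the offset"
        queue.add(task);
        return task;
    }

    // the I/O thread drains the queue and silently drops anything already cancelled
    public void ioLoop() throws InterruptedException {
        while (true) {
            FutureTask<Long> task = queue.take();
            if (!task.isCancelled()) {
                task.run();   // a cancelled send never reaches the broker
            }
        }
    }
}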

--Tom


On Wed, Jan 29, 2014 at 11:06 AM, Jay Kreps  wrote:

> Hey Neha,
>
> Error handling in RecordSend works as in Future you will get the exception
> if there is one from any of the accessor methods or await().
>
> The purpose of hasError was that you can write things slightly more simply
> (which some people expressed preference for):
>   if(send.hasError())
> // do something
>   long offset = send.offset();
>
> Instead of the slightly longer:
> try {
>long offset = send.offset();
> } catch (KafkaException e) {
>// do something
> }
>
>
> On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede  >wrote:
>
> > Regarding the use of Futures -
> >
> > Agree that there are some downsides to using Futures but both approaches
> > have some tradeoffs.
> >
> > - Standardization and usability
> > Future is a widely used and understood Java API and given that the
> > functionality that RecordSend hopes to provide is essentially that of
> > Future, I think it makes sense to expose a widely understood public API
> for
> > our clients. RecordSend, on the other hand, seems to provide some APIs
> that
> > are very similar to that of Future, in addition to exposing a bunch of
> APIs
> > that belong to ProduceRequestResult. As a user, I would've really
> preferred
> > to deal with ProduceRequestResult directly -
> > Future send(...)
> >
> > - Error handling
> > RecordSend's error handling is quite unintuitive where the user has to
> > remember to invoke hasError and error, instead of just throwing the
> > exception. Now there are
> > some downsides regarding error handling with the Future as well, where
> the
> > user has to catch InterruptedException when we would never run into it.
> > However, it seems like a price worth paying for supporting a standard API
> > and error handling
> >
> > - Unused APIs
> > This is a downside of using Future, where the cancel() operation would
> > always return false and mean nothing. But we can mention that caveat in
> our
> > Java docs.
> >
> > To summarize, I would prefer to expose a well understood and widely
> adopted
> > Java API and put up with the overhead of catching one unnecessary checked
> > exception, rather than wrap the useful ProduceRequestResult in a custom
> > async object (RecordSend) and explain that to our many users.
> >
> > Thanks,
> > Neha
> >
> >
> >
> >
> > On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps  wrote:
> >
> > > Hey Neha,
> > >
> > > Can you elaborate on why you prefer using Java's Future? The downside
> in
> > my
> > > mind is the use of the checked InterruptedException and
> > ExecutionException.
> > > ExecutionException is arguable, but forcing you to catch
> > > InterruptedException, often in code that can't be interrupted, seems
> > > perverse. It also leaves us with the cancel() method which I don't
> think
> > we
> > > really can implement.
> > >
> > > Option 1A, to recap/elaborate, was the following. There is no
> Serializer
> > or
> > > Partitioner api. We take a byte[] key and value and an optional integer
> > > partition. If you specify the integer partition it will be used. If you
> > do
> > > not specify a key or a partition the partition will be chosen in a
> round
> > > robin fashion. If you specify a key but no partition we will chose a
> > > partition based on a hash of the key. In order to let the user find the
> > > partition we will need to given them access to the Cluster instance
> > > directly from the producer.
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <
> neha.narkh...@gmail.com
> > > >wrote:
> > >
> > > > Here are more thoughts on the public APIs -
> > > >
> > > > - I suggest we use java's Future instead of custom Future especially
> > > since
> > > > it is part of the public API
> > > >
> > > > - Serialization: I like the simplicity of the producer APIs with the
> > > > absence of serialization where we just deal with byte arrays for keys
> > and
> > > > values. What I don't like about this is the performance overhead on
> the
> > > > Partitioner for any kind of custom partitioning based on the
> > > partitionKey.
> > > > Since the only purpose of partitionKey is to do custom partitioning,
> > why
> > > > can't we take it in directly as an integer and let the user figure
> out
> > > the
> > > > mapping from partition_key -> partition_id using the getCluster()
> API?
> > > If I
> > > > understand correctly, this is similar to what you suggested as part
> of
> > > > option 1A. I like this approach since it maintains the simplicity of
> > APIs
> > > > by allowing us to deal with bytes and does not compromise performance
> > in
> > > > the custom partitioning case.
> > 

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
Yes, we will absolutely retain protocol compatibility with 0.8 though the
java api will change. The prototype code I posted works with 0.8.

-Jay


On Wed, Jan 29, 2014 at 10:19 AM, Steve Morin  wrote:

> Is the new producer API going to maintain protocol compatibility with old
> version if the API under the hood?
>
> > On Jan 29, 2014, at 10:15, Jay Kreps  wrote:
> >
> > The challenge of directly exposing ProduceRequestResult is that the
> offset
> > provided is just the base offset and there is no way to know for a
> > particular message where it was in relation to that base offset because
> the
> > batching is transparent and non-deterministic. So I think we do need some
> > kind of per-message result.
> >
> > I started with Future, I think for the same reason you
> > prefer it but then when I actually looked at some code samples it wasn't
> > too great--checked exceptions, methods that we can't easily implement,
> etc.
> > I moved away from that for two reasons:
> > 1. When I actually wrote out some code samples of usage they were a
> little
> > ugly for the reasons I described--checked exceptions, methods we can't
> > implement, no helper methods, etc.
> > 2. I originally intended to make the result send work like a
> > ListenableFuture so that you would register the callback on the result
> > rather than as part of the call. I moved away from this primarily because
> > the implementation complexity was a little higher.
> >
> > Whether or not the code prettiness on its own outweighs the familiarity
> of
> > a normal Future I don't know, but that was the evolution of my thinking.
> >
> > -Jay
> >
> >
> >> On Wed, Jan 29, 2014 at 10:06 AM, Jay Kreps 
> wrote:
> >>
> >> Hey Neha,
> >>
> >> Error handling in RecordSend works as in Future you will get the
> exception
> >> if there is one from any of the accessor methods or await().
> >>
> >> The purpose of hasError was that you can write things slightly more
> simply
> >> (which some people expressed preference for):
> >>  if(send.hasError())
> >>// do something
> >>  long offset = send.offset();
> >>
> >> Instead of the slightly longer:
> >> try {
> >>   long offset = send.offset();
> >> } catch (KafkaException e) {
> >>   // do something
> >> }
> >>
> >>
> >> On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede <
> neha.narkh...@gmail.com>wrote:
> >>
> >>> Regarding the use of Futures -
> >>>
> >>> Agree that there are some downsides to using Futures but both
> approaches
> >>> have some tradeoffs.
> >>>
> >>> - Standardization and usability
> >>> Future is a widely used and understood Java API and given that the
> >>> functionality that RecordSend hopes to provide is essentially that of
> >>> Future, I think it makes sense to expose a widely understood public API
> >>> for
> >>> our clients. RecordSend, on the other hand, seems to provide some APIs
> >>> that
> >>> are very similar to that of Future, in addition to exposing a bunch of
> >>> APIs
> >>> that belong to ProduceRequestResult. As a user, I would've really
> >>> preferred
> >>> to deal with ProduceRequestResult directly -
> >>> Future send(...)
> >>>
> >>> - Error handling
> >>> RecordSend's error handling is quite unintuitive where the user has to
> >>> remember to invoke hasError and error, instead of just throwing the
> >>> exception. Now there are
> >>> some downsides regarding error handling with the Future as well, where
> the
> >>> user has to catch InterruptedException when we would never run into it.
> >>> However, it seems like a price worth paying for supporting a standard
> API
> >>> and error handling
> >>>
> >>> - Unused APIs
> >>> This is a downside of using Future, where the cancel() operation would
> >>> always return false and mean nothing. But we can mention that caveat in
> >>> our
> >>> Java docs.
> >>>
> >>> To summarize, I would prefer to expose a well understood and widely
> >>> adopted
> >>> Java API and put up with the overhead of catching one unnecessary
> checked
> >>> exception, rather than wrap the useful ProduceRequestResult in a custom
> >>> async object (RecordSend) and explain that to our many users.
> >>>
> >>> Thanks,
> >>> Neha
> >>>
> >>>
> >>>
> >>>
>  On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps 
> wrote:
> 
>  Hey Neha,
> 
>  Can you elaborate on why you prefer using Java's Future? The downside
> >>> in my
>  mind is the use of the checked InterruptedException and
> >>> ExecutionException.
>  ExecutionException is arguable, but forcing you to catch
>  InterruptedException, often in code that can't be interrupted, seems
>  perverse. It also leaves us with the cancel() method which I don't
> >>> think we
>  really can implement.
> 
>  Option 1A, to recap/elaborate, was the following. There is no
> >>> Serializer or
>  Partitioner api. We take a byte[] key and value and an optional
> integer
>  partition. If you specify the integer partition it will be used. If
> you
> >>> do
>  not specify 

Re: New Producer Public API

2014-01-29 Thread Tom Brown
Jay,

I think you're confused between my use of "basic client" and "connection".
There is one basic client for a cluster. An IO thread manages the tcp
connections for any number of brokers. The basic client has a queue of
requests each broker. When a tcp connection (associated with broker X) is
ready to send the next request, it asks the basic client for the next
request for broker X.

The producer is just a layer that maps partitions to brokers so you only
have to tell it to send to partition #3, and it knows that partition #3 goes
to broker X, and adds a produce request to the queue for broker X.
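
Roughly like this, just to illustrate the shape (not my actual code):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// The producer layer only maps a partition id to the broker currently leading it
// and drops the request on that broker's queue; the I/O layer drains the queues.
public class PartitionRouter {
    private final Map<Integer, String> partitionToBroker = new HashMap<Integer, String>();   // e.g. 3 -> "brokerX:9092"
    private final Map<String, BlockingQueue<byte[]>> brokerQueues = new HashMap<String, BlockingQueue<byte[]>>();

    public void setLeader(int partition, String broker) {
        partitionToBroker.put(partition, broker);
        if (!brokerQueues.containsKey(broker)) {
            brokerQueues.put(broker, new LinkedBlockingQueue<byte[]>());
        }
    }

    public void send(int partition, byte[] request) {
        String broker = partitionToBroker.get(partition);
        if (broker == null) {
            throw new IllegalStateException("no known leader for partition " + partition);
        }
        brokerQueues.get(broker).add(request);   // the connection for this broker picks it up later
    }
}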

Conceivably (though I haven't implemented it yet), a multi-produce request
could be used in the same way. Since request pipelining is in place, I
don't see a good reason to use multi-produce.

Did I clear it up any, or is this just more confusing?

--Tom





On Wed, Jan 29, 2014 at 11:00 AM, Jay Kreps  wrote:

> Hey Tom,
>
> So is there one connection and I/O thread per broker and a low-level client
> for each of those, and then you hash into that to partition? Is it possible
> to batch across partitions or only within a partition?
>
> -Jay
>
>
> On Wed, Jan 29, 2014 at 8:41 AM, Tom Brown  wrote:
>
> > Jay,
> >
> > There is both a basic client object, and a number of IO processing
> threads.
> > The client object manages connections, creating new ones when new
> machines
> > are connected, or when existing connections die. It also manages a queue
> of
> > requests for each server. The IO processing thread has a selector, and
> > performs the work of sending/receiving (removing items from the queue,
> > interpreting the response at a basic level, etc). Since asynchronous
> > sockets by nature decouple sending and receiving, request pipelining is
> > inherent.
> >
> > Using the basic client, you can send individual produce requests
> (singular
> > or batched). The "producer" layer adds an additional queue for each
> > partition, allowing individual messages to be batched together.
> >
> > --Tom
> >
> >
> >
> > On Tue, Jan 28, 2014 at 9:10 PM, Jay Kreps  wrote:
> >
> > > Hey Neha,
> > >
> > > Can you elaborate on why you prefer using Java's Future? The downside
> in
> > my
> > > mind is the use of the checked InterruptedException and
> > ExecutionException.
> > > ExecutionException is arguable, but forcing you to catch
> > > InterruptedException, often in code that can't be interrupted, seems
> > > perverse. It also leaves us with the cancel() method which I don't
> think
> > we
> > > really can implement.
> > >
> > > Option 1A, to recap/elaborate, was the following. There is no
> Serializer
> > or
> > > Partitioner api. We take a byte[] key and value and an optional integer
> > > partition. If you specify the integer partition it will be used. If you
> > do
> > > not specify a key or a partition the partition will be chosen in a
> round
> > > robin fashion. If you specify a key but no partition we will chose a
> > > partition based on a hash of the key. In order to let the user find the
> > > partition we will need to given them access to the Cluster instance
> > > directly from the producer.
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <
> neha.narkh...@gmail.com
> > > >wrote:
> > >
> > > > Here are more thoughts on the public APIs -
> > > >
> > > > - I suggest we use java's Future instead of custom Future especially
> > > since
> > > > it is part of the public API
> > > >
> > > > - Serialization: I like the simplicity of the producer APIs with the
> > > > absence of serialization where we just deal with byte arrays for keys
> > and
> > > > values. What I don't like about this is the performance overhead on
> the
> > > > Partitioner for any kind of custom partitioning based on the
> > > partitionKey.
> > > > Since the only purpose of partitionKey is to do custom partitioning,
> > why
> > > > can't we take it in directly as an integer and let the user figure
> out
> > > the
> > > > mapping from partition_key -> partition_id using the getCluster()
> API?
> > > If I
> > > > understand correctly, this is similar to what you suggested as part
> of
> > > > option 1A. I like this approach since it maintains the simplicity of
> > APIs
> > > > by allowing us to deal with bytes and does not compromise performance
> > in
> > > > the custom partitioning case.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > >
> > > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps 
> > wrote:
> > > >
> > > > > Hey Tom,
> > > > >
> > > > > That sounds cool. How did you end up handling parallel I/O if you
> > wrap
> > > > the
> > > > > individual connections? Don't you need some selector that selects
> > over
> > > > all
> > > > > the connections?
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown 
> > > wrote:
> > > > >
> > > > > > I implemented a 0.7 client in pure java, and its API very closely
> > > > > resembled
> > > > > > this. (When multiple peopl

Re: New Producer Public API

2014-01-29 Thread Steve Morin
Is the new producer API going to maintain protocol compatibility with the old
version of the API under the hood?

> On Jan 29, 2014, at 10:15, Jay Kreps  wrote:
> 
> The challenge of directly exposing ProduceRequestResult is that the offset
> provided is just the base offset and there is no way to know for a
> particular message where it was in relation to that base offset because the
> batching is transparent and non-deterministic. So I think we do need some
> kind of per-message result.
> 
> I started with Future, I think for the same reason you
> prefer it but then when I actually looked at some code samples it wasn't
> too great--checked exceptions, methods that we can't easily implement, etc.
> I moved away from that for two reasons:
> 1. When I actually wrote out some code samples of usage they were a little
> ugly for the reasons I described--checked exceptions, methods we can't
> implement, no helper methods, etc.
> 2. I originally intended to make the result send work like a
> ListenableFuture so that you would register the callback on the result
> rather than as part of the call. I moved away from this primarily because
> the implementation complexity was a little higher.
> 
> Whether or not the code prettiness on its own outweighs the familiarity of
> a normal Future I don't know, but that was the evolution of my thinking.
> 
> -Jay
> 
> 
>> On Wed, Jan 29, 2014 at 10:06 AM, Jay Kreps  wrote:
>> 
>> Hey Neha,
>> 
>> Error handling in RecordSend works as in Future you will get the exception
>> if there is one from any of the accessor methods or await().
>> 
>> The purpose of hasError was that you can write things slightly more simply
>> (which some people expressed preference for):
>>  if(send.hasError())
>>// do something
>>  long offset = send.offset();
>> 
>> Instead of the slightly longer:
>> try {
>>   long offset = send.offset();
>> } catch (KafkaException e) {
>>   // do something
>> }
>> 
>> 
>> On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede 
>> wrote:
>> 
>>> Regarding the use of Futures -
>>> 
>>> Agree that there are some downsides to using Futures but both approaches
>>> have some tradeoffs.
>>> 
>>> - Standardization and usability
>>> Future is a widely used and understood Java API and given that the
>>> functionality that RecordSend hopes to provide is essentially that of
>>> Future, I think it makes sense to expose a widely understood public API
>>> for
>>> our clients. RecordSend, on the other hand, seems to provide some APIs
>>> that
>>> are very similar to that of Future, in addition to exposing a bunch of
>>> APIs
>>> that belong to ProduceRequestResult. As a user, I would've really
>>> preferred
>>> to deal with ProduceRequestResult directly -
>>> Future send(...)
>>> 
>>> - Error handling
>>> RecordSend's error handling is quite unintuitive where the user has to
>>> remember to invoke hasError and error, instead of just throwing the
>>> exception. Now there are
>>> some downsides regarding error handling with the Future as well, where the
>>> user has to catch InterruptedException when we would never run into it.
>>> However, it seems like a price worth paying for supporting a standard API
>>> and error handling
>>> 
>>> - Unused APIs
>>> This is a downside of using Future, where the cancel() operation would
>>> always return false and mean nothing. But we can mention that caveat in
>>> our
>>> Java docs.
>>> 
>>> To summarize, I would prefer to expose a well understood and widely
>>> adopted
>>> Java API and put up with the overhead of catching one unnecessary checked
>>> exception, rather than wrap the useful ProduceRequestResult in a custom
>>> async object (RecordSend) and explain that to our many users.
>>> 
>>> Thanks,
>>> Neha
>>> 
>>> 
>>> 
>>> 
 On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps  wrote:
 
 Hey Neha,
 
 Can you elaborate on why you prefer using Java's Future? The downside
>>> in my
 mind is the use of the checked InterruptedException and
>>> ExecutionException.
 ExecutionException is arguable, but forcing you to catch
 InterruptedException, often in code that can't be interrupted, seems
 perverse. It also leaves us with the cancel() method which I don't
>>> think we
 really can implement.
 
 Option 1A, to recap/elaborate, was the following. There is no
>>> Serializer or
 Partitioner api. We take a byte[] key and value and an optional integer
 partition. If you specify the integer partition it will be used. If you
>>> do
 not specify a key or a partition the partition will be chosen in a round
 robin fashion. If you specify a key but no partition we will chose a
 partition based on a hash of the key. In order to let the user find the
 partition we will need to given them access to the Cluster instance
 directly from the producer.
 
 -Jay
 
 
 On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede  wrote:
 
> Here are more thoughts on the public APIs -

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
The challenge of directly exposing ProduceRequestResult is that the offset
provided is just the base offset and there is no way to know for a
particular message where it was in relation to that base offset because the
batching is transparent and non-deterministic. So I think we do need some
kind of per-message result.
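
Concretely (made-up numbers, just to illustrate): if a batch of three
messages is acked with base offset 240, the individual messages sit at 240,
241 and 242, and only the client that assembled the batch knows which
message is at which position.

public class PerMessageOffset {
    // derive each message's offset from the batch's base offset and its position in the batch
    static long[] assignOffsets(long baseOffset, int messagesInBatch) {
        long[] offsets = new long[messagesInBatch];
        for (int i = 0; i < messagesInBatch; i++) {
            offsets[i] = baseOffset + i;
        }
        return offsets;
    }

    public static void main(String[] args) {
        for (long offset : assignOffsets(240L, 3)) {
            System.out.println(offset);   // prints 240, 241, 242
        }
    }
}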

I started with Future, I think for the same reason you
prefer it but then when I actually looked at some code samples it wasn't
too great--checked exceptions, methods that we can't easily implement, etc.
I moved away from that for two reasons:
1. When I actually wrote out some code samples of usage they were a little
ugly for the reasons I described--checked exceptions, methods we can't
implement, no helper methods, etc.
2. I originally intended to make the result send work like a
ListenableFuture so that you would register the callback on the result
rather than as part of the call. I moved away from this primarily because
the implementation complexity was a little higher.

Whether or not the code prettiness on its own outweighs the familiarity of
a normal Future I don't know, but that was the evolution of my thinking.

-Jay


On Wed, Jan 29, 2014 at 10:06 AM, Jay Kreps  wrote:

> Hey Neha,
>
> Error handling in RecordSend works as in Future you will get the exception
> if there is one from any of the accessor methods or await().
>
> The purpose of hasError was that you can write things slightly more simply
> (which some people expressed preference for):
>   if(send.hasError())
> // do something
>   long offset = send.offset();
>
> Instead of the slightly longer:
> try {
>long offset = send.offset();
> } catch (KafkaException e) {
>// do something
> }
>
>
> On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede 
> wrote:
>
>> Regarding the use of Futures -
>>
>> Agree that there are some downsides to using Futures but both approaches
>> have some tradeoffs.
>>
>> - Standardization and usability
>> Future is a widely used and understood Java API and given that the
>> functionality that RecordSend hopes to provide is essentially that of
>> Future, I think it makes sense to expose a widely understood public API
>> for
>> our clients. RecordSend, on the other hand, seems to provide some APIs
>> that
>> are very similar to that of Future, in addition to exposing a bunch of
>> APIs
>> that belong to ProduceRequestResult. As a user, I would've really
>> preferred
>> to deal with ProduceRequestResult directly -
>> Future send(...)
>>
>> - Error handling
>> RecordSend's error handling is quite unintuitive where the user has to
>> remember to invoke hasError and error, instead of just throwing the
>> exception. Now there are
>> some downsides regarding error handling with the Future as well, where the
>> user has to catch InterruptedException when we would never run into it.
>> However, it seems like a price worth paying for supporting a standard API
>> and error handling
>>
>> - Unused APIs
>> This is a downside of using Future, where the cancel() operation would
>> always return false and mean nothing. But we can mention that caveat in
>> our
>> Java docs.
>>
>> To summarize, I would prefer to expose a well understood and widely
>> adopted
>> Java API and put up with the overhead of catching one unnecessary checked
>> exception, rather than wrap the useful ProduceRequestResult in a custom
>> async object (RecordSend) and explain that to our many users.
>>
>> Thanks,
>> Neha
>>
>>
>>
>>
>> On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps  wrote:
>>
>> > Hey Neha,
>> >
>> > Can you elaborate on why you prefer using Java's Future? The downside
>> in my
>> > mind is the use of the checked InterruptedException and
>> ExecutionException.
>> > ExecutionException is arguable, but forcing you to catch
>> > InterruptedException, often in code that can't be interrupted, seems
>> > perverse. It also leaves us with the cancel() method which I don't
>> think we
>> > really can implement.
>> >
>> > Option 1A, to recap/elaborate, was the following. There is no
>> Serializer or
>> > Partitioner api. We take a byte[] key and value and an optional integer
>> > partition. If you specify the integer partition it will be used. If you
>> do
>> > not specify a key or a partition the partition will be chosen in a round
>> > robin fashion. If you specify a key but no partition we will chose a
>> > partition based on a hash of the key. In order to let the user find the
>> > partition we will need to given them access to the Cluster instance
>> > directly from the producer.
>> >
>> > -Jay
>> >
>> >
>> > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede > > >wrote:
>> >
>> > > Here are more thoughts on the public APIs -
>> > >
>> > > - I suggest we use java's Future instead of custom Future especially
>> > since
>> > > it is part of the public API
>> > >
>> > > - Serialization: I like the simplicity of the producer APIs with the
>> > > absence of serialization where we just deal with byte arrays for keys
>> and
>> > > valu

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
Hey Neha,

Error handling in RecordSend works as in Future: you will get the exception
if there is one from any of the accessor methods or await().

The purpose of hasError was that you can write things slightly more simply
(which some people expressed preference for):
  if(send.hasError())
// do something
  long offset = send.offset();

Instead of the slightly longer:
try {
   long offset = send.offset();
} catch (KafkaException e) {
   // do something
}


On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede wrote:

> Regarding the use of Futures -
>
> Agree that there are some downsides to using Futures but both approaches
> have some tradeoffs.
>
> - Standardization and usability
> Future is a widely used and understood Java API and given that the
> functionality that RecordSend hopes to provide is essentially that of
> Future, I think it makes sense to expose a widely understood public API for
> our clients. RecordSend, on the other hand, seems to provide some APIs that
> are very similar to that of Future, in addition to exposing a bunch of APIs
> that belong to ProduceRequestResult. As a user, I would've really preferred
> to deal with ProduceRequestResult directly -
> Future send(...)
>
> - Error handling
> RecordSend's error handling is quite unintuitive where the user has to
> remember to invoke hasError and error, instead of just throwing the
> exception. Now there are
> some downsides regarding error handling with the Future as well, where the
> user has to catch InterruptedException when we would never run into it.
> However, it seems like a price worth paying for supporting a standard API
> and error handling
>
> - Unused APIs
> This is a downside of using Future, where the cancel() operation would
> always return false and mean nothing. But we can mention that caveat in our
> Java docs.
>
> To summarize, I would prefer to expose a well understood and widely adopted
> Java API and put up with the overhead of catching one unnecessary checked
> exception, rather than wrap the useful ProduceRequestResult in a custom
> async object (RecordSend) and explain that to our many users.
>
> Thanks,
> Neha
>
>
>
>
> On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps  wrote:
>
> > Hey Neha,
> >
> > Can you elaborate on why you prefer using Java's Future? The downside in
> my
> > mind is the use of the checked InterruptedException and
> ExecutionException.
> > ExecutionException is arguable, but forcing you to catch
> > InterruptedException, often in code that can't be interrupted, seems
> > perverse. It also leaves us with the cancel() method which I don't think
> we
> > really can implement.
> >
> > Option 1A, to recap/elaborate, was the following. There is no Serializer
> or
> > Partitioner api. We take a byte[] key and value and an optional integer
> > partition. If you specify the integer partition it will be used. If you
> do
> > not specify a key or a partition the partition will be chosen in a round
> > robin fashion. If you specify a key but no partition we will chose a
> > partition based on a hash of the key. In order to let the user find the
> > partition we will need to given them access to the Cluster instance
> > directly from the producer.
> >
> > -Jay
> >
> >
> > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede  > >wrote:
> >
> > > Here are more thoughts on the public APIs -
> > >
> > > - I suggest we use java's Future instead of custom Future especially
> > since
> > > it is part of the public API
> > >
> > > - Serialization: I like the simplicity of the producer APIs with the
> > > absence of serialization where we just deal with byte arrays for keys
> and
> > > values. What I don't like about this is the performance overhead on the
> > > Partitioner for any kind of custom partitioning based on the
> > partitionKey.
> > > Since the only purpose of partitionKey is to do custom partitioning,
> why
> > > can't we take it in directly as an integer and let the user figure out
> > the
> > > mapping from partition_key -> partition_id using the getCluster() API?
> > If I
> > > understand correctly, this is similar to what you suggested as part of
> > > option 1A. I like this approach since it maintains the simplicity of
> APIs
> > > by allowing us to deal with bytes and does not compromise performance
> in
> > > the custom partitioning case.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > >
> > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps 
> wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > That sounds cool. How did you end up handling parallel I/O if you
> wrap
> > > the
> > > > individual connections? Don't you need some selector that selects
> over
> > > all
> > > > the connections?
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown 
> > wrote:
> > > >
> > > > > I implemented a 0.7 client in pure java, and its API very closely
> > > > resembled
> > > > > this. (When multiple people independently engineer the same
> solution,
> > > > it's
> > > > > probably good... 

Re: New Producer Public API

2014-01-29 Thread Neha Narkhede
Regarding the use of Futures -

Agree that there are some downsides to using Futures but both approaches
have some tradeoffs.

- Standardization and usability
Future is a widely used and understood Java API and given that the
functionality that RecordSend hopes to provide is essentially that of
Future, I think it makes sense to expose a widely understood public API for
our clients. RecordSend, on the other hand, seems to provide some APIs that
are very similar to that of Future, in addition to exposing a bunch of APIs
that belong to ProduceRequestResult. As a user, I would've really preferred
to deal with ProduceRequestResult directly -
Future send(...)

- Error handling
RecordSend's error handling is quite unintuitive where the user has to
remember to invoke hasError and error, instead of just throwing the
exception. Now there are
some downsides regarding error handling with the Future as well, where the
user has to catch InterruptedException when we would never run into it.
However, it seems like a price worth paying for supporting a standard API
and error handling

- Unused APIs
This is a downside of using Future, where the cancel() operation would
always return false and mean nothing. But we can mention that caveat in our
Java docs.

To summarize, I would prefer to expose a well understood and widely adopted
Java API and put up with the overhead of catching one unnecessary checked
exception, rather than wrap the useful ProduceRequestResult in a custom
async object (RecordSend) and explain that to our many users.
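
For completeness, the caller-side cost I'm referring to is roughly the
following (an illustrative sketch only), and a small helper keeps it
manageable:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureErrorHandling {
    // wrap Future.get() so application code sees a single unchecked failure path
    static long awaitOffset(Future<Long> pending) {
        try {
            return pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the flag; we never expect this in practice
            throw new IllegalStateException("interrupted while waiting for the ack", e);
        } catch (ExecutionException e) {
            // unwrap the broker-side failure so callers deal with one exception type
            throw new RuntimeException(e.getCause());
        }
    }
}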

Thanks,
Neha




On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps  wrote:

> Hey Neha,
>
> Can you elaborate on why you prefer using Java's Future? The downside in my
> mind is the use of the checked InterruptedException and ExecutionException.
> ExecutionException is arguable, but forcing you to catch
> InterruptedException, often in code that can't be interrupted, seems
> perverse. It also leaves us with the cancel() method which I don't think we
> really can implement.
>
> Option 1A, to recap/elaborate, was the following. There is no Serializer or
> Partitioner api. We take a byte[] key and value and an optional integer
> partition. If you specify the integer partition it will be used. If you do
> not specify a key or a partition the partition will be chosen in a round
> robin fashion. If you specify a key but no partition we will chose a
> partition based on a hash of the key. In order to let the user find the
> partition we will need to given them access to the Cluster instance
> directly from the producer.
>
> -Jay
>
>
> On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede  >wrote:
>
> > Here are more thoughts on the public APIs -
> >
> > - I suggest we use java's Future instead of custom Future especially
> since
> > it is part of the public API
> >
> > - Serialization: I like the simplicity of the producer APIs with the
> > absence of serialization where we just deal with byte arrays for keys and
> > values. What I don't like about this is the performance overhead on the
> > Partitioner for any kind of custom partitioning based on the
> partitionKey.
> > Since the only purpose of partitionKey is to do custom partitioning, why
> > can't we take it in directly as an integer and let the user figure out
> the
> > mapping from partition_key -> partition_id using the getCluster() API?
> If I
> > understand correctly, this is similar to what you suggested as part of
> > option 1A. I like this approach since it maintains the simplicity of APIs
> > by allowing us to deal with bytes and does not compromise performance in
> > the custom partitioning case.
> >
> > Thanks,
> > Neha
> >
> >
> >
> > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps  wrote:
> >
> > > Hey Tom,
> > >
> > > That sounds cool. How did you end up handling parallel I/O if you wrap
> > the
> > > individual connections? Don't you need some selector that selects over
> > all
> > > the connections?
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown 
> wrote:
> > >
> > > > I implemented a 0.7 client in pure java, and its API very closely
> > > resembled
> > > > this. (When multiple people independently engineer the same solution,
> > > it's
> > > > probably good... right?). However, there were a few architectural
> > > > differences with my client:
> > > >
> > > > 1. The basic client itself was just an asynchronous layer around the
> > > > different server functions. In and of itself it had no knowledge of
> > > > partitions, only servers (and maintained TCP connections to them).
> > > >
> > > > 2. The main producer was an additional layer that provided a
> high-level
> > > > interface that could batch individual messages based on partition.
> > > >
> > > > 3. Knowledge of partitioning was done via an interface so that
> > different
> > > > strategies could be used.
> > > >
> > > > 4. Partitioning was done by the user, with knowledge of the available
> > > > partitions provided by #3.
> > > >
> > > > 5. Serialization was 

Re: New Producer Public API

2014-01-29 Thread Jay Kreps
Hey Tom,

So is there one connection and I/O thread per broker and a low-level client
for each of those, and then you hash into that to partition? Is it possible
to batch across partitions or only within a partition?

-Jay


On Wed, Jan 29, 2014 at 8:41 AM, Tom Brown  wrote:

> Jay,
>
> There is both a basic client object, and a number of IO processing threads.
> The client object manages connections, creating new ones when new machines
> are connected, or when existing connections die. It also manages a queue of
> requests for each server. The IO processing thread has a selector, and
> performs the work of sending/receiving (removing items from the queue,
> interpreting the response at a basic level, etc). Since asynchronous
> sockets by nature decouple sending and receiving, request pipelining is
> inherent.
>
> Using the basic client, you can send individual produce requests (singular
> or batched). The "producer" layer adds an additional queue for each
> partition, allowing individual messages to be batched together.
>
> --Tom
>
>
>
> On Tue, Jan 28, 2014 at 9:10 PM, Jay Kreps  wrote:
>
> > Hey Neha,
> >
> > Can you elaborate on why you prefer using Java's Future? The downside in
> my
> > mind is the use of the checked InterruptedException and
> ExecutionException.
> > ExecutionException is arguable, but forcing you to catch
> > InterruptedException, often in code that can't be interrupted, seems
> > perverse. It also leaves us with the cancel() method which I don't think
> we
> > really can implement.
> >
> > Option 1A, to recap/elaborate, was the following. There is no Serializer
> or
> > Partitioner api. We take a byte[] key and value and an optional integer
> > partition. If you specify the integer partition it will be used. If you
> do
> > not specify a key or a partition the partition will be chosen in a round
> > robin fashion. If you specify a key but no partition we will chose a
> > partition based on a hash of the key. In order to let the user find the
> > partition we will need to given them access to the Cluster instance
> > directly from the producer.
> >
> > -Jay
> >
> >
> > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede  > >wrote:
> >
> > > Here are more thoughts on the public APIs -
> > >
> > > - I suggest we use java's Future instead of custom Future especially
> > since
> > > it is part of the public API
> > >
> > > - Serialization: I like the simplicity of the producer APIs with the
> > > absence of serialization where we just deal with byte arrays for keys
> and
> > > values. What I don't like about this is the performance overhead on the
> > > Partitioner for any kind of custom partitioning based on the
> > partitionKey.
> > > Since the only purpose of partitionKey is to do custom partitioning,
> why
> > > can't we take it in directly as an integer and let the user figure out
> > the
> > > mapping from partition_key -> partition_id using the getCluster() API?
> > If I
> > > understand correctly, this is similar to what you suggested as part of
> > > option 1A. I like this approach since it maintains the simplicity of
> APIs
> > > by allowing us to deal with bytes and does not compromise performance
> in
> > > the custom partitioning case.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > >
> > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps 
> wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > That sounds cool. How did you end up handling parallel I/O if you
> wrap
> > > the
> > > > individual connections? Don't you need some selector that selects
> over
> > > all
> > > > the connections?
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown 
> > wrote:
> > > >
> > > > > I implemented a 0.7 client in pure java, and its API very closely
> > > > resembled
> > > > > this. (When multiple people independently engineer the same
> solution,
> > > > it's
> > > > > probably good... right?). However, there were a few architectural
> > > > > differences with my client:
> > > > >
> > > > > 1. The basic client itself was just an asynchronous layer around
> the
> > > > > different server functions. In and of itself it had no knowledge of
> > > > > partitions, only servers (and maintained TCP connections to them).
> > > > >
> > > > > 2. The main producer was an additional layer that provided a
> > high-level
> > > > > interface that could batch individual messages based on partition.
> > > > >
> > > > > 3. Knowledge of partitioning was done via an interface so that
> > > different
> > > > > strategies could be used.
> > > > >
> > > > > 4. Partitioning was done by the user, with knowledge of the
> available
> > > > > partitions provided by #3.
> > > > >
> > > > > 5. Serialization was done by the user to simplify the API.
> > > > >
> > > > > 6. Futures were used to make asynchronous emulate synchronous
> calls.
> > > > >
> > > > >
> > > > > The main benefit of this approach is flexibility. For example,
> since
> > > the
> > > > > base client was just a managed connection (and

Re: New Producer Public API

2014-01-29 Thread Tom Brown
Jay,

There is both a basic client object, and a number of IO processing threads.
The client object manages connections, creating new ones when new machines
are connected, or when existing connections die. It also manages a queue of
requests for each server. The IO processing thread has a selector, and
performs the work of sending/receiving (removing items from the queue,
interpreting the response at a basic level, etc). Since asynchronous
sockets by nature decouple sending and receiving, request pipelining is
inherent.

Using the basic client, you can send individual produce requests (singular
or batched). The "producer" layer adds an additional queue for each
partition, allowing individual messages to be batched together.
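
In very reduced form (an illustrative sketch, not the real code), the
structure looks something like this; a real implementation would also read
responses, handle partial writes, and toggle write interest instead of
leaving OP_WRITE registered all the time:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SelectorIoSketch implements Runnable {
    private final Selector selector;
    private final Map<SocketChannel, Queue<ByteBuffer>> pending = new ConcurrentHashMap<SocketChannel, Queue<ByteBuffer>>();

    public SelectorIoSketch() throws IOException {
        this.selector = Selector.open();
    }

    // one connection per broker; real code would also handle reconnects
    public SocketChannel connect(String host, int port) throws IOException {
        SocketChannel channel = SocketChannel.open(new InetSocketAddress(host, port));
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_WRITE);
        pending.put(channel, new ConcurrentLinkedQueue<ByteBuffer>());
        return channel;
    }

    // any application thread can queue a request for a broker
    public void enqueue(SocketChannel broker, ByteBuffer request) {
        pending.get(broker).add(request);
        selector.wakeup();   // let the I/O thread notice the new work
    }

    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isWritable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer next = pending.get(channel).poll();
                        if (next != null) {
                            channel.write(next);   // send the next queued request for this broker
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}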

--Tom



On Tue, Jan 28, 2014 at 9:10 PM, Jay Kreps  wrote:

> Hey Neha,
>
> Can you elaborate on why you prefer using Java's Future? The downside in my
> mind is the use of the checked InterruptedException and ExecutionException.
> ExecutionException is arguable, but forcing you to catch
> InterruptedException, often in code that can't be interrupted, seems
> perverse. It also leaves us with the cancel() method which I don't think we
> really can implement.
>
> Option 1A, to recap/elaborate, was the following. There is no Serializer or
> Partitioner api. We take a byte[] key and value and an optional integer
> partition. If you specify the integer partition it will be used. If you do
> not specify a key or a partition the partition will be chosen in a round
> robin fashion. If you specify a key but no partition we will chose a
> partition based on a hash of the key. In order to let the user find the
> partition we will need to given them access to the Cluster instance
> directly from the producer.
>
> -Jay
>
>
> On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede  >wrote:
>
> > Here are more thoughts on the public APIs -
> >
> > - I suggest we use java's Future instead of custom Future especially
> since
> > it is part of the public API
> >
> > - Serialization: I like the simplicity of the producer APIs with the
> > absence of serialization where we just deal with byte arrays for keys and
> > values. What I don't like about this is the performance overhead on the
> > Partitioner for any kind of custom partitioning based on the
> partitionKey.
> > Since the only purpose of partitionKey is to do custom partitioning, why
> > can't we take it in directly as an integer and let the user figure out
> the
> > mapping from partition_key -> partition_id using the getCluster() API?
> If I
> > understand correctly, this is similar to what you suggested as part of
> > option 1A. I like this approach since it maintains the simplicity of APIs
> > by allowing us to deal with bytes and does not compromise performance in
> > the custom partitioning case.
> >
> > Thanks,
> > Neha
> >
> >
> >
> > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps  wrote:
> >
> > > Hey Tom,
> > >
> > > That sounds cool. How did you end up handling parallel I/O if you wrap
> > the
> > > individual connections? Don't you need some selector that selects over
> > all
> > > the connections?
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown 
> wrote:
> > >
> > > > I implemented a 0.7 client in pure java, and its API very closely
> > > resembled
> > > > this. (When multiple people independently engineer the same solution,
> > > it's
> > > > probably good... right?). However, there were a few architectural
> > > > differences with my client:
> > > >
> > > > 1. The basic client itself was just an asynchronous layer around the
> > > > different server functions. In and of itself it had no knowledge of
> > > > partitions, only servers (and maintained TCP connections to them).
> > > >
> > > > 2. The main producer was an additional layer that provided a
> high-level
> > > > interface that could batch individual messages based on partition.
> > > >
> > > > 3. Knowledge of partitioning was done via an interface so that
> > different
> > > > strategies could be used.
> > > >
> > > > 4. Partitioning was done by the user, with knowledge of the available
> > > > partitions provided by #3.
> > > >
> > > > 5. Serialization was done by the user to simplify the API.
> > > >
> > > > 6. Futures were used to make asynchronous emulate synchronous calls.
> > > >
> > > >
> > > > The main benefit of this approach is flexibility. For example, since
> > the
> > > > base client was just a managed connection (and not inherently a
> > > producer),
> > > > it was easy to composite a produce request and an offsets request
> > > together
> > > > into a confirmed produce request (officially not available in 0.7).
> > > >
> > > > Decoupling the basic client from partition management allowed the me
> to
> > > > implement zk discovery as a separate project so that the main project
> > had
> > > > no complex dependencies. The same was true of decoupling
> serialization.
> > > > It's trivial to build an optional layer that adds those features in,
> >

Re: Kafka 0.7: producer request commit ACK

2014-01-29 Thread Jun Rao
No, only 0.8 has ack for the producers.
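
For example, with the 0.8 Java producer something like the following
requests acks (property names as I recall them from the 0.8 docs, so please
double-check; broker and topic values are placeholders):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AckedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");   // 1 = wait for the leader, -1 = wait for all in-sync replicas

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-topic", "key", "value"));
        producer.close();
    }
}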

Thanks,

Jun


On Wed, Jan 29, 2014 at 6:54 AM, Janos Mucza  wrote:

> Dear Kafka Users,
>
> Is there any possibility for a producer to request commit ACK using Kafka
> 0.7?
>
> The reason I'm considering Kafka 0.7 is integration with existing .Net
> application(s).
> So far I didn't find any Kafka 0.8 .Net client that works properly.
>
> Thank you very much.
>
> Best regards,
> Janos
>
>


Re: hello, I'm user in kafka having a question about 'commit'

2014-01-29 Thread Jun Rao
If you are using the high level consumer, there is an commitOffsets() api.
If you are using SimpleConsumer, you are on your own for offset management.

The getOffsetBefore api is not for getting the consumer offsets, but for
getting the offset in the log before a particular time.
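
For example, the pattern with the high level consumer looks roughly like
this (property and class names as I recall them for 0.8, so please verify;
the HDFS write is just a placeholder):

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class CommitAfterWriteSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");
        props.put("group.id", "hadoop-consumer");
        props.put("auto.commit.enable", "false");   // commit only after the data is safely written

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1)).get("my-topic");

        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            writeToHdfs(record.message());   // placeholder for the real HDFS write
            connector.commitOffsets();       // checkpoint the consumed offset in ZooKeeper
        }
    }

    private static void writeToHdfs(byte[] payload) {
        // omitted: append the payload to a file in HDFS
    }
}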

Thanks,

Jun


On Tue, Jan 28, 2014 at 10:51 PM, 김세곤  wrote:

>
> hello, I'm user in kafka
>
> I want find commit method from consumer to zookeeper, but can't find it
> I want to make structure like that,
>
> hadoop-consumer: 1. it get message
> 2. it write message to hadoop hdfs
> 3. it have to get message again from what it read ,when it is fault and
> recover.
> ( for example,  consumer was reading message
> suddenly it was fault. during fault time, message is producing.
> after recovering, consumer have to read message from what it didn't read
> offset, not from first)
>
>
> problem is 3.
>
> I read about offset, written that
> after consumer read message, it commit to zookeeper. using that offset (
> ex, getbeforeoffset() ), can read message again.
> But, using this method offer that reading from first(offset-1) to last.
> And if use not it, consumer can't get messages produced during fault
>
> So, I want find any source or method to satisfy my structure, is there any
> thing to help me?
>
>
> thank you for reading
> please give some help
>
>
>


Re: Reg Exception in Kafka

2014-01-29 Thread Jun Rao
Hmm, it's weird that EC2 only allows you to bind to local ip. Could some
EC2 users here help out?

Also, we recently added https://issues.apache.org/jira/browse/KAFKA-1092,
which allows one to use a different ip for binding and connecting. You can
see if this works for you. The patch is only in trunk though.

Thanks,

Jun


On Tue, Jan 28, 2014 at 10:10 PM, Balasubramanian Jayaraman (Contingent) <
balasubramanian.jayara...@autodesk.com> wrote:

> I don't think so. I forgot to include the ifconfig  output. Actually the
> public IP is not one of the IP configured in the Ethernet interfaces.
> Only the Local IP is configured in eth0.
> Is there any solution to this?
>
> Ifconfig O/P:
>
> eth0  Link encap:Ethernet  HWaddr 22:00:0A:C7:1F:57
>   inet addr:10.X.X.X  Bcast:10.199.31.127  Mask:255.255.255.192
>   inet6 addr: fe80::2000:aff:fec7:1f57/64 Scope:Link
>   UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
>   RX packets:83186 errors:0 dropped:0 overruns:0 frame:0
>   TX packets:91285 errors:0 dropped:0 overruns:0 carrier:0
>   collisions:0 txqueuelen:1000
>   RX bytes:40233350 (38.3 MiB)  TX bytes:15089154 (14.3 MiB)
>   Interrupt:25
>
> loLink encap:Local Loopback
>   inet addr:127.0.0.1  Mask:255.0.0.0
>   inet6 addr: ::1/128 Scope:Host
>   UP LOOPBACK RUNNING  MTU:16436  Metric:1
>   RX packets:1379711 errors:0 dropped:0 overruns:0 frame:0
>   TX packets:1379711 errors:0 dropped:0 overruns:0 carrier:0
>   collisions:0 txqueuelen:0
>   RX bytes:109133672 (104.0 MiB)  TX bytes:109133672 (104.0 MiB)
>
> Thanks
> Bala
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Wednesday, January 29, 2014 12:27 PM
> To: users@kafka.apache.org
> Subject: Re: Reg Exception in Kafka
>
> Could it be a port conflict?
>
> Thanks,
>
> Jun
>
>
> On Tue, Jan 28, 2014 at 5:20 PM, Balasubramanian Jayaraman (Contingent) <
> balasubramanian.jayara...@autodesk.com> wrote:
>
> > Jun,
> >
> > Thanks for your help.
> > I get the following exception :
> > kafka.common.KafkaException: Socket server failed to bind to
> > 54.241.44.129:9092: Cannot assign requested address.
> > at
> kafka.network.Acceptor.openServerSocket(SocketServer.scala:188)
> > at kafka.network.Acceptor.(SocketServer.scala:134)
> > at kafka.network.SocketServer.startup(SocketServer.scala:61)
> > at kafka.server.KafkaServer.startup(KafkaServer.scala:77)
> > at
> > kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > at kafka.Kafka$.main(Kafka.scala:46)
> > at kafka.Kafka.main(Kafka.scala) Caused by:
> > java.net.BindException: Cannot assign requested address
> > at sun.nio.ch.Net.bind0(Native Method)
> > at sun.nio.ch.Net.bind(Net.java:174)
> > at
> > sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
> > at
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)
> > at
> sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:70)
> > at
> kafka.network.Acceptor.openServerSocket(SocketServer.scala:184)
> > ... 6 more
> >
> > The entire stack trace of the logs are placed below.
> >
> > [2014-01-29 01:18:23,136] INFO Verifying properties
> > (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,176] INFO Property host.name is overridden to
> > 54.241.44.129 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,177] INFO Property port is overridden to 9092
> > (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,177] INFO Property socket.request.max.bytes is
> > overridden to 104857600 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,177] INFO Property num.io.threads is overridden
> > to 2
> > (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,178] INFO Property log.dirs is overridden to
> > /tmp/kafka-logs-1 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,178] INFO Property log.cleanup.interval.mins is
> > overridden to 1 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,178] INFO Property socket.send.buffer.bytes is
> > overridden to 1048576 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,179] INFO Property log.flush.interval.ms is
> > overridden to 1000 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,179] INFO Property zookeeper.connect is
> > overridden to
> > localhost:2181 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,180] INFO Property broker.id is overridden to 1
> > (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,180] INFO Property log.retention.hours is
> > overridden to 168 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,180] INFO Property num.network.threads is
> > overridden to 2 (kafka.utils.VerifiableProperties)
> > [2014-01-29 01:18:23,180] INFO Property socket.receive.buffer.bytes is
> > overridden 

hello, I'm user in kafka having a question about 'commit'

2014-01-29 Thread 김세곤
 
Hello, I'm a Kafka user.

I'm looking for a way for a consumer to commit its offset to ZooKeeper, but I
can't find one. I want to build a structure like this:

hadoop-consumer: 1. it gets messages
2. it writes the messages to Hadoop HDFS
3. after a failure and recovery, it has to resume reading from where it left off.
(For example, the consumer is reading messages and suddenly fails. While it is
down, messages keep being produced. After recovering, the consumer has to read
from the offset it had not yet read, not from the beginning.)

The problem is step 3.

I read that after a consumer reads messages, it commits the offset to
ZooKeeper, and that using that offset (e.g. getbeforeoffset()) the messages can
be read again. But that method only offers reading from the first offset to the
last, and without it the consumer can't get the messages produced during the
failure.

So I'm looking for any source or method that satisfies this structure. Is there
anything that can help me?

Thank you for reading; please give some help.
 
 


Kafka 0.7: producer request commit ACK

2014-01-29 Thread Janos Mucza
Dear Kafka Users,

Is there any possibility for a producer to request commit ACK using Kafka 0.7?

The reason I'm considering Kafka 0.7 is integration with existing .Net 
application(s).
So far I didn't find any Kafka 0.8 .Net client that works properly.

Thank you very much.

Best regards,
Janos



Re: kafka.common.LeaderNotAvailableException

2014-01-29 Thread Gerrit Jansen van Vuuren
I've found the answer to my own question:
http://mail-archives.apache.org/mod_mbox/kafka-users/201308.mbox/%3c44d1e1522419a14482f89ff4ce322ede25025...@brn1wnexmbx01.vcorp.ad.vrsn.com%3E



On Wed, Jan 29, 2014 at 1:17 PM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
> I'm testing kafka 0.8.0 failover.
>
> I have 5 brokers 1,2,3,4,5. I shutdown 5 (with controlled shutdown
> activated).
> broker 4 is my bootstrap broker.
>
> My config has: default.replication.factor=2, num.partitions=8.
>
> When I look at the kafka server.log on broker 4 I get the below error,
> which only goes away when I restart broker 5.
>
>
> [2014-01-29 04:12:15,348] ERROR [KafkaApi-4] Error while fetching metadata
> for partition [data,4] (kafka.server.KafkaApis)
> kafka.common.LeaderNotAvailableException: Leader not available for
> partition [data,4]
>  at
> kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:468)
> at
> kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>  at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.List.map(List.scala:45)
>  at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
> at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
>  at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>  at scala.collection.immutable.HashSet.map(HashSet.scala:32)
> at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>  at java.lang.Thread.run(Thread.java:724)
>
>
> any ideas?
>


Re: custom kafka consumer - strangeness

2014-01-29 Thread Gerrit Jansen van Vuuren
Hi,

I've finally fixed this by closing the connection on timeout and creating a
new connection on the next send.
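
In generic form the pattern is roughly the following (a simplified sketch
using plain blocking sockets just for brevity; the actual client is
netty-based and the framing here is a placeholder):

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ReconnectingSender {
    private final String host;
    private final int port;
    private Socket socket;   // null means "no live connection"

    public ReconnectingSender(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public synchronized byte[] send(byte[] request, int timeoutMs) throws IOException {
        if (socket == null) {                                  // lazily (re)connect on the next send
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);                    // reads fail with SocketTimeoutException
        }
        try {
            OutputStream out = socket.getOutputStream();
            out.write(request);
            out.flush();
            byte[] response = new byte[4];                     // placeholder: a real client reads a size-prefixed response
            if (socket.getInputStream().read(response) < 0) {
                throw new IOException("connection closed by peer");
            }
            return response;
        } catch (IOException e) {
            socket.close();                                    // drop the possibly wedged connection on timeout/error
            socket = null;                                     // the next send() will reconnect
            throw e;
        }
    }
}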

Thanks,
 Gerrit


On Tue, Jan 14, 2014 at 10:20 AM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
> thanks I will do this.
>
>
>
> On Tue, Jan 14, 2014 at 9:51 AM, Joe Stein  wrote:
>
>> Hi Gerrit, do you have a ticket already for this issue?  Is it possible to
>> attach code that reproduces it?  It would be great if you could run it
>> against a Kafka VM: you can grab one from this project for 0.8.0,
>> https://github.com/stealthly/scala-kafka, to launch a Kafka VM and add
>> whatever you need to it to reproduce the issue, or from
>> https://issues.apache.org/jira/browse/KAFKA-1173 for 0.8.1.  I think if
>> you can reproduce it comfortably in a controlled, isolated environment,
>> that would be helpful for folks to reproduce and work towards a
>> resolution. At least if it is a bug we can get a detailed capture of
>> what the bug is in the JIRA ticket and start discussing how to fix it.
>>
>> /***
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop 
>> /
>>
>>
>> On Tue, Jan 14, 2014 at 3:38 AM, Gerrit Jansen van Vuuren <
>> gerrit...@gmail.com> wrote:
>>
>> > Yes, I'm using my own client following:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> >
>> > Everything works except for this weirdness.
>> >
>> >
>> > On Tue, Jan 14, 2014 at 5:50 AM, Jun Rao  wrote:
>> >
>> > > So, you implemented your own consumer client using netty?
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jan 13, 2014 at 8:42 AM, Gerrit Jansen van Vuuren <
>> > > gerrit...@gmail.com> wrote:
>> > >
>> > > > I'm using netty and async write, read.
>> > > > For read I used a timeout such that if I do not see anything on the
>> > read
>> > > > channel, my read function times out and returns null.
>> > > > I do not see any error on the socket, and the same socket is used
>> > > > throughout all of the fetches.
>> > > >
>> > > > I'm using the console producer and messages are "11", "22", "abc",
>> > > > "" etc.
>> > > >
>> > > > I can reliably reproduce it every time.
>> > > >
>> > > > It's weird, yes; no compression is used, and the timeout happens for
>> > > > the same scenario every time.
>> > > >
>> > > >
>> > > >
>> > > > On Mon, Jan 13, 2014 at 4:44 PM, Jun Rao  wrote:
>> > > >
>> > > > > I can't seem to find the log trace for the timed out fetch request
>> > > > > (every fetch request seems to have a corresponding completed
>> > > > > entry). For the timed out fetch request, is it that the broker
>> > > > > never completed the request or is it that it just took longer than
>> > > > > the socket timeout to finish processing the request? Do you use
>> > > > > large messages in your test?
>> > > > >
>> > > > > If you haven't enabled compression, it's weird that you will
>> > > > > re-get 240 and 241 with an offset of 242 in the fetch request. Is
>> > > > > that easily reproducible?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > >
>> > > > > On Mon, Jan 13, 2014 at 1:26 AM, Gerrit Jansen van Vuuren <
>> > > > > gerrit...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > the offset in g is 240, and in i 242, the last message read was
>> at
>> > > > offset
>> > > > > > 239.
>> > > > > >
>> > > > > > After reading from 0 - 239, I make another request for 240; this
>> > > > > > request times out and never returns.
>> > > > > > I then manually add 2 entries via the console producer, all the
>> > > > > > while making a request for 240 every 10 seconds. All subsequent
>> > > > > > requests for offset 240 return empty messages, until the
>> > > > > > responses are written. Then I get the 2 messages at offsets
>> > > > > > 240,241 and an end of response. Then I make a request for offset
>> > > > > > 242, and get the messages at offsets 240,241 again.
>> > > > > >
>> > > > > > I've attached a portion of the kafka-request.log set to trace.
>> > > > > >
>> > > > > > The correlation ids are:
>> > > > > > 1389604489 - first request at offset 0
>> > > > > > 1389604511  - timeout at offset 240
>> > > > > > 1389604563  - got data request at offset 240
>> > > > > > 1389604573  - got duplicates request at offset 242
>> > > > > >
>> > > > > > Regards,
>> > > > > >  Gerrit
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Mon, Jan 13, 2014 at 5:10 AM, Jun Rao 
>> wrote:
>> > > > > >
>> > > > > >> What's the offset used in the fetch request in steps g and i
>> that
>> > > both
>> > > > > >> returned offsets 10 and 11?
>> > > > > >>
>> > > > > >> Thanks,
>> > 

kafka.common.LeaderNotAvailableException

2014-01-29 Thread Gerrit Jansen van Vuuren
Hi,

I'm testing kafka 0.8.0 failover.

I have 5 brokers 1,2,3,4,5. I shutdown 5 (with controlled shutdown
activated).
broker 4 is my bootstrap broker.

My config has: default.replication.factor=2, num.partitions=8.

When I look at the kafka server.log on broker 4 I get the below error,
which only goes away when I restart broker 5.


[2014-01-29 04:12:15,348] ERROR [KafkaApi-4] Error while fetching metadata for partition [data,4] (kafka.server.KafkaApis)
kafka.common.LeaderNotAvailableException: Leader not available for partition [data,4]
at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:468)
at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.List.map(List.scala:45)
at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.HashSet.map(HashSet.scala:32)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:724)


any ideas?
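
(For anyone hitting the same thing: a quick way to see which partitions the
bootstrap broker currently reports without a leader is a topic metadata
request. A rough sketch with the 0.8 javaapi; the host name below is assumed.)

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderCheck {
    public static void main(String[] args) {
        // Hypothetical bootstrap broker host; adjust to your setup.
        SimpleConsumer consumer =
            new SimpleConsumer("broker4", 9092, 100000, 64 * 1024, "leaderCheck");
        TopicMetadataRequest request =
            new TopicMetadataRequest(Collections.singletonList("data"));
        TopicMetadataResponse response = consumer.send(request);
        for (TopicMetadata topic : response.topicsMetadata()) {
            for (PartitionMetadata partition : topic.partitionsMetadata()) {
                // leader() is null when the broker knows of no leader for that partition
                System.out.println("partition " + partition.partitionId() + " leader: "
                        + (partition.leader() == null ? "NONE" : partition.leader().host()));
            }
        }
        consumer.close();
    }
}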