Re: Creating partition with many partitions

2015-12-13 Thread Ofir Manor
Hi,
I think this is the best reference on the actual trade-off (more/less
partitions):

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
As a guess, for a three-node Kafka cluster, I don't see how useful it would
be to have 1024 consumers in a single consumer group just for the sake of
read parallelism within that group.
Anyway, maybe test and measure the actual throughput you get on your setup -
or, even better, set your throughput and availability goals first
(replication levels, throughput if a node fails, etc.) and then tune the
configuration (including the number of partitions) to support them. You may
need to add disks or nodes to get there, depending on your actual goals.
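
For what it's worth, that blog post boils the sizing down to a rule of thumb:
at least max(t/p, t/c) partitions, where t is the target throughput and p and
c are the per-partition producer and consumer throughputs measured on your
own hardware. A minimal sketch in Java; the figures below are placeholder
assumptions, not measurements from any cluster:

    // Rule-of-thumb partition count from the Confluent post above.
    // All figures are MB/s and must be measured on your own setup.
    public class PartitionSizing {
        static int minPartitions(double targetMbPerSec,
                                 double producerMbPerSecPerPartition,
                                 double consumerMbPerSecPerPartition) {
            int forProducers = (int) Math.ceil(targetMbPerSec / producerMbPerSecPerPartition);
            int forConsumers = (int) Math.ceil(targetMbPerSec / consumerMbPerSecPerPartition);
            return Math.max(forProducers, forConsumers);
        }

        public static void main(String[] args) {
            // Placeholder figures: a 100 MB/s target, ~10 MB/s per partition
            // on the producer side, ~20 MB/s on the consumer side.
            System.out.println(minPartitions(100, 10, 20)); // prints 10
        }
    }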

  Ofir


On Thu, Dec 10, 2015 at 5:29 PM, Balthasar Schopman <
b.schop...@tech.leaseweb.com> wrote:

> Hi,
>
> We are considering an architecture with a Kafka cluster of 3 nodes and a
> high number of consumers. We see that with a low number of partitions, e.g.
> 3, and a higher number of consumers, e.g. 16, only 3 consumers will
> actually consume data, because only the owner of a partition can consume
> its messages. To see the owners we do the following:
>
> $ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper
> localhost:2181 --group consumer_group
> Group           Topic       Pid  Offset  logSize  Lag  Owner
> consumer_group  statistics  0    5335    5373     38   consumer_group_balthasar-1449651803301-63a1d620-0
> consumer_group  statistics  1    5335    5374     39   consumer_group_balthasar-1449651803820-35a84426-0
> consumer_group  statistics  2    5335    5374     39   consumer_group_balthasar-1449651803934-2b3cc1bd-0
>
> One solution that would let us have many consumers is to increase the
> number of partitions to a high number, e.g. 1024. This would put more load
> on the machines running Kafka, but would this load be excessive? The
> machines that will be running Kafka have 64 GB RAM and a Xeon E5-2620 CPU
> (6 cores clocked at 2 GHz, 24 hardware threads in total).
>
> Are there any other reasons not to use such a high number of partitions?
>
>
> Kind regards,
>
> Balthasar Schopman
> Software Developer
> LeaseWeb Technologies B.V.
>
> T: +31 20 316 0232
> M:
> E: b.schop...@tech.leaseweb.com
> W: http://www.leaseweb.com
>
> Luttenbergweg 8, 1101 EC Amsterdam, Netherlands
>
>
>
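
For reference, the partition count of an existing topic can be raised in
place with the stock tooling - a sketch only, assuming the topic from the
output above and a local ZooKeeper:

    $ bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
        --topic statistics --partitions 1024

One caveat: increasing the partition count changes which partition a given
key hashes to, so ordering guarantees for keyed messages only hold for data
produced after the change.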


Re: Low-latency, high message size variance

2015-12-13 Thread Jens Rantil
Hi again,


For the record I filed an issue about this here: 
https://issues.apache.org/jira/browse/KAFKA-2986




Cheers,

Jens





–
Sent from Mailbox

On Fri, Dec 11, 2015 at 7:56 PM, Jens Rantil  wrote:

> Hi,
> We've been experimenting a little with running Kafka internally to better
> handle temporary throughput peaks of asynchronous tasks. However, we've had
> big issues making Kafka work for us and I am starting to question whether
> it's a good fit.
> Our usecase:
>- High latency: at peaks, each consumer requires ~20 seconds to handle a
>single message/task.
>- Extreme variation in message size: serialized tasks range from ~300 bytes
>up to ~3 MB.
>- A message is generally processed in ~20 seconds regardless of its size.
>Most messages are small.
> Our setup:
>- Kafka 0.9.0.
>- Using the new Java consumer API (consumer groups etc.).
>- To occasionally handle large (3 MB) messages we've had to set the
>following configuration parameters:
>   - max.partition.fetch.bytes=10485760 (10 MB) on the consumer, to handle
>   larger messages.
>   - session.timeout.ms=30000 (30 s), to handle our high-latency processing.
>   - replica.fetch.max.bytes=10485760 (10 MB) on the broker.
>   - message.max.bytes=10485760 (10 MB) on the broker.
> Sample code:
> while (isRunning()) {
>   // Heartbeats are only sent as a side effect of poll(), so this loop
>   // must get back to poll() before session.timeout.ms expires.
>   ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>   for (final ConsumerRecord<byte[], byte[]> record : records) {
>     // Handle record...
>   }
> }
> AFAIK this seems like very basic consumer code.
> Initial problem: When doing load testing to simulate peaks, our consumer
> started spinning infinitely in a similar fashion to [1]. We also noticed
> that we were consistently seeing [2] in our broker log.
> [1] http://bit.ly/1Q7zxgh
> [2] https://gist.github.com/JensRantil/2d1e7db3bd919eb35f9b
> Root cause analysis: AFAIK, heartbeats are only submitted to Kafka when
> calling `consumer.poll(...)`. To handle larger messages, we needed to
> increase max.partition.fetch.bytes. However, due to our high-latency
> consumer, a large number of small messages could be prefetched, which made
> our inner for loop run long enough for the broker to consider our consumer
> dead.
> Two questions:
>- Is there any workaround to avoid the broker thinking our consumer is
>dead? Increasing the session timeout to cover the polling interval for
>small messages is not an option, since we simply prefetch too many messages
>for that to work. Can we set a limit on how many messages Kafka prefetches?
>Or is there a way to send heartbeats to the broker out of band, without
>invoking the `KafkaConsumer#poll` method? (One possible workaround is
>sketched after this message.)
>- Is Kafka a bad tool for our usecase?
> Thanks and have a nice weekend,
> Jens
> -- 
> Jens Rantil
> Backend engineer
> Tink AB
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
> Facebook / LinkedIn / Twitter
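
A workaround that fits the 0.9 API (a sketch, not Jens's code - the byte[]
types and the process() helper are placeholder assumptions): pause the
assigned partitions while working through a batch and keep calling poll(),
which still sends heartbeats but returns no records for paused partitions,
then resume. Newer clients (0.10+) also gained max.poll.records to cap how
many records a single poll() returns.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SlowConsumerLoop {
        // Keep the session alive during slow processing by pausing fetches
        // while we work through the records already returned by poll().
        static void pollLoop(KafkaConsumer<byte[], byte[]> consumer) {
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                if (records.isEmpty())
                    continue;
                TopicPartition[] assigned =
                        consumer.assignment().toArray(new TopicPartition[0]);
                consumer.pause(assigned); // stop fetching, keep heartbeating
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    process(record);  // ~20 s per message in this use case
                    consumer.poll(0); // heartbeat; empty while paused
                }
                consumer.resume(assigned);
            }
        }

        static void process(ConsumerRecord<byte[], byte[]> record) {
            // Placeholder for the actual task handling.
        }
    }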

Re: Kafka Producer 0.9 performance issue with small messages

2015-12-13 Thread Gary Gershon
Guozhang,

Correct.  Even with linger.ms at 1000 ms, the metrics indicate no batching.

The KafkaProducer instance is recognizing the linger.ms setting, since
sending 100 records with linger.ms=1000 then takes over 100 seconds.

Here is the test harness. There is an abstract parent class with methods to
parse the Bluemix Message Hub (Kafka 0.9) configuration environment variable
and print the metrics.

/**
 * MessageHub Producer Client
 * Kafka 0.9
 */
package com.isllc.client.producer;

import java.io.UnsupportedEncodingException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

import com.isllc.client.AbstractClient;

/**
 * ExploreProducer to send messages
 */
public class ExploreProducer extends AbstractClient {

    public static Logger logger = Logger.getLogger(ExploreProducer.class.getName());

    final static String FIELD_NAME = "records"; // Specified for MessageHub

    KafkaProducer<byte[], byte[]> kafkaProducer;

    public ExploreProducer(String propertiesName) {
        Properties clientProperties = getProperties(propertiesName);
        clientProperties.put("linger.ms", "100");
        kafkaProducer = new KafkaProducer<byte[], byte[]>(clientProperties);
    }

    public void send(String topic, String message) {
        ProducerRecord<byte[], byte[]> record;
        try {
            record = new ProducerRecord<byte[], byte[]>(topic,
                    FIELD_NAME.getBytes(UTF8), message.getBytes(UTF8));
        } catch (UnsupportedEncodingException uee) {
            throw new RuntimeException(uee.getMessage(), uee);
        }
        RecordMetadata metadata;
        try {
            if (logger.isTraceEnabled())
                logger.trace("Sending record: Topic='" + record.topic()
                        + "', Key='" + new String(record.key())
                        + "', Value='" + new String(record.value()) + "'");
            // Blocks on each send until the broker ACKs the record.
            metadata = kafkaProducer.send(record).get();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie.getMessage(), ie);
        } catch (ExecutionException ee) {
            throw new RuntimeException(ee.getMessage(), ee);
        }
        if (logger.isTraceEnabled())
            logger.trace("Send returned metadata: Topic='" + metadata.topic()
                    + "', Partition=" + metadata.partition()
                    + ", Offset=" + metadata.offset());
    }

    public void close() {
        kafkaProducer.close();
    }

    public void exercise(int sends) {
        long bytes = 0L;
        logger.info("");
        logger.info("Starting " + sends
                + " sends including the following first and last records:");

        long start = System.currentTimeMillis();
        for (int i = 1; i <= sends; i++) {
            String iso = OffsetDateTime.now()
                    .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            String message = String.format(
                    "Kafka 0.9 Java Client Record Test Message %05d %s", i, iso);
            if (i == 1) {
                logger.info("Record length: " + message.length() + " bytes");
                logger.info("First: " + message);
            }
            if (i == sends)
                logger.info("Last:  " + message);
            bytes += message.length();
            send("mytopic", message);
        }
        kafkaProducer.flush();
        long finish = System.currentTimeMillis();

        logger.info("Duration for " + sends + " sends " + (finish - start)
                + " ms. Sent " + bytes + " bytes.");

        logMetrics(kafkaProducer.metrics());
    }

    /**
     * Main method for exploration
     *
     * @param args
     */
    public static void main(String[] args) {
        logger.info("Instantiating producer com.isllc.client.producer.ExploreProducer");
        ExploreProducer producer = new ExploreProducer("kafka-producer.properties");
        try {
            producer.exercise(100); // Initialize component
            // producer.exercise(1000);
            // producer.exercise(1);

Re: Kafka Producer 0.9 performance issue with small messages

2015-12-13 Thread Guozhang Wang
Gary,

You are calling "kafkaProducer.send(record).get();" for each message; the
get() call blocks until the Future completes, which effectively synchronizes
the sends by waiting for the ACK of each message before sending the next
one, hence no batching.

You can try using "send(record, callback)" for async sending and let the
callback handle errors from the returned metadata.
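
A minimal sketch of an async variant of the harness's send() method
(sendAsync is a hypothetical name; kafkaProducer, logger, FIELD_NAME and
UTF8 come from the ExploreProducer class above, and the error handling is
only an example):

    import org.apache.kafka.clients.producer.Callback;

    // send() returns immediately, so records can accumulate into batches
    // bounded by linger.ms and batch.size instead of going out one by one.
    public void sendAsync(String topic, String message)
            throws UnsupportedEncodingException {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                topic, FIELD_NAME.getBytes(UTF8), message.getBytes(UTF8));
        kafkaProducer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    logger.error("Send failed: " + exception.getMessage(), exception);
                } else if (logger.isTraceEnabled()) {
                    logger.trace("Acked: partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
                }
            }
        });
    }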

Guozhang


On Sun, Dec 13, 2015 at 9:13 AM, Gary Gershon  wrote:

> Guozhang,
>
> Correct.  Even with linger.ms at 1000 ms, the metrics indicate no
> batching.
>
> The KafkaProducer instance is recognizing the linger.ms setting since
> sending 100 records
> with linger.ms=1000 then takes over 100 seconds.
>
> Here is the test harness.  There is an abstract parent class with methods
> to parse the Bluemix
> Message Hub (Kafka 0.9) configuration environment variable and print the
> metrics.
>
> [...]

Re: Kafka Producer 0.9 performance issue with small messages

2015-12-13 Thread Gary Gershon
Guozhang,

Yes - you identified the problem!

We had inserted the .get() for debugging, but didn’t think of the (huge!) 
side-effects.

Using the async callback works perfectly well.

We are now able to send 100,000 records in 14 seconds from a laptop to the
Bluemix cloud - roughly 1000x faster.

Thank you very much!

Gary


> On Dec 13, 2015, at 2:48 PM, Guozhang Wang  wrote:
> 
> Gary,
> 
> You are calling "kafkaProducer.send(record).get();" for each message; the
> get() call blocks until the Future completes, which effectively
> synchronizes the sends by waiting for the ACK of each message before
> sending the next one, hence no batching.
> 
> You can try using "send(record, callback)" for async sending and let the
> callback handle errors from the returned metadata.
> 
> Guozhang
> 
>