Gary,

You are calling "kafkaProducer.send(record).get();" for each message, the
get() call block until the Future is initialized, which effectively
synchronize all message sent by asking for the ACK for each message before
sending the next message, hence no batching.

You can try using "send(record, callback)" for async sending and let the
callback handle errors from the returned metadata.

Guozhang


On Sun, Dec 13, 2015 at 9:13 AM, Gary Gershon <g...@intermediasciences.com>
wrote:

> Guozhang,
>
> Correct.  Even with linger.ms at 1000 ms, the metrics indicate no
> batching.
>
> The KafkaProducer instance is recognizing the linger.ms setting since
> sending 100 records
> with linger.ms=1000 then takes over 100 seconds.
>
> Here is the test harness.  There is an abstract parent class with methods
> to parse the Bluemix
> Message Hub (Kafka 0.9) configuration environment variable and print the
> metrics.
>
> /**
>  * MessageHub Producer Client
>  * Kafka 0.9
>  */
> package com.isllc.client.producer;
>
> import java.io.UnsupportedEncodingException;
> import java.time.OffsetDateTime;
> import java.time.format.DateTimeFormatter;
> import java.util.Properties;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.log4j.Logger;
>
> import com.isllc.client.AbstractClient;
>
> /**
>  * ExploreProducer to send messages
>  *
>  */
> public class ExploreProducer extends AbstractClient {
>         public static Logger logger =
> Logger.getLogger(ExploreProducer.class.getName());
>         final static String FIELD_NAME = "records"; // Specified for
> MessageHub
>         KafkaProducer<byte[], byte[]> kafkaProducer;
>
>         public ExploreProducer(String propertiesName) {
>                 Properties clientProperties =
> getProperties(propertiesName);
>                 clientProperties.put("linger.ms", "100");
>                 kafkaProducer = new KafkaProducer<byte[],
> byte[]>(clientProperties);
>         }
>
>         public void send(String topic, String message) {
>                 ProducerRecord<byte[], byte[]> record;
>
>                 try {
>                         record = new ProducerRecord<byte[], byte[]>(topic,
> FIELD_NAME.getBytes(UTF8), message.getBytes(UTF8));
>                 } catch (UnsupportedEncodingException uee) {
>                         throw new RuntimeException(uee.getMessage(), uee);
>                 }
>                 RecordMetadata metadata;
>                 try {
>                         if (logger.isTraceEnabled())
>                                 logger.trace("Sending record: Topic='" +
> record.topic() + "', Key='" + new String(record.key())
>                                                 + "', Value='" + new
> String(record.value()) + "'");
>                         metadata = kafkaProducer.send(record).get();
>                 } catch (InterruptedException ie) {
>                         Thread.currentThread().interrupt();
>                         throw new RuntimeException(ie.getMessage(), ie);
>                 } catch (ExecutionException ee) {
>                         ;
>                         throw new RuntimeException(ee.getMessage(), ee);
>                 }
>                 if (logger.isTraceEnabled())
>                         logger.trace("Send returned metadata: Topic='" +
> metadata.topic() + "', Partition=" + metadata.partition()
>                                         + ", Offset=" + metadata.offset());
>         }
>
>         public void close() {
>                 kafkaProducer.close();
>         }
>
>         public void exercise(int sends) {
>                 long bytes = 0l;
>                 logger.info
> ("************************************************");
>                 logger.info("Starting " + sends + " sends including the
> following first and last records:");
>
>                 long start = System.currentTimeMillis();
>                 for (int i = 1; i <= sends; i++) {
>                         String iso =
> OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
>                         String message = String.format("Kafka 0.9 Java
> Client Record Test Message %05d %s", i, iso);
>                         if (i == 1) {
>                                 logger.info("Record length: " +
> message.length() + " bytes");
>                                 logger.info("First: " + message);
>                         }
>                         if (i == sends)
>                                 logger.info("Last:  " + message);
>                         bytes += message.length();
>                         send("mytopic", message);
>                 }
>                 ;
>                 kafkaProducer.flush();
>                 long finish = System.currentTimeMillis();
>
>                 logger.info("Duration for " + sends + " sends " + (finish
> - start) + " ms. Sent " + bytes + " bytes.");
>
>                 logMetrics(kafkaProducer.metrics());
>         }
>
>         /**
>          * Main method for exploration
>          *
>          * @param args
>          */
>         public static void main(String[] args) {
>                 logger.info("Instantiating producer
> com.isllc.client.producer.ExploreProducer");
>                 ExploreProducer producer = new
> ExploreProducer("kafka-producer.properties");
>                 try {
>                         producer.exercise(100); // Initialize component
>                         // producer.exercise(1000);
>                         // producer.exercise(10000);
>                         // producer.exercise(100000);
>                 } catch (RuntimeException rte) {
>                         logger.fatal("Terminating producer client with
> RuntimeException: " + rte.getMessage(), rte);
>                         System.exit(1);
>                 } finally {
>                         if (producer != null)
>                                 producer.close();
>                 }
>                 logger.info("Terminating producer client normally");
>         }
> }
>
>
> It’s really pretty simple — but something’s not right - :)
>
> Gary
>
> > On Dec 13, 2015, at 12:18 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Gary,
> >
> > So you observed records-per-request-avg = 1.0 still with linger.ms =
> 100 or
> > 1000?
> >
> > It seems you are not use Kafka's ProducerPerformance but are using
> > your own ExploreProducer
> > implementation, could you elaborate a bit about how messages are piped to
> > this client?
> >
> > Guozhang
> >
> >
> > On Sat, Dec 12, 2015 at 11:34 AM, Gary Gershon <
> g...@intermediasciences.com>
> > wrote:
> >
> <snipped>
>
> > -- Guozhang
>
>


-- 
-- Guozhang

Reply via email to