Guozhang,

Correct.  Even with linger.ms at 1000 ms, the metrics indicate no batching.

The KafkaProducer instance does honor the linger.ms setting, since sending 
100 records with linger.ms=1000 takes over 100 seconds.

Here is the test harness.  There is an abstract parent class with methods to 
parse the Bluemix Message Hub (Kafka 0.9) configuration environment variable 
and print the metrics.

/**
 * MessageHub Producer Client
 * Kafka 0.9
 */
package com.isllc.client.producer;

import java.io.UnsupportedEncodingException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

import com.isllc.client.AbstractClient;

/**
 * ExploreProducer to send messages
 * 
 */
public class ExploreProducer extends AbstractClient {
        public static Logger logger = Logger.getLogger(ExploreProducer.class.getName());
        final static String FIELD_NAME = "records"; // Specified for MessageHub
        KafkaProducer<byte[], byte[]> kafkaProducer;

        public ExploreProducer(String propertiesName) {
                Properties clientProperties = getProperties(propertiesName);
                clientProperties.put("linger.ms", "100");
                kafkaProducer = new KafkaProducer<byte[], byte[]>(clientProperties);
        }

        public void send(String topic, String message) {
                ProducerRecord<byte[], byte[]> record;

                try {
                        record = new ProducerRecord<byte[], byte[]>(topic, FIELD_NAME.getBytes(UTF8),
                                        message.getBytes(UTF8));
                } catch (UnsupportedEncodingException uee) {
                        throw new RuntimeException(uee.getMessage(), uee);
                }
                RecordMetadata metadata;
                try {
                        if (logger.isTraceEnabled())
                                logger.trace("Sending record: Topic='" + record.topic() + "', Key='" + new String(record.key())
                                                + "', Value='" + new String(record.value()) + "'");
                        metadata = kafkaProducer.send(record).get();
                } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(ie.getMessage(), ie);
                } catch (ExecutionException ee) {
                        throw new RuntimeException(ee.getMessage(), ee);
                }
                if (logger.isTraceEnabled())
                        logger.trace("Send returned metadata: Topic='" + metadata.topic() + "', Partition=" + metadata.partition()
                                        + ", Offset=" + metadata.offset());
        }

        public void close() {
                kafkaProducer.close();
        }

        public void exercise(int sends) {
                long bytes = 0L;
                logger.info("************************************************");
                logger.info("Starting " + sends + " sends including the following first and last records:");

                long start = System.currentTimeMillis();
                for (int i = 1; i <= sends; i++) {
                        String iso = OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
                        String message = String.format("Kafka 0.9 Java Client Record Test Message %05d %s", i, iso);
                        if (i == 1) {
                                logger.info("Record length: " + message.length() + " bytes");
                                logger.info("First: " + message);
                        }
                        if (i == sends)
                                logger.info("Last:  " + message);
                        bytes += message.length();
                        send("mytopic", message);
                }
                kafkaProducer.flush();
                long finish = System.currentTimeMillis();

                logger.info("Duration for " + sends + " sends " + (finish - start) + " ms. Sent " + bytes + " bytes.");

                logMetrics(kafkaProducer.metrics());
        }

        /**
         * Main method for exploration
         * 
         * @param args
         */
        public static void main(String[] args) {
                logger.info("Instantiating producer com.isllc.client.producer.ExploreProducer");
                ExploreProducer producer = new ExploreProducer("kafka-producer.properties");
                try {
                        producer.exercise(100); // Initialize component
                        // producer.exercise(1000);
                        // producer.exercise(10000);
                        // producer.exercise(100000);
                } catch (RuntimeException rte) {
                        logger.fatal("Terminating producer client with RuntimeException: " + rte.getMessage(), rte);
                        System.exit(1);
                } finally {
                        if (producer != null)
                                producer.close();
                }
                logger.info("Terminating producer client normally");
        }
}
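
The timing (100 sends with linger.ms=1000 taking just over 100 seconds) suggests each kafkaProducer.send(record).get() in the loop waits out the full linger window before the next record is even queued. Here is a minimal, Kafka-free sketch of that asymmetry; the LingerDemo class, the 50 ms window, and the scheduled-executor stand-in for the producer's I/O thread are my own illustrative assumptions, not Kafka internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LingerDemo {
        static final long LINGER_MS = 50; // assumed small window for a quick run

        // Daemon timer thread so the JVM can exit without an explicit shutdown.
        static final ScheduledExecutorService TIMER =
                        Executors.newSingleThreadScheduledExecutor(r -> {
                                Thread t = new Thread(r);
                                t.setDaemon(true);
                                return t;
                        });

        // Stand-in for producer.send(): the returned future completes only
        // after the linger window elapses, like a batch waiting out linger.ms.
        static CompletableFuture<Void> send() {
                CompletableFuture<Void> f = new CompletableFuture<>();
                TIMER.schedule(() -> f.complete(null), LINGER_MS, TimeUnit.MILLISECONDS);
                return f;
        }

        // Blocking on each future: every record pays the full window.
        static long runBlocking(int n) throws Exception {
                long start = System.currentTimeMillis();
                for (int i = 0; i < n; i++)
                        send().get();
                return System.currentTimeMillis() - start;
        }

        // Firing all sends, then waiting once: the windows overlap.
        static long runAsync(int n) throws Exception {
                long start = System.currentTimeMillis();
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                for (int i = 0; i < n; i++)
                        futures.add(send());
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
                return System.currentTimeMillis() - start;
        }

        public static void main(String[] args) throws Exception {
                System.out.println("blocking: " + runBlocking(10) + " ms"); // roughly 10 * LINGER_MS
                System.out.println("async:    " + runAsync(10) + " ms");    // roughly LINGER_MS
        }
}
```

If the real client behaves the same way, the synchronous get() inside the send loop would hold the next record back until the previous request completes, which would explain batches of one regardless of linger.ms.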


It’s really pretty simple, but something’s not right. :)

Gary

> On Dec 13, 2015, at 12:18 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Gary,
> 
> So you observed records-per-request-avg = 1.0 still with linger.ms = 100 or
> 1000?
> 
> It seems you are not using Kafka's ProducerPerformance but your own
> ExploreProducer implementation; could you elaborate a bit about how
> messages are piped to this client?
> 
> Guozhang
> 
> 
> On Sat, Dec 12, 2015 at 11:34 AM, Gary Gershon <g...@intermediasciences.com>
> wrote:
> 
<snipped>

> -- Guozhang
