Guozhang,
Correct. Even with linger.ms at 1000 ms, the metrics indicate no batching.
The KafkaProducer instance does recognize the linger.ms setting, since sending
100 records with linger.ms=1000 then takes over 100 seconds.
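As a rough sanity check on that figure, here is the arithmetic, assuming each
blocking send waits out the full linger.ms interval before the next one starts.
That is only my assumption about what is happening, not something the metrics
confirm, and LingerEstimate and estimateMillis are hypothetical names made up
for this illustration:

```java
public class LingerEstimate {

    /**
     * Estimated wall-clock time in milliseconds, ASSUMING every send blocks
     * for the full linger interval before the next send is issued.
     */
    public static long estimateMillis(int records, long lingerMs) {
        return (long) records * lingerMs;
    }

    public static void main(String[] args) {
        long ms = estimateMillis(100, 1000);
        // Prints: Estimated duration: 100000 ms (100 s)
        System.out.println("Estimated duration: " + ms + " ms (" + (ms / 1000) + " s)");
    }
}
```

Under that assumption, 100 records at linger.ms=1000 come to roughly 100
seconds, which is in line with the duration I measured.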
Here is the test harness. There is also an abstract parent class (not shown)
with methods to parse the Bluemix Message Hub (Kafka 0.9) configuration
environment variable and to print the metrics.
/**
 * MessageHub Producer Client
 * Kafka 0.9
 */
package com.isllc.client.producer;

import java.io.UnsupportedEncodingException;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

import com.isllc.client.AbstractClient;

/**
 * ExploreProducer to send messages.
 *
 * getProperties(), logMetrics() and UTF8 come from AbstractClient (not shown).
 */
public class ExploreProducer extends AbstractClient {

    public static Logger logger = Logger.getLogger(ExploreProducer.class.getName());

    final static String FIELD_NAME = "records"; // Specified for MessageHub

    KafkaProducer<byte[], byte[]> kafkaProducer;

    public ExploreProducer(String propertiesName) {
        Properties clientProperties = getProperties(propertiesName);
        clientProperties.put("linger.ms", "100"); // also tried 1000
        kafkaProducer = new KafkaProducer<byte[], byte[]>(clientProperties);
    }

    public void send(String topic, String message) {
        ProducerRecord<byte[], byte[]> record;
        try {
            record = new ProducerRecord<byte[], byte[]>(topic,
                    FIELD_NAME.getBytes(UTF8), message.getBytes(UTF8));
        } catch (UnsupportedEncodingException uee) {
            throw new RuntimeException(uee.getMessage(), uee);
        }

        RecordMetadata metadata;
        try {
            if (logger.isTraceEnabled())
                logger.trace("Sending record: Topic='" + record.topic()
                        + "', Key='" + new String(record.key())
                        + "', Value='" + new String(record.value()) + "'");
            // get() blocks until this send completes and returns its metadata
            metadata = kafkaProducer.send(record).get();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie.getMessage(), ie);
        } catch (ExecutionException ee) {
            throw new RuntimeException(ee.getMessage(), ee);
        }

        if (logger.isTraceEnabled())
            logger.trace("Send returned metadata: Topic='" + metadata.topic()
                    + "', Partition=" + metadata.partition()
                    + ", Offset=" + metadata.offset());
    }

    public void close() {
        kafkaProducer.close();
    }

    public void exercise(int sends) {
        long bytes = 0L;

        logger.info("************************************************");
        logger.info("Starting " + sends
                + " sends including the following first and last records:");

        long start = System.currentTimeMillis();
        for (int i = 1; i <= sends; i++) {
            String iso = OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            String message = String.format(
                    "Kafka 0.9 Java Client Record Test Message %05d %s", i, iso);
            if (i == 1) {
                logger.info("Record length: " + message.length() + " bytes");
                logger.info("First: " + message);
            }
            if (i == sends)
                logger.info("Last: " + message);
            bytes += message.length();
            send("mytopic", message);
        }
        kafkaProducer.flush();
        long finish = System.currentTimeMillis();

        logger.info("Duration for " + sends + " sends " + (finish - start)
                + " ms. Sent " + bytes + " bytes.");
        logMetrics(kafkaProducer.metrics());
    }

    /**
     * Main method for exploration
     *
     * @param args
     */
    public static void main(String[] args) {
        logger.info("Instantiating producer com.isllc.client.producer.ExploreProducer");
        ExploreProducer producer = new ExploreProducer("kafka-producer.properties");
        try {
            producer.exercise(100); // Initialize component
            // producer.exercise(1000);
            // producer.exercise(10000);
            // producer.exercise(100000);
        } catch (RuntimeException rte) {
            logger.fatal("Terminating producer client with RuntimeException: "
                    + rte.getMessage(), rte);
            System.exit(1);
        } finally {
            if (producer != null)
                producer.close();
        }
        logger.info("Terminating producer client normally");
    }
}
It’s really pretty simple, but something’s not right :)
Gary
> On Dec 13, 2015, at 12:18 AM, Guozhang Wang <[email protected]> wrote:
>
> Gary,
>
> So you observed records-per-request-avg = 1.0 still with linger.ms = 100 or
> 1000?
>
> It seems you are not using Kafka's ProducerPerformance but your own
> ExploreProducer implementation. Could you elaborate a bit on how messages
> are piped to this client?
>
> Guozhang
>
>
> On Sat, Dec 12, 2015 at 11:34 AM, Gary Gershon <[email protected]>
> wrote:
>
<snipped>
> -- Guozhang