I posted a variation of this question before regarding the 'old' publisher. I
have since figured out how to use the 'new' redesigned publisher, and I have a
similar question.
Basically, I want to:
#1) Detect when the broker is down when using the asynchronous publisher. If
the broker is down, I want to generate an alert and stop publication.
#2) Most of all, avoid message gaps when the broker recovers. In other words, I
do not want to lose messages while the broker is down and then silently resume
publication when the broker is up again.
Apparently, this is not possible with the old publisher when asynchronous mode
is used.
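To make #1 concrete, here is a minimal sketch of the stop-on-first-error
behaviour, written in plain Java with no Kafka dependency. `FailFastGate` and
its methods are hypothetical names used only for illustration; they are not
part of the producer API. The sketch assumes the failure is eventually reported
through the send callback.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: remembers the first async send failure and refuses further sends. */
final class FailFastGate {
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    /** Call from the send callback; a null exception means the send succeeded. */
    void onCompletion(Exception e) {
        if (e != null) {
            firstFailure.compareAndSet(null, e); // keep only the first failure
        }
    }

    /** Call before every send; throws once any earlier send has reported a failure. */
    void checkOpen() {
        Exception e = firstFailure.get();
        if (e != null) {
            throw new IllegalStateException("Publishing stopped after send failure", e);
        }
    }

    boolean isOpen() {
        return firstFailure.get() == null;
    }
}

final class FailFastGateDemo {
    public static void main(String[] args) {
        FailFastGate gate = new FailFastGate();
        for (int i = 0; i < 5; i++) {
            try {
                gate.checkOpen();
            } catch (IllegalStateException stopped) {
                System.out.println("Stopped before message " + i + ": "
                        + stopped.getCause().getMessage());
                break;
            }
            System.out.println("Sending " + i);
            // Stand-in for the producer callback: pretend message 2 fails.
            gate.onCompletion(i == 2 ? new IOException("broker down") : null);
        }
    }
}
```

In the test program this would mean calling `gate.checkOpen()` before each
`send` and `gate.onCompletion(e)` inside the callback. The gate only closes
once a failure is actually reported, and it does not recall records that were
already handed to the publisher before that point.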
My test program is below. I do not see a way to detect when the broker is down.
I see errors in the log, but my publisher keeps publishing without reporting
any errors via the API. Once the broker connection is up again, the Callback
reports exceptions for the messages that failed to publish. However, I suspect
the publisher would also publish new messages at the point when the exception
is reported, creating a 'gap'.
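The 'gap' described above can be made observable with a small bookkeeping
class. This is only a sketch in plain Java, assuming (as in the test program)
that every message carries a monotonically increasing counter as its key.
`GapDetector` is a hypothetical name and not part of the Kafka client.

```java
import java.util.TreeSet;

/** Hypothetical helper: flags a success that arrives after an earlier sequence number failed. */
final class GapDetector {
    private final TreeSet<Long> failed = new TreeSet<>();
    private long gaps = 0;

    /** Call from the callback when the send of sequence number seq failed. */
    synchronized void onFailure(long seq) {
        failed.add(seq);
    }

    /** Call from the callback on success; returns true if an earlier sequence number failed. */
    synchronized boolean onSuccess(long seq) {
        boolean gap = !failed.isEmpty() && failed.first() < seq;
        if (gap) {
            gaps++;
        }
        return gap;
    }

    synchronized long gapCount() {
        return gaps;
    }
}

final class GapDetectorDemo {
    public static void main(String[] args) {
        GapDetector detector = new GapDetector();
        for (long key = 0; key < 6; key++) {
            // Stand-in for callback results: pretend keys 2 and 3 fail.
            boolean sendFailed = (key == 2 || key == 3);
            if (sendFailed) {
                detector.onFailure(key);
                System.out.println("Failed " + key);
            } else if (detector.onSuccess(key)) {
                System.out.println("Gap: " + key + " acknowledged after an earlier failure");
            } else {
                System.out.println("Acknowledged " + key);
            }
        }
        System.out.println("Gaps detected: " + detector.gapCount());
    }
}
```

This only detects a gap after the fact; it does not prevent one.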
Is there a way to address these concerns?
I would hate to use the synchronous publisher: my publishing rate drops
dramatically, from 300,000 to 7,000 messages per second (m/s).
package com.kcg.kafka.test;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit; // only needed for the commented-out get() below

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

    public static void main(String[] args) {
        System.setProperty("log4j.debug", "true");

        // Number of messages to publish, taken from the command line.
        long events = Long.parseLong(args[0]);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cno-d-igoberman2:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(props);

        long start = System.currentTimeMillis();
        long nEvents = 0;
        for (; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            final String key = Long.toString(nEvents);
            // Pad the message with 300 NUL characters to give it some size.
            String msg = runtime + ": " + key + new String(new byte[300]);
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>("test", key, msg);
            try {
                System.out.println("Sending " + key);
                // 'send' will not throw an exception when broker is down.
                producer.send(producerRecord,
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata,
                                                     Exception e) {
                                if (e != null) {
                                    e.printStackTrace();
                                    // Stop publisher. But is there guarantee
                                    // that no more messages are published
                                    // after this one?
                                } else {
                                    System.out.println(
                                            "The offset of the record we just sent is: "
                                            + metadata.offset() + " " + key);
                                }
                            }
                        }); //.get(20, TimeUnit.SECONDS);
            } catch (Throwable e) {
                e.printStackTrace();
            }

            if (nEvents % 10000 == 0) {
                System.out.println(key);
            }

            try {
                // Pace the test: one message per second.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignored: the sleep is only a pacing delay in this test.
            }
        }

        long duration = System.currentTimeMillis() - start;
        System.out.println("Published " + nEvents + " messages in " + duration
                + "ms. " + (int) ((double) nEvents / ((double) duration / 1000.0))
                + " m/s.");
        producer.close();
    }
}

log4j: Trying to find [log4j.xml] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@6da21389 class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader sun.misc.Launcher$AppClassLoader@6da21389.
log4j: Using URL [file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/home/apprun/kafkatest/TestProducer/bin/log4j.properties
log4j: Parsing for [root] with value=[INFO, stdout].
log4j: Level token is [INFO].
log4j: Category root set to INFO
log4j: Parsing appender named "stdout".
log4j: Parsing layout options for "stdout".
log4j: Setting property [conversionPattern] to [%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n].
log4j: End of parsing for "stdout".
log4j: Setting property [target] to [System.out].
log4j: Parsed "stdout" options.
log4j: Finished configuring.
2015-11-10 10:23:16 INFO ProducerConfig:113 - ProducerConfig values:
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 33554432
batch.size = 16384
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
timeout.ms = 30000
max.in.flight.requests.per.connection = 5
bootstrap.servers = [cno-d-igoberman2:9092]
metric.reporters = []
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = 1
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 60000
Sending 0
0
The offset of the record we just sent is: 12 0
Sending 1
The offset of the record we just sent is: 13 1
Sending 2
The offset of the record we just sent is: 14 2
Sending 3
The offset of the record we just sent is: 15 3
Sending 4
The offset of the record we just sent is: 16 4
Sending 5
The offset of the record we just sent is: 17 5
Sending 6
The offset of the record we just sent is: 18 6
Sending 7
The offset of the record we just sent is: 19 7
Sending 8
The offset of the record we just sent is: 20 8
Sending 9
The offset of the record we just sent is: 21 9
2015-11-10 10:23:25 WARN Selector:276 - Error in I/O with cno-d-igoberman2/10.83.55.13
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
....
Sending 10