Hello,
I have an issue that I hope you can help me resolve.
I have attached my test code below.

*Environment:*
3-node Kafka cluster (2.12-2.1.1)
ZooKeeper (3.4)

*Kafka topic description:*

Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs:min.insync.replicas=2

Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test_topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: test_topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 3,2
Topic: test_topic Partition: 3 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: test_topic Partition: 4 Leader: 1 Replicas: 1,3 Isr: 3,1
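
This is the output of kafka-topics.sh --describe. For completeness, the same partition/leader/ISR information can also be read programmatically with the AdminClient from the same kafka-clients bundle; the small sketch below is only an illustration (it is not part of the attached test) and assumes the same bootstrap server as the test code:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTestTopic {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		// Assumption: same bootstrap server as in the attached test.
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.platform:9092");

		try (AdminClient admin = AdminClient.create(props)) {
			// Fetch the topic metadata for test_topic.
			TopicDescription description = admin
					.describeTopics(Collections.singletonList("test_topic"))
					.all()
					.get()
					.get("test_topic");

			// Print leader, replicas and ISR per partition, similar to the CLI output above.
			description.partitions().forEach(p ->
					System.out.printf("Partition: %d Leader: %s Replicas: %s Isr: %s%n",
							p.partition(), p.leader().id(), p.replicas(), p.isr()));
		}
	}
}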

*Used kafka client in tests:*
org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/2.4.0_1

*Test description:*
The test starts a consumer and a producer in separate threads. The producer sends a message every SLEEP_INTERVAL_MS. When Kafka is available, it receives the message and the consumer is able to consume it. When Kafka is not available and the producer callback receives a TimeoutException, the message is persisted to an in-memory backup message store. The main thread tries to restore the backed-up messages every PERSIST_BACKUP_CHECK_INTERVAL_MS. When Kafka is available again, the messages are restored successfully, but Kafka does not contain some of them (the consumer is not able to consume them).
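
For reference, here is the producer-side flow condensed into a standalone sketch. It is not the attached test itself; it assumes the same bootstrap server and topic, uses a hypothetical class name, and leaves out the consumer:

import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerWithBackupSketch {

	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		// Assumption: same bootstrap server as in the attached test.
		props.put("bootstrap.servers", "kafka1.platform:9092");
		Queue<String> backup = new ConcurrentLinkedQueue<>();

		try (KafkaProducer<String, String> producer =
				new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {

			// Produce: if the send callback reports a failure (e.g. TimeoutException
			// while the brokers are down), put the message into the in-memory backup.
			for (int i = 0; i < 100; i++) {
				String message = "message-" + i;
				producer.send(new ProducerRecord<>("test_topic", message),
						(metadata, exception) -> {
							if (exception != null) {
								backup.add(message);
							}
						});
				Thread.sleep(100);
			}

			// Restore: periodically re-send whatever is waiting in the backup queue.
			while (!backup.isEmpty()) {
				String restored = backup.poll();
				producer.send(new ProducerRecord<>("test_topic", restored),
						(metadata, exception) -> {
							if (exception != null) {
								backup.add(restored);
							}
						});
				Thread.sleep(100);
			}
		}
	}
}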

*Issue reproduction:*
1. All Kafka nodes are running.
2. Start the shouldConsumeAllProducedMessages() test.
3. After some messages have been consumed (but before all of them), stop all Kafka nodes.
4. Wait until org.apache.kafka.common.errors.TimeoutException appears.
5. Start the Kafka nodes again.

*Result:*
Many messages could not be consumed because Kafka does not contain them, even though the producer did not receive any exception for them in the callback.

*Question:*
In my opinion, either Kafka should contain the message and the consumer should be able to consume it, or the callback should report an exception. Could you confirm this? Could you explain the situation I am seeing?

Best regards
Konrad Bączyński

--
Konrad Bączyński
Software developer
konrad.baczyn...@empirica.io

Empirica Sp. z o.o.
ul. Curie-Skłodowskiej 12
50-381 Wrocław

www.empirica.pl
NIP 8982201370, REGON 021955151

Registered in the District Court for Wrocław-Fabryczna, VI Commercial Division of the National Court Register, under KRS number 0000914199. Share capital: PLN 650,000.00, paid in full.
package com.empirica.algo.kafka.sender;

import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.reducing;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

@Slf4j
public class KafkaITest {

	private static final String BOOTSTRAP_SERVERS = "kafka1.platform:9092";

	private static final String TOPIC = "test_topic";
	private static final int NUMBER_OF_PRODUCED_MESSAGES = 2_000;
	private static final int SLEEP_INTERVAL_MS = 100;
	private static final long PERSIST_BACKUP_CHECK_INTERVAL_MS = 100L;

	private final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProducerProperties(), new StringSerializer(), new StringSerializer());

	private final Queue<String> backupCollection = new ConcurrentLinkedQueue<>(new ArrayList<>());
	private final ProbabilisticLogger consumerLog = new ProbabilisticLogger(0.01);
	private final ProbabilisticLogger producerLog = new ProbabilisticLogger(0.01);

	@Test
	public void shouldConsumeAllProducedMessages() throws InterruptedException {
		CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_PRODUCED_MESSAGES);

		new Thread(() -> consume(countDownLatch)).start();
		new Thread(this::produce).start();
		persistBackup(countDownLatch);

		countDownLatch.await();
	}

	private void persistBackup(CountDownLatch countDownLatch) throws InterruptedException {
		log.info("Starting to persist from backup");
		while (countDownLatch.getCount() > 0) {
			producerLog.info("Number of messages waiting to being persisted {}", backupCollection.size());
			ofNullable(backupCollection.poll())
					.ifPresent(m -> {
						log.info("Restoring from backup {}", m);
						kafkaProducer.send(
								new ProducerRecord<>(TOPIC, m),
								createCompletionCallback(m));
					});
			Thread.sleep(PERSIST_BACKUP_CHECK_INTERVAL_MS);
		}
		log.info("Finished to persist from backup");
	}

	private void consume(CountDownLatch countDownLatch) {
		log.info("Starting consuming messages");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerProperties(), new StringDeserializer(), new StringDeserializer());

		consumer.subscribe(Collections.singletonList(TOPIC));

		while (countDownLatch.getCount() > 0) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
			Map<TopicPartition, OffsetAndMetadata> offsets = selectNextOffsets(
					StreamSupport
							.stream(records.spliterator(), false)
							.collect(Collectors.toList()));

			records.forEach(r -> countDownLatch.countDown());

			consumer.commitAsync(
					offsets,
					(metadata, exception) -> {
						if (exception != null) {
							consumerLog.error("Error committing offset", exception);
						}
					});

			consumerLog.info("Currently consumed messages {}", NUMBER_OF_PRODUCED_MESSAGES - countDownLatch.getCount());
			try {
				Thread.sleep(SLEEP_INTERVAL_MS);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		log.info("Consumed all the messages");
	}

	private void produce() {
		log.info("Starting producing messages");

		IntStream.range(0, NUMBER_OF_PRODUCED_MESSAGES)
				.forEach(i -> {
							String message = IntStream.range(0, 10).mapToObj(j -> UUID.randomUUID()).map(Object::toString).collect(Collectors.joining()) + "  " + i;
							kafkaProducer.send(
									new ProducerRecord<>(TOPIC, message),
									createCompletionCallback(message));
							try {
								Thread.sleep(SLEEP_INTERVAL_MS);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
				);

		log.info("Produced all the messages");
	}

	private Callback createCompletionCallback(String message) {
		return (metadata, exception) -> {
			if (exception != null) {
				backupCollection.add(message);
				producerLog.error("Failed to send message to topic {} Message {}", TOPIC, message, exception);
			} else {
				producerLog.info(
						"Successfully sent message to topic {} at partition/offset/message {}/{}/{}",
						TOPIC,
						metadata.partition(),
						metadata.offset(),
						message);
			}
		};
	}

	private Map<String, Object> kafkaConsumerProperties() {
		Map<String, Object> props = new HashMap<>();
		props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
		props.put("group.id", "test_group_id");
		props.put("max.poll.records", "250");
		// Auto-commit is disabled; offsets are committed manually via commitAsync() in consume().
		props.put("enable.auto.commit", "false");
		return props;
	}

	private Map<String, Object> kafkaProducerProperties() {
		Map<String, Object> props = new HashMap<>();
		props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
		// acks=0: the producer does not wait for any acknowledgement from the broker.
		props.put("acks", "0");
		props.put("compression.type", "gzip");
		return props;
	}

	// For each partition in the batch, picks the offset of its last record plus one,
	// i.e. the next offset to commit for that partition.
	public static <A, B> Map<TopicPartition, OffsetAndMetadata> selectNextOffsets(List<ConsumerRecord<A, B>> records) {
		return records.stream()
				.collect(
						groupingBy(r -> new TopicPartition(r.topic(), r.partition()),
								mapping(r -> new OffsetAndMetadata(r.offset() + 1),
										reducing(new OffsetAndMetadata(0), (l, r) -> r))));
	}

	@Slf4j
	@RequiredArgsConstructor
	private static class ProbabilisticLogger {

		private final Random random = new Random();
		private final double probability;

		public void info(String s, Object... objects) {
			if (random.nextDouble() < probability) {
				log.info(s, objects);
			}
		}
		public void error(String s, Object... objects) {
			if (random.nextDouble() < probability) {
				log.error(s, objects);
			}
		}
	}
}
