[
https://issues.apache.org/jira/browse/KAFKA-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16038779#comment-16038779
]
Ismael Juma commented on KAFKA-5390:
------------------------------------
Thanks for the report. Pasting the code so that it's easier to reference.
There's an issue in the code below. You are not passing a callback to `send` or
calling `get` on the Future. This means that any errors during `send` are not
captured. Passing the callback seems to be the simplest option in this case and
that will tell us if there are errors during `send`.
{code}
package com.reftel.magnus.kafkasequence;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
public class SequenceTest {
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
private ServerCnxnFactory factory;
private KafkaServerStartable broker;
private int kafkaPort;
@Before
public void before() throws Throwable {
factory = NIOServerCnxnFactory.createFactory(null, 10);
factory.startup(new ZooKeeperServer(
temp.newFolder("zk-snapshot"), temp.newFolder("zk-log"),
ZooKeeperServer.DEFAULT_TICK_TIME
));
try (ServerSocket socket = new ServerSocket(0)) {
kafkaPort = socket.getLocalPort();
}
Properties props = new Properties();
props.put("listeners", String.format("PLAINTEXT://%s:%d", "localhost",
kafkaPort));
props.put("log.dir", temp.newFolder("kafka").toString());
props.put("log.segment.bytes", "4000");
props.put("num.partitions", "1");
props.put("zookeeper.connect", String.format("localhost:%d",
factory.getLocalPort()));
broker = new KafkaServerStartable(new KafkaConfig(props));
broker.startup();
}
@After
public void after() {
broker.shutdown();
factory.shutdown();
}
private KafkaProducer<String, String> buildProducer() throws IOException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.format("localhost:%d", kafkaPort));
props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.toString(60000));
return new KafkaProducer<>(props, new StringSerializer(), new
StringSerializer());
}
private KafkaConsumer<String, String> buildConsumer() throws IOException {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.format("localhost:%d", kafkaPort));
return new KafkaConsumer<>(props, new StringDeserializer(), new
StringDeserializer());
}
@Test
public void testSequence() throws IOException {
List<ProducerRecord<String, String>> records = new ArrayList<>(9);
for (int i = 0; i < 9; i++) {
int dataLength = 2000;
StringBuilder sb = new StringBuilder(dataLength);
for (int n = 0; n < dataLength; n++) {
sb.append('x');
}
records.add(new ProducerRecord<>("t", Integer.toString(i),
sb.toString()));
}
final KafkaProducer<String, String> producer = buildProducer();
records.forEach(producer::send);
producer.flush();
producer.close();
final KafkaConsumer<String, String> consumer = buildConsumer();
consumer.assign(Collections.singleton(new TopicPartition("t", 0)));
consumer.seekToBeginning(consumer.assignment());
final ConsumerRecords<String, String> read = consumer.poll(1000);
if (!read.isEmpty()) {
final ConsumerRecord<String, String> first = read.iterator().next();
Assert.assertEquals(first.key(), "0");
}
}
}
{code}
> First records in batch rejected but others accepted when rolling log
> --------------------------------------------------------------------
>
> Key: KAFKA-5390
> URL: https://issues.apache.org/jira/browse/KAFKA-5390
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.2.1
> Reporter: Magnus Reftel
> Attachments: kafka-sequence.tar.gz
>
>
> When sending a sequence of records in a batch right when the broker needs to
> roll a new segment, it's possible for the first few records to fail, while
> other records in the batch are accepted. If records have dependencies on
> earlier records, e.g. in the case of a sequence of events in an event-sourced
> system, then a producer cannot use the batching functionality, since it then
> risks consumers receiving a record without first receiving the records it
> depends on.
> See attached testcase (kafka-sequence.tar.gz).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)