This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new af7fabc KAFKA-8040: Streams handle initTransactions timeout (#6416) af7fabc is described below commit af7fabc5a9302708ea277e4252a7382f19ac9a11 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Mon Mar 11 17:18:11 2019 -0500 KAFKA-8040: Streams handle initTransactions timeout (#6416) https://issues.apache.org/jira/browse/KAFKA-7934 Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../processor/internals/RecordCollectorImpl.java | 19 ++- .../streams/processor/internals/StreamTask.java | 24 +++- .../processor/internals/StreamTaskTest.java | 157 +++++++++++++++++++-- .../internals/testutil/LogCaptureAppender.java | 52 ++++++- 4 files changed, 230 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 554cc85..e483f58 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -204,11 +204,20 @@ public class RecordCollectorImpl implements RecordCollector { } }); } catch (final TimeoutException e) { - log.error("Timeout exception caught when sending record to topic {}. " + - "This might happen if the producer cannot send data to the Kafka cluster and thus, " + - "its internal buffer fills up. " + - "You can increase producer parameter `max.block.ms` to increase this timeout.", topic); - throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic)); + log.error( + "Timeout exception caught when sending record to topic {}. " + + "This might happen if the producer cannot send data to the Kafka cluster and thus, " + + "its internal buffer fills up. " + + "This can also happen if the broker is slow to respond, if the network connection to " + + "the broker was interrupted, or if similar circumstances arise. " + + "You can increase producer parameter `max.block.ms` to increase this timeout.", + topic, + e + ); + throw new StreamsException( + String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic), + e + ); } catch (final Exception uncaughtException) { if (uncaughtException instanceof KafkaException && uncaughtException.getCause() instanceof ProducerFencedException) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index ce8e3c1..a325b24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; @@ -222,7 +223,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // initialize transactions if eos is turned on, which will block if the previous transaction has not // completed yet; do not start the first transaction until the topology has been initialized later if (eosEnabled) { - this.producer.initTransactions(); + initializeTransactions(); } } @@ -270,7 +271,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator throw new IllegalStateException("Task producer should be null."); } producer = producerSupplier.get(); - producer.initTransactions(); + initializeTransactions(); recordCollector.init(producer); } } @@ -796,4 +797,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator Producer<byte[], byte[]> getProducer() { return producer; } + + private void initializeTransactions() { + try { + producer.initTransactions(); + } catch (final TimeoutException retriable) { + log.error( + "Timeout exception caught when initializing transactions for task {}. " + + "This might happen if the broker is slow to respond, if the network connection to " + + "the broker was interrupted, or if similar circumstances arise. " + + "You can increase producer parameter `max.block.ms` to increase this timeout.", + id, + retriable + ); + throw new StreamsException( + format("%sFailed to initialize task %s due to timeout.", logPrefix, id), + retriable + ); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index bacfcb7..2e61d7f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -45,6 +46,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; @@ -60,17 +62,20 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.time.Duration; -import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -164,7 +169,7 @@ public class StreamTaskTest { @Before public void setup() { - consumer.assign(Arrays.asList(partition1, partition2)); + consumer.assign(asList(partition1, partition2)); stateDirectory = new StateDirectory(createConfig(false), new MockTime()); } @@ -183,18 +188,142 @@ public class StreamTaskTest { } } + @Test + public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final ProcessorTopology topology = ProcessorTopology.withSources( + asList(source1, source2, processorStreamTime, processorSystemTime), + mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + ); + + source1.addChild(processorStreamTime); + source2.addChild(processorStreamTime); + source1.addChild(processorSystemTime); + source2.addChild(processorSystemTime); + + try { + new StreamTask( + taskId00, + partitions, + topology, + consumer, + changelogReader, + createConfig(true), + streamsMetrics, + stateDirectory, + null, + time, + () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) { + @Override + public void initTransactions() { + throw new TimeoutException("test"); + } + } + ); + fail("Expected an exception"); + } catch (final StreamsException expected) { + // make sure we log the explanation as an ERROR + assertTimeoutErrorLog(appender); + + // make sure we report the correct message + assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout.")); + + // make sure we preserve the cause + assertEquals(expected.getCause().getClass(), TimeoutException.class); + assertThat(expected.getCause().getMessage(), is("test")); + } + LogCaptureAppender.unregister(appender); + } + + @Test + public void shouldHandleInitTransactionsTimeoutExceptionOnResume() { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final ProcessorTopology topology = ProcessorTopology.withSources( + asList(source1, source2, processorStreamTime, processorSystemTime), + mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + ); + + source1.addChild(processorStreamTime); + source2.addChild(processorStreamTime); + source1.addChild(processorSystemTime); + source2.addChild(processorSystemTime); + + final AtomicBoolean timeOut = new AtomicBoolean(false); + + final StreamTask testTask = new StreamTask( + taskId00, + partitions, + topology, + consumer, + changelogReader, + createConfig(true), + streamsMetrics, + stateDirectory, + null, + time, + () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) { + @Override + public void initTransactions() { + if (timeOut.get()) { + throw new TimeoutException("test"); + } else { + super.initTransactions(); + } + } + } + ); + testTask.initializeTopology(); + testTask.suspend(); + timeOut.set(true); + try { + testTask.resume(); + fail("Expected an exception"); + } catch (final StreamsException expected) { + // make sure we log the explanation as an ERROR + assertTimeoutErrorLog(appender); + + // make sure we report the correct message + assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout.")); + + // make sure we preserve the cause + assertEquals(expected.getCause().getClass(), TimeoutException.class); + assertThat(expected.getCause().getMessage(), is("test")); + } + LogCaptureAppender.unregister(appender); + } + + private void assertTimeoutErrorLog(final LogCaptureAppender appender) { + + final String expectedErrorLogMessage = + "task [0_0] Timeout exception caught when initializing transactions for task 0_0. " + + "This might happen if the broker is slow to respond, if the network " + + "connection to the broker was interrupted, or if similar circumstances arise. " + + "You can increase producer parameter `max.block.ms` to increase this timeout."; + + final List<String> expectedError = + appender + .getEvents() + .stream() + .filter(event -> event.getMessage().equals(expectedErrorLogMessage)) + .map(LogCaptureAppender.Event::getLevel) + .collect(Collectors.toList()); + assertThat(expectedError, is(singletonList("ERROR"))); + } + @SuppressWarnings("unchecked") @Test public void testProcessOrder() { task = createStatelessTask(createConfig(false)); - task.addRecords(partition1, Arrays.asList( + task.addRecords(partition1, asList( getConsumerRecord(partition1, 10), getConsumerRecord(partition1, 20), getConsumerRecord(partition1, 30) )); - task.addRecords(partition2, Arrays.asList( + task.addRecords(partition2, asList( getConsumerRecord(partition2, 25), getConsumerRecord(partition2, 35), getConsumerRecord(partition2, 45) @@ -259,12 +388,12 @@ public class StreamTaskTest { public void testPauseResume() { task = createStatelessTask(createConfig(false)); - task.addRecords(partition1, Arrays.asList( + task.addRecords(partition1, asList( getConsumerRecord(partition1, 10), getConsumerRecord(partition1, 20) )); - task.addRecords(partition2, Arrays.asList( + task.addRecords(partition2, asList( getConsumerRecord(partition2, 35), getConsumerRecord(partition2, 45), getConsumerRecord(partition2, 55), @@ -278,7 +407,7 @@ public class StreamTaskTest { assertEquals(1, consumer.paused().size()); assertTrue(consumer.paused().contains(partition2)); - task.addRecords(partition1, Arrays.asList( + task.addRecords(partition1, asList( getConsumerRecord(partition1, 30), getConsumerRecord(partition1, 40), getConsumerRecord(partition1, 50) @@ -316,7 +445,7 @@ public class StreamTaskTest { task.initializeStateStores(); task.initializeTopology(); - task.addRecords(partition1, Arrays.asList( + task.addRecords(partition1, asList( getConsumerRecord(partition1, 0), getConsumerRecord(partition1, 20), getConsumerRecord(partition1, 32), @@ -324,7 +453,7 @@ public class StreamTaskTest { getConsumerRecord(partition1, 60) )); - task.addRecords(partition2, Arrays.asList( + task.addRecords(partition2, asList( getConsumerRecord(partition2, 25), getConsumerRecord(partition2, 35), getConsumerRecord(partition2, 45), @@ -407,14 +536,14 @@ public class StreamTaskTest { task.initializeStateStores(); task.initializeTopology(); - task.addRecords(partition1, Arrays.asList( + task.addRecords(partition1, asList( getConsumerRecord(partition1, 20), getConsumerRecord(partition1, 142), getConsumerRecord(partition1, 155), getConsumerRecord(partition1, 160) )); - task.addRecords(partition2, Arrays.asList( + task.addRecords(partition2, asList( getConsumerRecord(partition2, 25), getConsumerRecord(partition2, 145), getConsumerRecord(partition2, 159), @@ -493,13 +622,13 @@ public class StreamTaskTest { task.initializeStateStores(); task.initializeTopology(); - task.addRecords(partition1, Arrays.asList( + task.addRecords(partition1, asList( getConsumerRecord(partition1, 20), getConsumerRecord(partition1, 30), getConsumerRecord(partition1, 40) )); - task.addRecords(partition2, Arrays.asList( + task.addRecords(partition2, asList( getConsumerRecord(partition2, 25), getConsumerRecord(partition2, 35), getConsumerRecord(partition2, 45) @@ -1198,7 +1327,7 @@ public class StreamTaskTest { mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(repartition.topic(), (SourceNode) source2)), Collections.singleton(repartition.topic()) ); - consumer.assign(Arrays.asList(partition1, repartition)); + consumer.assign(asList(partition1, repartition)); task = new StreamTask( taskId00, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java index b6f5769..462159f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java @@ -21,11 +21,37 @@ import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; +import java.util.Deque; import java.util.LinkedList; import java.util.List; +import java.util.Optional; public class LogCaptureAppender extends AppenderSkeleton { - private final LinkedList<LoggingEvent> events = new LinkedList<>(); + private final Deque<LoggingEvent> events = new LinkedList<>(); + + public static class Event { + private final String level; + private final String message; + private final Optional<String> throwableInfo; + + Event(final String level, final String message, final Optional<String> throwableInfo) { + this.level = level; + this.message = message; + this.throwableInfo = throwableInfo; + } + + public String getLevel() { + return level; + } + + public String getMessage() { + return message; + } + + public Optional<String> getThrowableInfo() { + return throwableInfo; + } + } public static LogCaptureAppender createAndRegister() { final LogCaptureAppender logCaptureAppender = new LogCaptureAppender(); @@ -54,6 +80,30 @@ public class LogCaptureAppender extends AppenderSkeleton { return result; } + public List<Event> getEvents() { + final LinkedList<Event> result = new LinkedList<>(); + synchronized (events) { + for (final LoggingEvent event : events) { + final String[] throwableStrRep = event.getThrowableStrRep(); + final Optional<String> throwableString; + if (throwableStrRep == null) { + throwableString = Optional.empty(); + } else { + final StringBuilder throwableStringBuilder = new StringBuilder(); + + for (final String s : throwableStrRep) { + throwableStringBuilder.append(s); + } + + throwableString = Optional.of(throwableStringBuilder.toString()); + } + + result.add(new Event(event.getLevel().toString(), event.getRenderedMessage(), throwableString)); + } + } + return result; + } + @Override public void close() {