This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new b4c675b KAFKA-8040: Streams handle initTransactions timeout (#6372) b4c675b is described below commit b4c675b8e03fec28f1de2ac0ae270fc4388fdf57 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Fri Mar 8 09:29:22 2019 -0600 KAFKA-8040: Streams handle initTransactions timeout (#6372) As of 2.0, Producer.initTransactions may throw a TimeoutException, which is retriable. Streams should retry instead of crashing when we encounter this exception. Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com> --- .../processor/internals/RecordCollectorImpl.java | 19 ++- .../streams/processor/internals/StreamTask.java | 24 +++- .../processor/internals/StreamTaskTest.java | 135 ++++++++++++++++++++- 3 files changed, 170 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index d3a0030..9f9e100 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -204,11 +204,20 @@ public class RecordCollectorImpl implements RecordCollector { } }); } catch (final TimeoutException e) { - log.error("Timeout exception caught when sending record to topic {}. " + - "This might happen if the producer cannot send data to the Kafka cluster and thus, " + - "its internal buffer fills up. " + - "You can increase producer parameter `max.block.ms` to increase this timeout.", topic); - throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic)); + log.error( + "Timeout exception caught when sending record to topic {}. 
" + + "This might happen if the producer cannot send data to the Kafka cluster and thus, " + + "its internal buffer fills up. " + + "This can also happen if the broker is slow to respond, if the network connection to " + + "the broker was interrupted, or if similar circumstances arise. " + + "You can increase producer parameter `max.block.ms` to increase this timeout.", + topic, + e + ); + throw new StreamsException( + String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic), + e + ); } catch (final Exception uncaughtException) { if (uncaughtException instanceof KafkaException && uncaughtException.getCause() instanceof ProducerFencedException) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c4fecef..53abf82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; @@ -242,7 +243,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // initialize transactions if eos is turned on, which will block if the previous transaction has not // completed yet; do not start the first transaction until the topology has been initialized later if (eosEnabled) { - this.producer.initTransactions(); + initializeTransactions(); } } @@ -294,7 +295,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator throw new 
IllegalStateException("Task producer should be null."); } producer = producerSupplier.get(); - producer.initTransactions(); + initializeTransactions(); recordCollector.init(producer); } } @@ -841,4 +842,23 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator Producer<byte[], byte[]> getProducer() { return producer; } + + private void initializeTransactions() { + try { + producer.initTransactions(); + } catch (final TimeoutException retriable) { + log.error( + "Timeout exception caught when initializing transactions for task {}. " + + "This might happen if the broker is slow to respond, if the network connection to " + + "the broker was interrupted, or if similar circumstances arise. " + + "You can increase producer parameter `max.block.ms` to increase this timeout.", + id, + retriable + ); + throw new StreamsException( + format("%sFailed to initialize task %s due to timeout.", logPrefix, id), + retriable + ); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 74addbd..af64099 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; @@ -47,11 +48,12 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockStateRestoreListener; -import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; @@ -65,15 +67,18 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -186,6 +191,134 @@ public class StreamTaskTest { } } + @Test + public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final ProcessorTopology topology = ProcessorTopology.withSources( + asList(source1, source2, processorStreamTime, processorSystemTime), + mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + ); + + source1.addChild(processorStreamTime); + source2.addChild(processorStreamTime); + source1.addChild(processorSystemTime); + source2.addChild(processorSystemTime); + + try { + new 
StreamTask( + taskId00, + partitions, + topology, + consumer, + changelogReader, + createConfig(true), + streamsMetrics, + stateDirectory, + null, + time, + () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) { + @Override + public void initTransactions() { + throw new TimeoutException("test"); + } + }, + null, + null + ); + fail("Expected an exception"); + } catch (final StreamsException expected) { + // make sure we log the explanation as an ERROR + assertTimeoutErrorLog(appender); + + // make sure we report the correct message + assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout.")); + + // make sure we preserve the cause + assertEquals(expected.getCause().getClass(), TimeoutException.class); + assertThat(expected.getCause().getMessage(), is("test")); + } + LogCaptureAppender.unregister(appender); + } + + @Test + public void shouldHandleInitTransactionsTimeoutExceptionOnResume() { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final ProcessorTopology topology = ProcessorTopology.withSources( + asList(source1, source2, processorStreamTime, processorSystemTime), + mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) + ); + + source1.addChild(processorStreamTime); + source2.addChild(processorStreamTime); + source1.addChild(processorSystemTime); + source2.addChild(processorSystemTime); + + final AtomicBoolean timeOut = new AtomicBoolean(false); + + final StreamTask testTask = new StreamTask( + taskId00, + partitions, + topology, + consumer, + changelogReader, + createConfig(true), + streamsMetrics, + stateDirectory, + null, + time, + () -> producer = new MockProducer<byte[], byte[]>(false, bytesSerializer, bytesSerializer) { + @Override + public void initTransactions() { + if (timeOut.get()) { + throw new TimeoutException("test"); + } else { + super.initTransactions(); + } + } + }, + null, + null + ); + 
testTask.initializeTopology(); + testTask.suspend(); + timeOut.set(true); + try { + testTask.resume(); + fail("Expected an exception"); + } catch (final StreamsException expected) { + // make sure we log the explanation as an ERROR + assertTimeoutErrorLog(appender); + + // make sure we report the correct message + assertThat(expected.getMessage(), is("task [0_0] Failed to initialize task 0_0 due to timeout.")); + + // make sure we preserve the cause + assertEquals(expected.getCause().getClass(), TimeoutException.class); + assertThat(expected.getCause().getMessage(), is("test")); + } + LogCaptureAppender.unregister(appender); + } + + private void assertTimeoutErrorLog(final LogCaptureAppender appender) { + + final String expectedErrorLogMessage = + "task [0_0] Timeout exception caught when initializing transactions for task 0_0. " + + "This might happen if the broker is slow to respond, if the network " + + "connection to the broker was interrupted, or if similar circumstances arise. " + + "You can increase producer parameter `max.block.ms` to increase this timeout."; + + final List<String> expectedError = + appender + .getEvents() + .stream() + .filter(event -> event.getMessage().equals(expectedErrorLogMessage)) + .map(LogCaptureAppender.Event::getLevel) + .collect(Collectors.toList()); + assertThat(expectedError, is(singletonList("ERROR"))); + } + @SuppressWarnings("unchecked") @Test public void testProcessOrder() {