This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d8cf47b KAFKA-13676: Commit successfully processed tasks on error
(#11791)
d8cf47b is described below
commit d8cf47bf28af0b5eb5be8382883e6a331b788e01
Author: Walker Carlson <[email protected]>
AuthorDate: Tue Feb 22 23:10:05 2022 -0800
KAFKA-13676: Commit successfully processed tasks on error (#11791)
When we hit an exception while processing tasks, we should save the work we
have done so far.
This will only be relevant with ALOS and EOS-v1, not EOS-v2. It will
actually reduce the number of duplicated records in ALOS because we will not be
processing tasks successfully more than once in many cases.
This is currently enabled only for named topologies.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Guozhang
Wang <[email protected]>
---
.../streams/processor/internals/TaskExecutor.java | 37 +++++--
.../streams/processor/internals/TaskManager.java | 2 +-
.../kafka/streams/processor/internals/Tasks.java | 17 ++++
.../integration/EmitOnChangeIntegrationTest.java | 110 ++++++++++++++++++++-
4 files changed, 158 insertions(+), 8 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index efe04ef..4edc35b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@@ -47,12 +48,17 @@ public class TaskExecutor {
private final Logger log;
+ private final boolean hasNamedTopologies;
private final ProcessingMode processingMode;
private final Tasks tasks;
- public TaskExecutor(final Tasks tasks, final ProcessingMode
processingMode, final LogContext logContext) {
+ public TaskExecutor(final Tasks tasks,
+ final ProcessingMode processingMode,
+ final boolean hasNamedTopologies,
+ final LogContext logContext) {
this.tasks = tasks;
this.processingMode = processingMode;
+ this.hasNamedTopologies = hasNamedTopologies;
this.log = logContext.logger(getClass());
}
@@ -62,9 +68,16 @@ public class TaskExecutor {
*/
int process(final int maxNumRecords, final Time time) {
int totalProcessed = 0;
-
- for (final Task task : tasks.activeTasks()) {
- totalProcessed += processTask(task, maxNumRecords, time);
+ Task lastProcessed = null;
+ try {
+ for (final Task task : tasks.activeTasks()) {
+ lastProcessed = task;
+ totalProcessed += processTask(task, maxNumRecords, time);
+ }
+ } catch (final Exception e) {
+
tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+ commitSuccessfullyProcessedTasks();
+ throw e;
}
return totalProcessed;
@@ -80,6 +93,10 @@ public class TaskExecutor {
task.clearTaskTimeout();
processed++;
}
+ // TODO: enable regardless of whether using named topologies
+ if (hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+ tasks.addToSuccessfullyProcessed(task);
+ }
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
log.debug(
@@ -139,8 +156,6 @@ public class TaskExecutor {
return committed;
}
-
-
/**
* Caution: do not invoke this directly if it's possible a rebalance is
occurring, as the commit will fail. If
* this is a possibility, prefer the {@link
#commitTasksAndMaybeUpdateCommittableOffsets} instead.
@@ -234,6 +249,16 @@ public class TaskExecutor {
}
}
+ private void commitSuccessfullyProcessedTasks() {
+ if (!tasks.successfullyProcessed().isEmpty()) {
+ log.info("Streams encountered an error when processing tasks." +
+ " Will commit all previously successfully processed tasks {}",
+ tasks.successfullyProcessed().toString());
+
commitTasksAndMaybeUpdateCommittableOffsets(tasks.successfullyProcessed(), new
HashMap<>());
+ }
+ tasks.clearSuccessfullyProcessed();
+ }
+
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index fb1df22..0ff6dc2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -116,7 +116,7 @@ public class TaskManager {
this.log = logContext.logger(getClass());
this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics,
activeTaskCreator, standbyTaskCreator);
- this.taskExecutor = new TaskExecutor(tasks, processingMode,
logContext);
+ this.taskExecutor = new TaskExecutor(tasks, processingMode,
topologyMetadata.hasNamedTopologies(), logContext);
}
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index f2839456..2740791 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -59,6 +59,7 @@ class Tasks {
// TODO: change type to `StandbyTask`
private final Map<TaskId, Task> readOnlyStandbyTasksPerId =
Collections.unmodifiableMap(standbyTasksPerId);
private final Set<TaskId> readOnlyStandbyTaskIds =
Collections.unmodifiableSet(standbyTasksPerId.keySet());
+ private final Collection<Task> successfullyProcessed = new HashSet<>();
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
@@ -319,6 +320,22 @@ class Tasks {
return mainConsumer;
}
+ Collection<Task> successfullyProcessed() {
+ return successfullyProcessed;
+ }
+
+ void addToSuccessfullyProcessed(final Task task) {
+ successfullyProcessed.add(task);
+ }
+
+ void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
+ successfullyProcessed.remove(task);
+ }
+
+ void clearSuccessfullyProcessed() {
+ successfullyProcessed.clear();
+ }
+
// for testing only
void addTask(final Task task) {
if (task.isActive()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 63e0f27..2d04070 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -30,6 +30,8 @@ import
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
+import
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
@@ -45,11 +47,14 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
@Category(IntegrationTest.class)
public class EmitOnChangeIntegrationTest {
@@ -70,7 +75,9 @@ public class EmitOnChangeIntegrationTest {
public TestName testName = new TestName();
private static String inputTopic;
+ private static String inputTopic2;
private static String outputTopic;
+ private static String outputTopic2;
private static String appId = "";
@Before
@@ -78,8 +85,10 @@ public class EmitOnChangeIntegrationTest {
final String testId = safeUniqueTestName(getClass(), testName);
appId = "appId_" + testId;
inputTopic = "input" + testId;
+ inputTopic2 = "input2" + testId;
outputTopic = "output" + testId;
- IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic,
outputTopic);
+ outputTopic2 = "output2" + testId;
+ IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic,
outputTopic, inputTopic2, outputTopic2);
}
@Test
@@ -110,6 +119,7 @@ public class EmitOnChangeIntegrationTest {
}
})
.to(outputTopic);
+ builder.stream(inputTopic2).to(outputTopic2);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler(exception ->
StreamThreadExceptionResponse.REPLACE_THREAD);
@@ -128,6 +138,19 @@ public class EmitOnChangeIntegrationTest {
new Properties()),
0L);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic2,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
@@ -140,6 +163,91 @@ public class EmitOnChangeIntegrationTest {
new KeyValue<>(1, "B")
)
);
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ StringDeserializer.class
+ ),
+ outputTopic2,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ )
+ );
+ }
+ }
+
+ @Test
+ public void shouldEmitRecordsAfterFailures() throws Exception {
+ final Properties properties = mkObjectProperties(
+ mkMap(
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath()),
+ mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+ mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+ mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class),
+ mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class),
+ mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+ )
+ );
+
+ try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new
KafkaStreamsNamedTopologyWrapper(properties)) {
+ kafkaStreams.setUncaughtExceptionHandler(exception ->
StreamThreadExceptionResponse.REPLACE_THREAD);
+
+ final NamedTopologyBuilder builder =
kafkaStreams.newNamedTopologyBuilder("topology_A");
+ final AtomicInteger noOutputExpected = new AtomicInteger(0);
+ final AtomicInteger twoOutputExpected = new AtomicInteger(0);
+ builder.stream(inputTopic2).peek((k, v) ->
twoOutputExpected.incrementAndGet()).to(outputTopic2);
+ builder.stream(inputTopic)
+ .peek((k, v) -> {
+ throw new RuntimeException("Kaboom");
+ })
+ .peek((k, v) -> noOutputExpected.incrementAndGet())
+ .to(outputTopic);
+
+ kafkaStreams.addNamedTopology(builder.build());
+
+
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ Arrays.asList(
+ new KeyValue<>(1, "A")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic2,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ 0L);
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ StringDeserializer.class
+ ),
+ outputTopic2,
+ Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(1, "B")
+ )
+ );
+ assertThat(noOutputExpected.get(), equalTo(0));
+ assertThat(twoOutputExpected.get(), equalTo(2));
}
}
}