This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 1c1f3ea KAFKA-10559: Not letting TimeoutException shut down the app
during internal topic validation (#9432)
1c1f3ea is described below
commit 1c1f3ea0b032850b1291390b6fc8928ef7195359
Author: vamossagar12 <[email protected]>
AuthorDate: Fri Oct 16 03:40:21 2020 +0530
KAFKA-10559: Not letting TimeoutException shut down the app during internal
topic validation (#9432)
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../internals/StreamsPartitionAssignor.java | 4 +-
.../internals/StreamsPartitionAssignorTest.java | 86 ----------------------
2 files changed, 2 insertions(+), 88 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index d7df48f..d353e04 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -345,7 +345,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
try {
allRepartitionTopicPartitions =
prepareRepartitionTopics(topicGroups, metadata);
- } catch (final TaskAssignmentException | TimeoutException e) {
+ } catch (final TaskAssignmentException e) {
return new GroupAssignment(
errorAssignment(clientMetadataMap,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
@@ -376,7 +376,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final boolean probingRebalanceNeeded;
try {
probingRebalanceNeeded = assignTasksToClients(fullMetadata,
allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask,
statefulTasks);
- } catch (final TaskAssignmentException | TimeoutException e) {
+ } catch (final TaskAssignmentException e) {
return new GroupAssignment(
errorAssignment(clientMetadataMap,
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 43b7f3e..c1533f1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
@@ -41,7 +40,6 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TopologyWrapper;
-import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -51,7 +49,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
@@ -1184,89 +1181,6 @@ public class StreamsPartitionAssignorTest {
}
@Test
- public void
shouldReturnShutdownErrorCodeWhenCreatingRepartitionTopicsTimesOut() {
- final StreamsBuilder streamsBuilder = new StreamsBuilder();
- streamsBuilder.stream("topic1").repartition();
-
- final String client = "client1";
- builder =
TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
-
- createDefaultMockTaskManager();
- EasyMock.replay(taskManager);
- partitionAssignor.configure(configProps());
- final MockInternalTopicManager mockInternalTopicManager = new
MockInternalTopicManager(
- time,
- new StreamsConfig(configProps()),
- mockClientSupplier.restoreConsumer,
- false
- ) {
- @Override
- public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) throws TaskAssignmentException {
- throw new TimeoutException("KABOOM!");
- }
- };
- partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
-
- subscriptions.put(client,
- new Subscription(
- singletonList("topic1"),
- defaultSubscriptionInfo.encode()
- )
- );
- final Map<String, Assignment> assignment =
- partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
-
- // check if we created a task for all expected topicPartitions.
- assertThat(
- AssignmentInfo.decode(assignment.get(client).userData()).errCode(),
- equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
- );
- }
-
- @Test
- public void
shouldReturnShutdownErrorCodeWhenCreatingChangelogTopicsTimesOut() {
- final StreamsBuilder streamsBuilder = new StreamsBuilder();
- streamsBuilder.table("topic1", Materialized.as("store"));
-
- final String client = "client1";
- builder =
TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
-
- createDefaultMockTaskManager();
- EasyMock.replay(taskManager);
- partitionAssignor.configure(configProps());
- final MockInternalTopicManager mockInternalTopicManager = new
MockInternalTopicManager(
- time,
- new StreamsConfig(configProps()),
- mockClientSupplier.restoreConsumer,
- false
- ) {
- @Override
- public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) throws TaskAssignmentException {
- if (topics.isEmpty()) {
- return emptySet();
- }
- throw new TimeoutException("KABOOM!");
- }
- };
- partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
-
- subscriptions.put(client,
- new Subscription(
- singletonList("topic1"),
- defaultSubscriptionInfo.encode()
- )
- );
- final Map<String, Assignment> assignment =
- partitionAssignor.assign(metadata, new
GroupSubscription(subscriptions)).groupAssignment();
-
- // check if we created a task for all expected topicPartitions.
- assertThat(
- AssignmentInfo.decode(assignment.get(client).userData()).errCode(),
- equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
- );
- }
-
- @Test
public void shouldAddUserDefinedEndPointToSubscription() {
builder.addSource(null, "source", null, null, null, "input");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source");