This is an automated email from the ASF dual-hosted git repository.
jlprat pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 41037bf78da KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test
due to admin timeouts (#13575)
41037bf78da is described below
commit 41037bf78da3113eaf7f67b33ad7b00f4def930d
Author: Greg Harris <[email protected]>
AuthorDate: Fri Apr 21 12:55:41 2023 -0700
KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin
timeouts (#13575)
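Reduce flakiness of
`MirrorConnectorsWithCustomForwardingAdminIntegrationTest`.
`FakeForwardingAdminWithLocalMetadata` no longer blocks on each admin client
future with a fixed 1000 ms timeout; it now updates `FakeLocalMetadataStore`
from a `whenComplete` callback once each operation finishes.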
Reviewers: Josep Prat <[email protected]>
---
.../FakeForwardingAdminWithLocalMetadata.java | 62 +++++++++-------------
1 file changed, 24 insertions(+), 38 deletions(-)
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
index 535dcaca9ee..3ac8a8b17f0 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
@@ -37,9 +37,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/** Customised ForwardingAdmin for testing only.
* The class create/alter topics, partitions and ACLs in Kafka then store
metadata in {@link FakeLocalMetadataStore}.
@@ -47,7 +44,6 @@ import java.util.concurrent.TimeoutException;
public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
private static final Logger log =
LoggerFactory.getLogger(FakeForwardingAdminWithLocalMetadata.class);
- private final long timeout = 1000L;
public FakeForwardingAdminWithLocalMetadata(Map<String, Object> configs) {
super(configs);
@@ -56,35 +52,29 @@ public class FakeForwardingAdminWithLocalMetadata extends
ForwardingAdmin {
@Override
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
CreateTopicsOptions options) {
CreateTopicsResult createTopicsResult = super.createTopics(newTopics,
options);
- newTopics.forEach(newTopic -> {
- try {
- log.info("Add topic '{}' to cluster and metadata store",
newTopic);
- // Wait for topic to be created before edit the fake local
store
- createTopicsResult.values().get(newTopic.name()).get(timeout,
TimeUnit.MILLISECONDS);
+ newTopics.forEach(newTopic ->
createTopicsResult.values().get(newTopic.name()).whenComplete((ignored, error)
-> {
+ if (error == null) {
FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
- } catch (InterruptedException | ExecutionException |
TimeoutException e) {
- if (e.getCause() instanceof TopicExistsException) {
- log.warn("Topic '{}' already exists. Update the local
metadata store if absent", newTopic.name());
-
FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
- } else
- log.error(e.getMessage());
+ } else if (error.getCause() instanceof TopicExistsException) {
+ log.warn("Topic '{}' already exists. Update the local metadata
store if absent", newTopic.name());
+ FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
+ } else {
+ log.error("Unable to intercept admin client operation", error);
}
- });
+ }));
return createTopicsResult;
}
@Override
public CreatePartitionsResult createPartitions(Map<String, NewPartitions>
newPartitions, CreatePartitionsOptions options) {
CreatePartitionsResult createPartitionsResult =
super.createPartitions(newPartitions, options);
- newPartitions.forEach((topic, newPartition) -> {
- try {
- // Wait for topic partition to be created before edit the fake
local store
- createPartitionsResult.values().get(topic).get(timeout,
TimeUnit.MILLISECONDS);
+ newPartitions.forEach((topic, newPartition) ->
createPartitionsResult.values().get(topic).whenComplete((ignored, error) -> {
+ if (error == null) {
FakeLocalMetadataStore.updatePartitionCount(topic,
newPartition.totalCount());
- } catch (InterruptedException | ExecutionException |
TimeoutException e) {
- log.error(e.getMessage());
+ } else {
+ log.error("Unable to intercept admin client operation", error);
}
- });
+ }));
return createPartitionsResult;
}
@@ -92,17 +82,15 @@ public class FakeForwardingAdminWithLocalMetadata extends
ForwardingAdmin {
@Override
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config>
configs, AlterConfigsOptions options) {
AlterConfigsResult alterConfigsResult = super.alterConfigs(configs,
options);
- configs.forEach((configResource, newConfigs) -> {
- try {
+ configs.forEach((configResource, newConfigs) ->
alterConfigsResult.values().get(configResource).whenComplete((ignored, error)
-> {
+ if (error == null) {
if (configResource.type() == ConfigResource.Type.TOPIC) {
- // Wait for config to be altered before edit the fake
local store
-
alterConfigsResult.values().get(configResource).get(timeout,
TimeUnit.MILLISECONDS);
FakeLocalMetadataStore.updateTopicConfig(configResource.name(), newConfigs);
}
- } catch (InterruptedException | ExecutionException |
TimeoutException e) {
- log.error(e.getMessage());
+ } else {
+ log.error("Unable to intercept admin client operation", error);
}
- });
+ }));
return alterConfigsResult;
}
@@ -110,15 +98,13 @@ public class FakeForwardingAdminWithLocalMetadata extends
ForwardingAdmin {
@Override
public CreateAclsResult createAcls(Collection<AclBinding> acls,
CreateAclsOptions options) {
CreateAclsResult aclsResult = super.createAcls(acls, options);
- try {
- // Wait for acls to be created before edit the fake local store
- aclsResult.all().get(timeout, TimeUnit.MILLISECONDS);
- acls.forEach(aclBinding -> {
+ aclsResult.values().forEach((aclBinding, future) ->
future.whenComplete((ignored, error) -> {
+ if (error == null) {
FakeLocalMetadataStore.addACLs(aclBinding.entry().principal(),
aclBinding);
- });
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- log.error(e.getMessage());
- }
+ } else {
+ log.error("Unable to intercept admin client operation", error);
+ }
+ }));
return aclsResult;
}
}