This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 70a506b MINOR: Ignore test_broker_type_bounce_at_start system test (#5055) 70a506b is described below commit 70a506b9839605304a0a67b70e711a1139b47cbe Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Mon May 21 17:28:40 2018 -0700 MINOR: Ignore test_broker_type_bounce_at_start system test (#5055) test_broker_type_bounce_at_start tries to validate that when the controller is down, the streams client will always fail trying to create the topic; with the current behavior of the admin client this is actually not always true: the actual behavior depends on the admin client internals as well as when the controller becomes unavailable during the leader assign partitions phase. I'd suggest at least ignoring this test for now until the admin client has more stable (personally I'd even suggest [...] Also adding a few more log4j entries as a result of investigating this issue. Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 5 ++++- .../kafka/streams/processor/internals/InternalTopicManager.java | 4 ++++ .../kafka/streams/processor/internals/StreamsPartitionAssignor.java | 5 ++--- tests/kafkatest/tests/streams/streams_broker_bounce_test.py | 1 + 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 5f4eefe..00f543c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -577,7 +577,9 @@ public class KafkaAdminClient extends AdminClient { // this RPC. That is why 'tries' is not incremented. 
if ((throwable instanceof UnsupportedVersionException) && handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { - log.trace("{} attempting protocol downgrade.", this); + if (log.isDebugEnabled()) { + log.debug("{} attempting protocol downgrade and then retry.", this); + } runnable.enqueue(this, now); return; } @@ -813,6 +815,7 @@ public class KafkaAdminClient extends AdminClient { * @param pendingIter An iterator yielding pending calls. */ private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) { + log.trace("Trying to choose nodes for {} at {}", pendingIter, now); while (pendingIter.hasNext()) { Call call = pendingIter.next(); Node node = null; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 2ac37bd..2c2df04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -108,6 +108,7 @@ public class InternalTopicManager { .configs(topicConfig)); } + // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client int remainingRetries = retries; boolean retry; do { @@ -171,6 +172,9 @@ public class InternalTopicManager { */ // visible for testing protected Map<String, Integer> getNumPartitions(final Set<String> topics) { + log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics); + + // TODO: KAFKA-6928. 
should not need retries in the outer caller as it will be retried internally in admin client int remainingRetries = retries; boolean retry; do { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index d7a9b33..e1464e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -718,9 +718,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable * * @param topicPartitions Map that contains the topic names to be created with the number of partitions */ - @SuppressWarnings("deprecation") private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) { - log.debug("Starting to validate internal topics in partition assignor."); + log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions); // first construct the topics to make ready final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>(); @@ -744,7 +743,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable internalTopicManager.makeReady(topicsToMakeReady); } - log.debug("Completed validating internal topics in partition assignor."); + log.debug("Completed validating internal topics {} in partition assignor.", topicPartitions); } private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups, diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 1415ecc..8d623eb 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -202,6 +202,7 @@ class StreamsBrokerBounceTest(Test): return 
self.collect_results(sleep_time_secs) + @ignore @cluster(num_nodes=7) @matrix(failure_mode=["clean_shutdown"], broker_type=["controller"], -- To stop receiving notification emails like this one, please contact guozh...@apache.org.