[ https://issues.apache.org/jira/browse/KAFKA-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323245#comment-16323245 ]

ASF GitHub Bot commented on KAFKA-6250:
---------------------------------------

hachikuji closed pull request #4247: KAFKA-6250: Use existing Kafka Connect 
internal topics without requiring ACL
URL: https://github.com/apache/kafka/pull/4247
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 5da4f2d00d0..ad21561baf2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -229,13 +230,20 @@ public boolean createTopic(NewTopic topic) {
                 newlyCreatedTopicNames.add(topic);
             } catch (ExecutionException e) {
                 Throwable cause = e.getCause();
-                if (e.getCause() instanceof TopicExistsException) {
+                if (cause instanceof TopicExistsException) {
                     log.debug("Found existing topic '{}' on the brokers at 
{}", topic, bootstrapServers);
                     continue;
                 }
                 if (cause instanceof UnsupportedVersionException) {
-                    log.debug("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at {}," +
-                                      "falling back to assume topic(s) exist 
or will be auto-created by the broker", topicNameList, bootstrapServers);
+                    log.debug("Unable to create topic(s) '{}' since the 
brokers at {} do not support the CreateTopics API.",
+                            " Falling back to assume topic(s) exist or will be 
auto-created by the broker.",
+                            topicNameList, bootstrapServers);
+                    return Collections.emptySet();
+                }
+                if (cause instanceof ClusterAuthorizationException) {
+                    log.debug("Not authorized to create topic(s) '{}'." +
+                            " Falling back to assume topic(s) exist or will be 
auto-created by the broker.",
+                            topicNameList, bootstrapServers);
                     return Collections.emptySet();
                 }
                 if (cause instanceof TimeoutException) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index c58d6741f58..cda68795689 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -60,6 +60,19 @@ public void returnNullWithApiVersionMismatch() {
         }
     }
 
+    @Test
+    public void returnNullWithClusterAuthorizationFailure() {
+        final NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+            
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created);
+        }
+    }
+
     @Test
     public void shouldNotCreateTopicWhenItAlreadyExists() {
         NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
@@ -120,6 +133,10 @@ private CreateTopicsResponse 
createTopicResponseWithUnsupportedVersion(NewTopic.
         return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, 
"This version of the API is not supported"), topics);
     }
 
+    private CreateTopicsResponse 
createTopicResponseWithClusterAuthorizationException(NewTopic... topics) {
+        return createTopicResponse(new 
ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create 
topic(s)"), topics);
+    }
+
     private CreateTopicsResponse createTopicResponse(ApiError error, 
NewTopic... topics) {
         if (error == null) error = new ApiError(Errors.NONE, "");
         Map<String, ApiError> topicResults = new HashMap<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect requires permission to create internal topics even if they exist
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6250
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6250
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Gavrie Philipson
>             Fix For: 1.1.0
>
>
> When using Kafka Connect with a cluster that doesn't allow the user to create 
> topics (due to ACL configuration), Connect fails when trying to create its 
> internal topics, even if these topics already exist.
> This happens specifically when using hosted [Aiven 
> Kafka|https://aiven.io/kafka], which does not permit creation of topics via 
> the Kafka Admin Client API.
> The problem is that Connect tries to create the topics, and ignores some 
> specific errors such as topics that already exist, but not authorization 
> errors.
> This is what happens:
> {noformat}
> 2017-11-21 15:57:24,176 [DistributedHerder] ERROR DistributedHerder:206 - 
> Uncaught exception in herder work thread, exiting:
> org.apache.kafka.connect.errors.ConnectException: Error while attempting to 
> create/find topic(s) 'connect-offsets'
>       at 
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:245)
>       at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
>       at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:126)
>       at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
>       at org.apache.kafka.connect.runtime.Worker.start(Worker.java:146)
>       at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:99)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:194)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster 
> authorization failed.
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
>       at 
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:226)
>       ... 11 more
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: 
> Cluster authorization failed.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to