[ 
https://issues.apache.org/jira/browse/KAFKA-9005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947560#comment-16947560
 ] 

Bruno Cadonna commented on KAFKA-9005:
--------------------------------------

[~arnaud.villevieille] I wrote an answer to your Stack Overflow question. Did 
you check whether the Kafka Streams application has permission to create 
topics on the Kafka broker? 
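
To make the link between broker permissions and state stores concrete, here 
is a minimal sketch (not code from Kafka Streams itself). A fault-tolerant 
state store is backed by a changelog topic on the broker, named 
"<application.id>-<store name>-changelog", and the application creates that 
topic at startup. The class and method names below are hypothetical; the 
application id "filterer" and the store name are taken from the stack trace 
in the issue.

```java
public class ChangelogTopicName {

    // Kafka Streams derives a store's changelog topic name as
    // "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Internal topics start with "<application.id>-", so this is the prefix
    // the application's principal needs access to (hypothetical helper).
    static String internalTopicPrefix(String applicationId) {
        return applicationId + "-";
    }

    public static void main(String[] args) {
        String topic = changelogTopic("filterer",
                "KTABLE-SUPPRESS-STATE-STORE-0000000005");
        // Prints the topic name that appears in the StreamsException.
        System.out.println(topic);
        // Prints whether the topic falls under the application's prefix.
        System.out.println(topic.startsWith(internalTopicPrefix("filterer")));
    }
}
```

If the authenticated principal is not allowed to create and access topics 
under that prefix, creating the changelog topic would be expected to fail 
with an authorization error like the one reported, even though the store 
data itself lives on local disk.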

> Kafka stream: “TopicAuthorizationException: Not authorized to access topics” 
> for an internal state store
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9005
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9005
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Arnaud Villevieille
>            Priority: Blocker
>
> Java: OpenJdk 11
>  Kafka server: 2.2.0
>  Kafka streams lib: 2.3.0
> I have created a Stack Overflow question 
> [here|https://stackoverflow.com/questions/58299827/kafka-stream-topicauthorizationexception-not-authorized-to-access-topics-for]
>  
> I am trying to deploy my Kafka Streams application in a *docker* container 
> and it fails with a TopicAuthorizationException while trying to create an 
> internal state store. It works well locally. The main difference between 
> the local setup and the server is that on the server it connects to a 
> server-deployed Kafka and authenticates using the usual *Kerberos* auth. I 
> fail to understand the link between authentication and the *local stores*.
> My stream looks like this:
> {code:java}
> StreamsBuilder builder = new StreamsBuilder();
> // We stream from the source topic
> KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(
>         sourceTopic, Consumed.with(Serdes.serdeFrom(String.class), INPUT_SERDE));
> // We group per room and window
> TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));
> // We make them a list
> KStream<Windowed<String>, WindowedMessages> grouped = windowed
>         .aggregate(WindowedMessages::new,
>                 (key, value, aggregate) -> aggregate.add(value),
>                 Materialized.with(Serdes.String(),
>                         Serdes.serdeFrom(windowSerializer, windowSerializer)))
>         .suppress(Suppressed.untilWindowCloses(unbounded()))
>         .toStream();
> // Filter
> KStream<Windowed<String>, FilterResult> filtered = grouped
>         .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));
> // Re map to its original form
> KStream<String, OutputPayload> reduced = filtered
>         .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages,
>                 Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
>                 .getMessages()
>                 .stream()
>                 .map(payload -> new KeyValue<>(key.key(), payload))
>                 .collect(toList()));
> // Target topic
> reduced.to(sinkTopic, Produced.with(Serdes.serdeFrom(String.class), SERDE));
> return builder.build();
> {code}
> It receives a stream of messages, windows it, aggregates all the messages 
> per window, keeps only the last version of the list with 'Suppressed', and 
> then flatMaps the result to forward it to another topic.
> Every time, I get this kind of exception:
>  
> {quote}
> org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
> Error message was: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
> 2019-10-09 06:44:03.255 +0000 ERROR [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - []
> org.apache.kafka.streams.errors.StreamsException: Could not create topic filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog.
>  at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212)
>  at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:226)
>  at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:104)
>  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:971)
>  at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:618)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
>  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
