[jira] [Commented] (KAFKA-13391) Failure on Windows due to AccessDeniedAcception when attempting to fsync the parent directory
[ https://issues.apache.org/jira/browse/KAFKA-13391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432831#comment-17432831 ] Luke Chen commented on KAFKA-13391: --- [~awilkinson] , thanks for reporting the issue. PR is ready for review: [https://github.com/apache/kafka/pull/11426] Welcome to provide comments. Thanks. > Failure on Windows due to AccessDeniedAcception when attempting to fsync the > parent directory > - > > Key: KAFKA-13391 > URL: https://issues.apache.org/jira/browse/KAFKA-13391 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Andy Wilkinson >Assignee: Luke Chen >Priority: Major > > There appears to be a regression in Kafka 3.0 due to [the > changes|https://github.com/apache/kafka/commit/66b0c5c64f2969dc62362b9f169ad1d18f64efe9] > made for KAFKA-3968 that causes a failure on Windows. After upgrading to > 3.0.0, we're seeing failures in Spring Boot's Windows CI such as the > following: > {code} > Caused by: java.nio.file.AccessDeniedException: > C:\Windows\TEMP\spring.kafka.915ab8c1-735c-4c88-8507-8d25fd050621920219824697516859 > > at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) > at > sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115) > at java.nio.channels.FileChannel.open(FileChannel.java:287) > at java.nio.channels.FileChannel.open(FileChannel.java:335) > at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941) > at > kafka.server.BrokerMetadataCheckpoint.liftedTree1$1(BrokerMetadataCheckpoint.scala:214) > at > kafka.server.BrokerMetadataCheckpoint.write(BrokerMetadataCheckpoint.scala:204) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2(KafkaServer.scala:772) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2$adapted(KafkaServer.scala:770) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.server.KafkaServer.checkpointBrokerMetadata(KafkaServer.scala:770) > at kafka.server.KafkaServer.startup(KafkaServer.scala:322) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:175) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:170) > at kafka.utils.TestUtils.createServer(TestUtils.scala) > {code} > While I'm [aware that Windows isn't officially > supported|https://issues.apache.org/jira/browse/KAFKA-12190?focusedCommentId=17264398&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17264398], > I think this problem is likely to be a blocker for anyone who uses Windows > for Kafka-based development work. > I suspect that the attempt to fsync the directory should just be skipped on > Window. Alternatively, the failure could be ignored on Windows. Lucene [added > similar functionality in the > past|https://issues.apache.org/jira/browse/LUCENE-5588] where it looks like > they opted to ignore IOExceptions on Windows rather than skipping the attempt. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432832#comment-17432832 ] Luke Chen commented on KAFKA-13390: --- [~mvmn], thanks for reporting the issue. PR is ready for review: [https://github.com/apache/kafka/pull/11426] Welcome to provide comments. Thanks. > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Assignee: Luke Chen >Priority: Minor > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and four ERROR log lines with messages: > {{ERRROR Failed to write meta.properties due to > (kafka.server.BrokerMetadataCheckpoint)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. > But after the attempted startup the directory contains files, specifically > meta.properties of size 94 bytes, recovery-point-offset-checkpoint and > log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files > (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, > .kafka_cleanshutdown). > Which indicates that Kafka actually has permissions to create these files and > write to them. > P.S. See also > [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on pull request #11426: KAFKA-13391: don't fsync directory on Windows OS
showuon commented on pull request #11426: URL: https://github.com/apache/kafka/pull/11426#issuecomment-949335184 @ccding @junrao , please help review the PR. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #11426: KAFKA-13391: don't fsync directory on Windows OS
showuon opened a new pull request #11426: URL: https://github.com/apache/kafka/pull/11426 It looks like Windows OS doesn't support `fsync` on directory. The same issues also happen on [LUCENE](https://issues.apache.org/jira/browse/LUCENE-5588) and [HDFS](https://issues.apache.org/jira/browse/HDFS-13586) projects. And the way they fix it is pretty much the same: to skip fsync directory on Windows OS. Here are the patches for both [LUCENE-5588](https://patch-diff.githubusercontent.com/raw/apache/lucenenet/pull/43.patch) and [HDFS-13586](https://issues.apache.org/jira/secure/attachment/12924032/HDFS-13586.001.patch). I followed their way to fix this issue. No tests added since it's just an OS check added, no logic change. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9062) Handle stalled writes to RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432797#comment-17432797 ] James Cheng commented on KAFKA-9062: [~vvcephei], you said that in 2.6, you removed "bulk loading". Is it this JIRA? https://issues.apache.org/jira/browse/KAFKA-10005 > Handle stalled writes to RocksDB > > > Key: KAFKA-9062 > URL: https://issues.apache.org/jira/browse/KAFKA-9062 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > RocksDB may stall writes at times when background compactions or flushes are > having trouble keeping up. This means we can effectively end up blocking > indefinitely during a StateStore#put call within Streams, and may get kicked > from the group if the throttling does not ease up within the max poll > interval. > Example: when restoring large amounts of state from scratch, we use the > strategy recommended by RocksDB of turning off automatic compactions and > dumping everything into L0. We do batch somewhat, but do not sort these small > batches before loading into the db, so we end up with a large number of > unsorted L0 files. > When restoration is complete and we toggle the db back to normal (not bulk > loading) settings, a background compaction is triggered to merge all these > into the next level. This background compaction can take a long time to merge > unsorted keys, especially when the amount of data is quite large. > Any new writes while the number of L0 files exceeds the max will be stalled > until the compaction can finish, and processing after restoring from scratch > can block beyond the polling interval -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9062) Handle stalled writes to RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432794#comment-17432794 ] James Cheng commented on KAFKA-9062: Are there any metrics that we can measure and look at, to see if we are being impacted by the issue? Anything in RocksDb or Kafka streams? Maybe PUT latency? Thanks. > Handle stalled writes to RocksDB > > > Key: KAFKA-9062 > URL: https://issues.apache.org/jira/browse/KAFKA-9062 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > RocksDB may stall writes at times when background compactions or flushes are > having trouble keeping up. This means we can effectively end up blocking > indefinitely during a StateStore#put call within Streams, and may get kicked > from the group if the throttling does not ease up within the max poll > interval. > Example: when restoring large amounts of state from scratch, we use the > strategy recommended by RocksDB of turning off automatic compactions and > dumping everything into L0. We do batch somewhat, but do not sort these small > batches before loading into the db, so we end up with a large number of > unsorted L0 files. > When restoration is complete and we toggle the db back to normal (not bulk > loading) settings, a background compaction is triggered to merge all these > into the next level. This background compaction can take a long time to merge > unsorted keys, especially when the amount of data is quite large. > Any new writes while the number of L0 files exceeds the max will be stalled > until the compaction can finish, and processing after restoring from scratch > can block beyond the polling interval -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jolshan commented on pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs
jolshan commented on pull request #11331: URL: https://github.com/apache/kafka/pull/11331#issuecomment-949274575 TODOs: 1. Change inconsistent topic ID so it is no longer a top level error 2. Maybe refactor some of the receiving side code, we have a map with TopicIdPartition, PartitionData and both contain topic ID -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ocadaruma commented on pull request #11422: KAFKA-9648: Add configuration to adjust listen backlog size for Acceptor
ocadaruma commented on pull request #11422: URL: https://github.com/apache/kafka/pull/11422#issuecomment-949269755 Let me mention @rajinisivaram, @mimaison, @dajac here as you participated in the vote for KIP-764. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RivenSun2 commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 commented on pull request #11340: URL: https://github.com/apache/kafka/pull/11340#issuecomment-949189579 @guozhangwang @showuon and @hachikuji Thank you for paying so much attention to this issue and giving a lot of valuable discussions. > Hence it occurs to me that, the best effort for now could be that we still do not block for longer than the poll timeout --- i.e. we may even return early --- but we do not give up the commit either, and hence here async commit where could be potentially completed across multiple poll calls would be reasonable. After thinking about it, I think this is indeed a good solution in the short term. If the rebalance protocol can be separated from the poll thread later, it will make the poll method more pure (only poll message from cache or send fetch request). This is a very challenging task, and a new thread may be introduced. Or you can combine heartbeat threads to try to achieve separation. Thank you for the long-term active discussion. I also hope that the Kafka project will develop better and better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on pull request #11284: URL: https://github.com/apache/kafka/pull/11284#issuecomment-949090564 @junrao - I believe I've addressed all of the points listed except: 1. The listener/SASL mechanism prefix issues are really confusing to me because the code is currently behaving the way that I'm assuming it's supposed to. For example, if I have a listener named `foo` and an OAuth configuration named `bar`, `sasl.oauthbearer.bar=1` is picked up by my code unless there's a listener-specific override (e.g. `listener.name.foo.oauthbearer.sasl.oauthbearer.bar=2`). And if I specify a value for some other listener (e.g. `listener.name.someotherlistener.oauthbearer.sasl.oauthbearer.bar=3`, I don't see that value in the callback handler that is initialized for the `foo` listener. 2. I need to implement exponential backoff for the case of on-demand refresh of the JWKS. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734121443 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.jose4j.jwk.HttpsJwks; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.lang.JoseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or + * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's + * possible to receive a JWT that contains a kid that points to yet-unknown JWK, + * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice, + * keys are made available for some amount of time before they're used within JWTs. + * + * This instance is created and provided to the + * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using + * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then + * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of + * a JWT. + * + * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver + * @see org.jose4j.keys.resolvers.VerificationKeyResolver + * @see ValidatorAccessTokenValidator + */ + +public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable { + +private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class); + +private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16; + +private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 6; + +private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000; + +private static final int SHUTDOWN_TIMEOUT = 10; + +private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS; + +private final ScheduledExecutorService executorService; + +private final long refreshMs; + +private final ReadWriteLock refreshLock = new ReentrantReadWriteLock(); + +private final Map missingKeyIds; + +private List jsonWebKeys; + +private boolean isInited; + +/** + * + * @param location HTTP/HTTPS endpoint from which to retrieve the JWKS based on + * the OAuth/OIDC standard + * @param refreshMs The number of milliseconds between refresh passes to connect + * to the OAuth/OIDC JWKS endpoint to retrieve the latest set + */ + +public RefreshingHttpsJwks(String location, long refreshMs) { +super(location); + +if (refreshMs <= 0) +throw new IllegalArgumentException("JWKS validation key refresh configuration value retryWaitMs value must be positive"); + +setDefaultCacheDuration(refreshMs); + +this.refreshMs = refreshMs; +this.executorService = Executors.newSingleThreadScheduledExecutor(); +this.missingKeyIds = new LinkedHashMap(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) { +@Override +protected boolean removeEldestEntry(Map.Entry eldest) { +return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES; +} +}; +} + +@Override +public void init() throws IOException { +try { +log.debug("init started"); + +List localJWKs; + +try { +localJWKs = super.getJsonWebKeys(); +} catch (JoseException e) { +throw new IOException("Could not refresh JWKS", e); +} + +try
[jira] [Comment Edited] (KAFKA-8197) Flaky Test kafka.server.DynamicBrokerConfigTest > testPasswordConfigEncoderSecretChange
[ https://issues.apache.org/jira/browse/KAFKA-8197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432749#comment-17432749 ] A. Sophie Blee-Goldman edited comment on KAFKA-8197 at 10/21/21, 11:56 PM: --- [Failed again:|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11379/1/testReport/junit/kafka.server/DynamicBrokerConfigTest/Build___JDK_17_and_Scala_2_13___testPasswordConfigEncoderSecretChange__/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed] h3. Stacktrace org.opentest4j.AssertionFailedError: expected: but was: at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at app//kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:307) was (Author: ableegoldman): Failed again: h3. Stacktrace org.opentest4j.AssertionFailedError: expected: but was: at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at app//kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:307) > Flaky Test kafka.server.DynamicBrokerConfigTest > > testPasswordConfigEncoderSecretChange > --- > > Key: KAFKA-8197 > URL: https://issues.apache.org/jira/browse/KAFKA-8197 > Project: Kafka > Issue Type: Improvement > Components: core, unit tests >Affects Versions: 1.1.1 >Reporter: Guozhang Wang >Priority: Major > > {code} > 09:18:23 kafka.server.DynamicBrokerConfigTest > > testPasswordConfigEncoderSecretChange FAILED > 09:18:23 org.junit.ComparisonFailure: expected:<[staticLoginModule > required;]> but was:<[????O?i???A?c'??Ch?|?p]> > 09:18:23 at org.junit.Assert.assertEquals(Assert.java:115) > 09:18:23 at org.junit.Assert.assertEquals(Assert.java:144) > 09:18:23 at > kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:253) > {code} > https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/13466/consoleFull -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #11379: KAFKA-12994: Migrated SlidingWindowsTest to new API
ableegoldman commented on pull request #11379: URL: https://github.com/apache/kafka/pull/11379#issuecomment-949085661 Yep -- see https://issues.apache.org/jira/browse/KAFKA-8197 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8197) Flaky Test kafka.server.DynamicBrokerConfigTest > testPasswordConfigEncoderSecretChange
[ https://issues.apache.org/jira/browse/KAFKA-8197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432749#comment-17432749 ] A. Sophie Blee-Goldman commented on KAFKA-8197: --- Failed again: h3. Stacktrace org.opentest4j.AssertionFailedError: expected: but was: at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at app//kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:307) > Flaky Test kafka.server.DynamicBrokerConfigTest > > testPasswordConfigEncoderSecretChange > --- > > Key: KAFKA-8197 > URL: https://issues.apache.org/jira/browse/KAFKA-8197 > Project: Kafka > Issue Type: Improvement > Components: core, unit tests >Affects Versions: 1.1.1 >Reporter: Guozhang Wang >Priority: Major > > {code} > 09:18:23 kafka.server.DynamicBrokerConfigTest > > testPasswordConfigEncoderSecretChange FAILED > 09:18:23 org.junit.ComparisonFailure: expected:<[staticLoginModule > required;]> but was:<[????O?i???A?c'??Ch?|?p]> > 09:18:23 at org.junit.Assert.assertEquals(Assert.java:115) > 09:18:23 at org.junit.Assert.assertEquals(Assert.java:144) > 09:18:23 at > kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:253) > {code} > https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/13466/consoleFull -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734120121 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/JwksFileVerificationKeyResolver.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.nio.file.Path; +import java.security.Key; +import java.util.List; +import org.apache.kafka.common.utils.Utils; +import org.jose4j.jwk.JsonWebKeySet; +import org.jose4j.jws.JsonWebSignature; +import org.jose4j.jwx.JsonWebStructure; +import org.jose4j.keys.resolvers.JwksVerificationKeyResolver; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.jose4j.lang.JoseException; +import org.jose4j.lang.UnresolvableKeyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JwksFileVerificationKeyResolver is a {@link VerificationKeyResolver} implementation + * that will load the JWKS from the given file system directory. Review comment: Done. Take a look and let me know if there's more that needs to be said. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] officialpatterson commented on pull request #11379: KAFKA-12994: Migrated SlidingWindowsTest to new API
officialpatterson commented on pull request #11379: URL: https://github.com/apache/kafka/pull/11379#issuecomment-949084918 > LGTM -- just one unrelated flaky test failure in `kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange()` Do we track flaky tests in Jira to be worked on? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12994. Resolution: Fixed > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Andrew patterson >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > The applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432748#comment-17432748 ] A. Sophie Blee-Goldman commented on KAFKA-12994: Last PR for this merged, thanks guys! > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Andrew patterson >Priority: Major > Labels: kip-633, newbie, newbie++ > Fix For: 3.1.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > The applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman merged pull request #11379: kafka-12994: Migrated SlidingWindowsTest to new API
ableegoldman merged pull request #11379: URL: https://github.com/apache/kafka/pull/11379 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734111966 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { Review comment: Or should I change the `AuthenticateCallbackHandler` interface's `configure` method to take the listener name too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734111966 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { Review comment: Or should I change the `AuthenticateCallbackHandler` interface's, `configure` method to take the listener name too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734111852 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { Review comment: Per the `AuthenticateCallbackHandler` interface, the `configure` method parameters do not include the listener name. Based on my research, it seemed like the upstream code only passed in the configuration that's either listener-based or top-level. The SASL mechanism is passed in for the case of the broker side. The later configuration lookups will look for the optional SASL mechanism prefix and use it if the configuration is there, or fall back to the top-level configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field
ableegoldman commented on pull request #11405: URL: https://github.com/apache/kafka/pull/11405#issuecomment-949073433 Merged to trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field
ableegoldman merged pull request #11405: URL: https://github.com/apache/kafka/pull/11405 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734107280 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class); + +public static final String CLIENT_ID_CONFIG = "clientId"; +public static final String CLIENT_SECRET_CONFIG = "clientSecret"; +public static final String SCOPE_CONFIG = "scope"; + +public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " + +"client ID to uniquely identify the service account to use for authentication for " + +"this client. The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " + +"client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " + +"account and identifies the service account to use for authentication for " + +"this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " + +"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " + +"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + +"include with the login request."; + +private static final String EXTENSION_PREFIX = "extension_"; + +private Map moduleOptions; + +private AccessTokenRetriever accessTokenRetriever; + +private AccessTokenValidator accessTokenValidator; + +private boolean isInitialized = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) +throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + +if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) +throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size())); + +moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions()); +AccessTokenRetriever accessTokenRetriever = AccessTokenRetriever
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734106778 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/PemDirectoryVerificationKeyResolver.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.security.Key; +import java.security.PublicKey; +import java.security.interfaces.ECPublicKey; +import java.security.interfaces.RSAPublicKey; +import java.security.spec.InvalidKeySpecException; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.common.utils.Utils; +import org.jose4j.jwk.EllipticCurveJsonWebKey; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.jwk.RsaJsonWebKey; +import org.jose4j.jws.JsonWebSignature; +import org.jose4j.jwx.JsonWebStructure; +import org.jose4j.keys.EcKeyUtil; +import org.jose4j.keys.RsaKeyUtil; +import org.jose4j.keys.resolvers.JwksVerificationKeyResolver; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.jose4j.lang.JoseException; +import org.jose4j.lang.UnresolvableKeyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PemVerificationKeyResolver is a {@link VerificationKeyResolver} implementation Review comment: It was going to be a format that we support, but it doesn't need to be in the first version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734100842 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.jose4j.jwk.HttpsJwks; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.lang.JoseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or + * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's + * possible to receive a JWT that contains a kid that points to yet-unknown JWK, + * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice, + * keys are made available for some amount of time before they're used within JWTs. + * + * This instance is created and provided to the + * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using + * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then + * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of + * a JWT. + * + * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver + * @see org.jose4j.keys.resolvers.VerificationKeyResolver + * @see ValidatorAccessTokenValidator + */ + +public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable { + +private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class); + +private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16; + +private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 6; + +private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000; + +private static final int SHUTDOWN_TIMEOUT = 10; + +private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS; + +private final ScheduledExecutorService executorService; + +private final long refreshMs; + +private final ReadWriteLock refreshLock = new ReentrantReadWriteLock(); + +private final Map missingKeyIds; + +private List jsonWebKeys; + +private boolean isInited; + +/** + * + * @param location HTTP/HTTPS endpoint from which to retrieve the JWKS based on + * the OAuth/OIDC standard + * @param refreshMs The number of milliseconds between refresh passes to connect + * to the OAuth/OIDC JWKS endpoint to retrieve the latest set + */ + +public RefreshingHttpsJwks(String location, long refreshMs) { +super(location); + +if (refreshMs <= 0) +throw new IllegalArgumentException("JWKS validation key refresh configuration value retryWaitMs value must be positive"); + +setDefaultCacheDuration(refreshMs); + +this.refreshMs = refreshMs; +this.executorService = Executors.newSingleThreadScheduledExecutor(); +this.missingKeyIds = new LinkedHashMap(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) { +@Override +protected boolean removeEldestEntry(Map.Entry eldest) { +return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES; +} +}; +} + +@Override +public void init() throws IOException { +try { +log.debug("init started"); + +List localJWKs; + +try { +localJWKs = super.getJsonWebKeys(); +} catch (JoseException e) { +throw new IOException("Could not refresh JWKS", e); +} + +try
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734100491 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java ## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.jose4j.jwk.HttpsJwks; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.lang.JoseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or + * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's + * possible to receive a JWT that contains a kid that points to yet-unknown JWK, + * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice, + * keys are made available for some amount of time before they're used within JWTs. + * + * This instance is created and provided to the + * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using + * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then + * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of + * a JWT. + * + * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver + * @see org.jose4j.keys.resolvers.VerificationKeyResolver + * @see ValidatorAccessTokenValidator + */ + +public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable { + +private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class); + +private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16; + +private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 6; + +private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000; + +private static final int SHUTDOWN_TIMEOUT = 10; + +private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS; + +private final ScheduledExecutorService executorService; + +private final long refreshMs; + +private final ReadWriteLock refreshLock = new ReentrantReadWriteLock(); + +private final Map missingKeyIds; + +private List jsonWebKeys; + +private boolean isInited; + +/** + * + * @param location HTTP/HTTPS endpoint from which to retrieve the JWKS based on + * the OAuth/OIDC standard + * @param refreshMs The number of milliseconds between refresh passes to connect + * to the OAuth/OIDC JWKS endpoint to retrieve the latest set + */ + +public RefreshingHttpsJwks(String location, long refreshMs) { +super(location); + +if (refreshMs <= 0) +throw new IllegalArgumentException("JWKS validation key refresh configuration value retryWaitMs value must be positive"); + +setDefaultCacheDuration(refreshMs); + +this.refreshMs = refreshMs; +this.executorService = Executors.newSingleThreadScheduledExecutor(); +this.missingKeyIds = new LinkedHashMap(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) { +@Override +protected boolean removeEldestEntry(Map.Entry eldest) { +return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES; +} +}; +} + +@Override +public void init() throws IOException { +try { +log.debug("init started"); + +List localJWKs; + +try { +localJWKs = super.getJsonWebKeys(); +} catch (JoseException e) { +throw new IOException("Could not refresh JWKS", e); +} + +try
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734096643 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retry encapsulates the mechanism to perform a retry and then exponential + * backoff using provided wait times between attempts. + * + * @param Result type + */ + +public class Retry { Review comment: I'm a little hesitant to do so, only because I want to have freedom to refactor as needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734096345 ## File path: clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java ## @@ -75,30 +78,111 @@ public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the" + " credential's lifetime has been reached, at which time it will try to refresh the credential." + " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used" -+ " if no value is specified. Currently applies only to OAUTHBEARER."; ++ " if no value is specified." ++ OAUTHBEARER_NOTE; public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80; public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter"; public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime" + " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;" -+ " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER."; ++ " a default value of 0.05 (5%) is used if no value is specified." ++ OAUTHBEARER_NOTE; public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05; public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds"; public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential," + " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and " + " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential." -+ " Currently applies only to OAUTHBEARER."; ++ OAUTHBEARER_NOTE; public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60; public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds"; public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC = "The amount of buffer time before credential expiration to maintain when refreshing a credential," + " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain" + " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified." + " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential." -+ " Currently applies only to OAUTHBEARER."; ++ OAUTHBEARER_NOTE; public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300; +public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms"; +public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider connection timeout." ++ OAUTHBEARER_NOTE; + +public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms"; +public static final String SASL_LOGIN_READ_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider read timeout." ++ OAUTHBEARER_NOTE; + +// These are only specified here outside their normal groupings so that they can be +// forward referencing. +public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms"; +public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms"; + +private static final String EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the" ++ " " + SASL_LOGIN_RETRY_BACKOFF_MS ++ " setting and will double in wait length between attempts up to a maximum wait length specified by the" ++ " " + SASL_LOGIN_RETRY_BACKOFF_MAX_MS ++ " setting."; Review comment: I have added the `OAUTHBEARER_NOTE` to the `EXPONENTIAL_BACKOFF_NOTE` so that it is included in the documentation for the `sasl.login.retry.backoff.ms` and `sasl.login.retry.backoff.max.ms` configuration options. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-ma
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734095192 ## File path: clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java ## @@ -75,30 +78,111 @@ public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the" + " credential's lifetime has been reached, at which time it will try to refresh the credential." + " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used" -+ " if no value is specified. Currently applies only to OAUTHBEARER."; ++ " if no value is specified." ++ OAUTHBEARER_NOTE; public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80; public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter"; public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime" + " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;" -+ " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER."; ++ " a default value of 0.05 (5%) is used if no value is specified." ++ OAUTHBEARER_NOTE; public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05; public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds"; public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential," + " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and " + " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential." -+ " Currently applies only to OAUTHBEARER."; ++ OAUTHBEARER_NOTE; public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60; public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds"; public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC = "The amount of buffer time before credential expiration to maintain when refreshing a credential," + " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain" + " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified." + " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential." -+ " Currently applies only to OAUTHBEARER."; ++ OAUTHBEARER_NOTE; public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300; +public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms"; +public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider connection timeout." ++ OAUTHBEARER_NOTE; + +public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms"; +public static final String SASL_LOGIN_READ_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider read timeout." ++ OAUTHBEARER_NOTE; + +// These are only specified here outside their normal groupings so that they can be +// forward referencing. +public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms"; +public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms"; + +private static final String EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the" ++ " " + SASL_LOGIN_RETRY_BACKOFF_MS ++ " setting and will double in wait length between attempts up to a maximum wait length specified by the" ++ " " + SASL_LOGIN_RETRY_BACKOFF_MAX_MS ++ " setting."; + +public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS = 1; +public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC = "The (optional) value in milliseconds for the maximum wait between login attempts to the" ++ " external authentication provider." ++ EXPONENTIAL_BACKOFF_NOTE; + +public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS = 100; +public static
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734089482 ## File path: build.gradle ## @@ -1604,6 +1608,7 @@ project(':tools') { implementation libs.slf4jApi implementation libs.log4j +implementation libs.jose4j Review comment: It is needed here as per the explanation for the need for the dependency in `:core`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734089125 ## File path: build.gradle ## @@ -829,6 +829,7 @@ project(':core') { implementation libs.jacksonDataformatCsv implementation libs.jacksonJDK8Datatypes implementation libs.joptSimple +implementation libs.jose4j Review comment: Clients intentionally *doesn't* have the jose4j dependency (to keep client dependencies to a minimum), so we have to explicitly include it here. Suggestions welcome on a better way to handle it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734086876 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { +this.configs = configs; + +if (saslMechanism != null && !saslMechanism.trim().isEmpty()) +this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim()); +else +this.prefix = null; +} + +public Map getSslClientConfig(String uriConfigName) { +String urlConfigValue = get(uriConfigName); + +if (urlConfigValue == null || urlConfigValue.trim().isEmpty()) +throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName)); + +URL url; + +try { +url = new URL(urlConfigValue); +} catch (IOException e) { +throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue)); +} + +if (!url.getProtocol().equalsIgnoreCase("https")) { +log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url); +return null; +} + +ConfigDef sslConfigDef = new ConfigDef(); +sslConfigDef.withClientSslSupport(); Review comment: That said, you're right, the code was accidentally referring to the top-level configuration `Map`, not the JAAS options `Map`. That has been fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734086548 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { +this.configs = configs; + +if (saslMechanism != null && !saslMechanism.trim().isEmpty()) +this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim()); +else +this.prefix = null; +} + +public Map getSslClientConfig(String uriConfigName) { +String urlConfigValue = get(uriConfigName); + +if (urlConfigValue == null || urlConfigValue.trim().isEmpty()) +throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName)); + +URL url; + +try { +url = new URL(urlConfigValue); +} catch (IOException e) { +throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue)); +} + +if (!url.getProtocol().equalsIgnoreCase("https")) { +log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url); +return null; +} + +ConfigDef sslConfigDef = new ConfigDef(); +sslConfigDef.withClientSslSupport(); Review comment: The client SSL configuration for OAuth comes from the JAAS options, not the configuration. So the user can specify SSL options like this: ``` sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ clientId="foo" \ clientSecret="bar" \ scope="baz" \ ssl.protocol="SSL" \ ssl.keystore.type="" ; ``` This allows the HTTP client connection to use the same set of SSL configuration when connecting to the OAuth provider. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734083140 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/PemDirectoryVerificationKeyResolver.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.security.Key; +import java.security.PublicKey; +import java.security.interfaces.ECPublicKey; +import java.security.interfaces.RSAPublicKey; +import java.security.spec.InvalidKeySpecException; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.common.utils.Utils; +import org.jose4j.jwk.EllipticCurveJsonWebKey; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.jwk.RsaJsonWebKey; +import org.jose4j.jws.JsonWebSignature; +import org.jose4j.jwx.JsonWebStructure; +import org.jose4j.keys.EcKeyUtil; +import org.jose4j.keys.RsaKeyUtil; +import org.jose4j.keys.resolvers.JwksVerificationKeyResolver; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.jose4j.lang.JoseException; +import org.jose4j.lang.UnresolvableKeyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PemVerificationKeyResolver is a {@link VerificationKeyResolver} implementation + * that will load the PEM files from the given file system directory. + * + * The instance is configured with the directory name that contains one or more + * https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail\";>public key files Review comment: That said, I will strip this out as it's not needed for a first version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734082935 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/PemDirectoryVerificationKeyResolver.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.security.Key; +import java.security.PublicKey; +import java.security.interfaces.ECPublicKey; +import java.security.interfaces.RSAPublicKey; +import java.security.spec.InvalidKeySpecException; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.common.utils.Utils; +import org.jose4j.jwk.EllipticCurveJsonWebKey; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.jwk.RsaJsonWebKey; +import org.jose4j.jws.JsonWebSignature; +import org.jose4j.jwx.JsonWebStructure; +import org.jose4j.keys.EcKeyUtil; +import org.jose4j.keys.RsaKeyUtil; +import org.jose4j.keys.resolvers.JwksVerificationKeyResolver; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.jose4j.lang.JoseException; +import org.jose4j.lang.UnresolvableKeyException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PemVerificationKeyResolver is a {@link VerificationKeyResolver} implementation + * that will load the PEM files from the given file system directory. + * + * The instance is configured with the directory name that contains one or more + * https://en.wikipedia.org/wiki/Privacy-Enhanced_Mail\";>public key files Review comment: I believe so 🤷 From that Wikipedia page: > Privacy-Enhanced Mail (PEM) is a de facto file format for storing and sending cryptographic keys, certificates, and other data. and... > PEM data is commonly stored in files with a ".pem" suffix, a ".cer" or ".crt" suffix (for certificates), or a ".key" suffix (for public or private keys). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734081500 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { Review comment: Yes, I have added appropriate JavaDoc describing this callback handler and contrasting it with the other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734081204 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler { Review comment: Yes, I have added appropriate JavaDoc describing this callback handler and contrasting it with the other. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432733#comment-17432733 ] Aditya Upadhyaya commented on KAFKA-12559: -- [~ableegoldman]: thanks :) I'll go through the KIP process and revert back to you if I have any questions. > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Aditya Upadhyaya >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocskdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}} > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap on the off-heap memory without > getting their hands dirty with rocksdb. > I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya Upadhyaya reassigned KAFKA-12559: Assignee: Aditya Upadhyaya (was: Martin Sundeqvist) > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Aditya Upadhyaya >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocskdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}} > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap on the off-heap memory without > getting their hands dirty with rocksdb. > I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] FlorianLehmann opened a new pull request #11425: KAFKA-13393 Documentation - Add missing arguments to create a topic
FlorianLehmann opened a new pull request #11425: URL: https://github.com/apache/kafka/pull/11425 In the quickstart documentation (quickstart_createtopic) , there is a command specified to create a topic: `$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092` However, it is no longer working due to missing arguments: - partitions - replications-factor The previous command should be replaced with this one: `$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13393) Update website documentation to include required arguments when creating a topic
Florian Lehmann created KAFKA-13393: --- Summary: Update website documentation to include required arguments when creating a topic Key: KAFKA-13393 URL: https://issues.apache.org/jira/browse/KAFKA-13393 Project: Kafka Issue Type: Bug Components: docs, documentation, website Affects Versions: 3.0.0 Reporter: Florian Lehmann In the quickstart documentation ([quickstart_createtopic|https://kafka.apache.org/quickstart#quickstart_createtopic]) , there is a command specified to create a topic: {code:java} $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 {code} However, it is no longer working due to missing arguments: * partitions * replications-factor The previous command should be replaced with this one: {code:java} $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao merged pull request #11414: MINOR: Renamed a few record definition files with the existing convention.
junrao merged pull request #11414: URL: https://github.com/apache/kafka/pull/11414 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field
ableegoldman commented on pull request #11405: URL: https://github.com/apache/kafka/pull/11405#issuecomment-948980709 Soaks have been healthy, and all tests passed except one flaky `kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11420: KAFKA-12648: extend IQ APIs to work with named topologies
ableegoldman commented on pull request #11420: URL: https://github.com/apache/kafka/pull/11420#issuecomment-948964447 cc @wcarlson5 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] JimGalasyn commented on pull request #11404: MINOR: update Kafka Streams standby task config
JimGalasyn commented on pull request #11404: URL: https://github.com/apache/kafka/pull/11404#issuecomment-948928072 > > @astubbs There is a TOC at the top of the page... Not sure what else we could do? > > b) actually move the section to be the first section, as it's far more important to read than _anything_ else on that page We could move this section under Required Settings and note that although not required, it's extremely important (or critical) to set `num.standby.replicas` to 1 to ensure high availability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] JimGalasyn commented on a change in pull request #11404: MINOR: update Kafka Streams standby task config
JimGalasyn commented on a change in pull request #11404: URL: https://github.com/apache/kafka/pull/11404#discussion_r733974182 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -680,14 +680,18 @@ default.windowed.value.serde.innerState section. + +Recommendation: +Increase the number of standbys to 1 to get instant fail-over, i.e., high-availability. + Note that you will require more client side storage space as well (twice as much with 1 standby). Review comment: ```suggestion Increasing the number of standbys requires more client-side storage space. For example, with 1 standby, twice as much space is required. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13388) Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432667#comment-17432667 ] David Hoffman edited comment on KAFKA-13388 at 10/21/21, 6:47 PM: -- I have a screenshot of some logging I added that shows the connection stuck in CHECKING_API_VERSIONS state. The logging I added grabbed all the nodeIds from ClusterConnectionStates and determined if the Producer would treat them as 'not ready' by checking the state, selector and whether there were in flight requests. If they were 'not ready' it logged them and some other info. It ran on a schedule every 30 seconds. For this occurrence, I looked up the node id, and it was the leader of the partition that batches expired for. That's all I have right now. It's relatively rare, happens like 1 or 2 times a day for 32 application instances connecting to 64 kafka brokers. I am trying to narrow it down more by adding more info and waiting for it to happen again. Like one question I was wondering is if there is an outstanding in flight request when it's in this state or did that somehow get dropped in the shuffle somewhere. !image-2021-10-21-13-42-06-528.png|width=664,height=308! was (Author: dhofftgt): I have a screenshot of some logging I added that shows the connection stuck in CHECKING_API_VERSIONS state. The logging I added grabbed all the nodeIds from ClusterConnectionStates and determined if the Producer would treat them as 'not ready' by checking the state, selector and whether there were in flight requests. If they were 'not ready' it logged them and some other info. It ran on a schedule every 30 seconds. For this occurrence, I looked up the node id, and it was the leader of the partition that batches expired for. That's all I have right now. It's relatively rare, happens one or two times a day. It doesn't happen super often, like 1 or 2 times a day for 32 application instances connecting to 64 kafka brokers. I am trying to narrow it down more by adding more info and waiting for it to happen again. Like one question I was wondering is if there is an outstanding in flight request when it's in this state or did that somehow get dropped in the shuffle somewhere. !image-2021-10-21-13-42-06-528.png|width=664,height=308! > Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS > -- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Major > Attachments: image-2021-10-21-13-42-06-528.png > > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state. > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. I am seeing that sometimes a reply is not received for the > check api versions request the connection just hangs in CHECKING_API_VERSIONS > state until it is disposed I assume after the idle connection timeout. > I am guessing the connection setup timeout should be still in play for this, > but it is not. > There is a connectingNodes set that is consulted when checking timeouts and > the node is removed > when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13388) Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432667#comment-17432667 ] David Hoffman commented on KAFKA-13388: --- I have a screenshot of some logging I added that shows the connection stuck in CHECKING_API_VERSIONS state. The logging I added grabbed all the nodeIds from ClusterConnectionStates and determined if the Producer would treat them as 'not ready' by checking the state, selector and whether there were in flight requests. If they were 'not ready' it logged them and some other info. It ran on a schedule every 30 seconds. For this occurrence, I looked up the node id, and it was the leader of the partition that batches expired for. That's all I have right now. It's relatively rare, happens one or two times a day. It doesn't happen super often, like 1 or 2 times a day for 32 application instances connecting to 64 kafka brokers. I am trying to narrow it down more by adding more info and waiting for it to happen again. Like one question I was wondering is if there is an outstanding in flight request when it's in this state or did that somehow get dropped in the shuffle somewhere. !image-2021-10-21-13-42-06-528.png|width=664,height=308! > Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS > -- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Major > Attachments: image-2021-10-21-13-42-06-528.png > > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state. > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. I am seeing that sometimes a reply is not received for the > check api versions request the connection just hangs in CHECKING_API_VERSIONS > state until it is disposed I assume after the idle connection timeout. > I am guessing the connection setup timeout should be still in play for this, > but it is not. > There is a connectingNodes set that is consulted when checking timeouts and > the node is removed > when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13388) Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Hoffman updated KAFKA-13388: -- Attachment: image-2021-10-21-13-42-06-528.png > Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS > -- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Major > Attachments: image-2021-10-21-13-42-06-528.png > > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state. > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. I am seeing that sometimes a reply is not received for the > check api versions request the connection just hangs in CHECKING_API_VERSIONS > state until it is disposed I assume after the idle connection timeout. > I am guessing the connection setup timeout should be still in play for this, > but it is not. > There is a connectingNodes set that is consulted when checking timeouts and > the node is removed > when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733928342 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenRetrieverFactory.java ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; + +import java.net.URI; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; + +public class AccessTokenRetrieverFactory { + +/** + * Create an {@link AccessTokenRetriever} from the given SASL and JAAS configuration. + * + * Note: the returned AccessTokenRetriever is not initialized + * here and must be done by the caller prior to use. + * + * @param configsSASL configuration + * @param jaasConfig JAAS configuration + * + * @return Non-null {@link AccessTokenRetriever} + */ + +public static AccessTokenRetriever create(Map configs, Map jaasConfig) { +return create(configs, null, jaasConfig); +} + +public static AccessTokenRetriever create(Map configs, String saslMechanism, Map jaasConfig) { +ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism); +URI tokenEndpointUri = cu.validateUri(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI); + +if (tokenEndpointUri.getScheme().toLowerCase(Locale.ROOT).equals("file")) { +return new FileTokenRetriever(cu.validateFile(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI)); +} else { +ConfigurationUtils jaasCu = new ConfigurationUtils(jaasConfig); +String clientId = jaasCu.validateString(CLIENT_ID_CONFIG); +String clientSecret = jaasCu.validateString(CLIENT_SECRET_CONFIG); +String scope = jaasCu.get(SCOPE_CONFIG); + +SSLSocketFactory sslSocketFactory = cu.createSSLSocketFactory(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI); + +return new HttpAccessTokenRetriever(clientId, +clientSecret, +scope, +sslSocketFactory, +tokenEndpointUri.toString(), +cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS), +cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS), +cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false, null), Review comment: Changed to use the two-parameter version of `validateInteger`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
guozhangwang commented on pull request #11340: URL: https://github.com/apache/kafka/pull/11340#issuecomment-948851978 Thanks everyone for the valuable discussions, this is a tricky issue indeed and worth some more clarifications, on the status quo. In general there are a lot of issues we've observed with coupling the rebalance protocol with the `poll` calls, and in the long run we are thinking to move the rebalance as well as fetching records completely into the background thread, and hence caller `poll` would only be getting the buffered records --- I will submit a JIRA ticket for this soon --- but as for now, what I'm thinking is a short-term fix, and hence the goal is to fix it with as less scope and complexity as possible. The status quo is that, when we are about to enter the rebalance we try to commit sync with a `rebalance.timeout`. That means: 1) the time blocked on commit maybe LARGER than the timeout passed into `poll`. This is the main issue we'd like to address. 2) if the commit exhausted the timeout but still cannot succeed, we would "give up" with a warning log and still continue the rebalance protocol with the risk that the new host of those partitions may fetch stale committed offsets (or even no offsets at all, if there's no committed offsets before the rebalance happened). This is not ideal, but okay according to the at least once semantics --- note, with exactly once semantics we do not rely on consumer to commit offsets so EOS is not impacted by this behavior. So any solution we may propose, as long as it would not make further regression, would be fine. Now here are my thoughts: * it's risky to just always upper bound the commit timeout with the `poll` parameter value, since in practice many users may call `poll(0)` or with a very small millisecond parameter. This would mean that, we may be much more aggressive on giving up and continue, i.e. making 2) above more exaggerate. Imagine if there's a leader migration happening just at the same time of the rebalance, we would very easily give up the commit even with just one round-trip failure. * it's also risky to always upper bound the commit timeout with the configured rebalance regardless of the `poll` timeout, which we have been discussed many times above and we all know its impact now. So what's left seems to be that, we still make a reasonably "best effort" to commit offsets before continue the rebalance protocol, and that "best effort" should be somewhat irrelevant to the poll timeout, but also not blocking more than the timeout value itself. Hence it occurs to me that, the best effort for now could be that we still do not block for longer than the poll timeout --- i.e. we may even return early --- but we do not give up the commit either, and hence here async commit where could be potentially completed across multiple `poll` calls would be reasonable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-948851456 @guozhangwang 1) The in memory default state stores already does the filtering. Original thought was to make the Persistent Store be able to also enforce the same retention and hence the checks everywhere that you see. 2) The idea of adding in MeteredStore was that the enforcement would be applicable to any custom store that's implemented. But turns out, it isn't that simple( I realised it after so many code changes lol). For persistent stores, we could push it down to the inner segmented stores(similar to how the in-memory stores do it today). If we can assume that custom state stores would take care of it, then no point having it in Metered store which goes against the original idea but should be ok IMO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733901463 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class); + +private CloseableVerificationKeyResolver verificationKeyResolver; + +private AccessTokenValidator accessTokenValidator; + +private boolean isInitialized = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism); +AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver); +init(verificationKeyResolver, accessTokenValidator); +} + +public void init(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) { +this.verificationKeyResolver = verificationKeyResolver; +this.accessTokenValidator = accessTokenValidator; + +try { +verificationKeyResolver.init(); +} catch (Exception e) { +throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e); +} + +isInitialized = true; +} + +@Override +public void close() { +if (verificationKeyResolver != null) { +try { +verificationKeyResolver.close(); +} catch (Exception e) { +log.error(e.getMessage(), e); +} +} +} + +@Override +public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { +checkInitialized(); + +for (Callback callback : callbacks) { +if (callback instanceof OAuthBearerValidatorCallback) { +handleValidatorCallback((OAuthBearerValidatorCallback) callback); +} else if (callback instanceof OAuthBearerExtensionsValidatorCallback) { +OAuthBearerExtensionsValidatorCallback extensionsCallback = (OAuthBearerExtensionsValidatorCallback) callback; + extensionsCallback.inputExtensions().map().forEach((extensionName, v) -> extensionsCallback.valid(extensionName)); +} else { +throw new UnsupportedCallbackException(callback); +} +} +} + +private void handleValidatorCallback(OAuthBearerValidatorCallback callback) { +checkInitialized(); + +OAuthBearerToken token; + +try { +token = accessTokenValidator.validate(callback.tokenValue()); +log.debug("handle - token: {}", token); +callback.token(token); +} catch (ValidateException e) { +e.printStackTrace(); Review comment: Whoopsie! Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apach
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733897632 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.MalformedClaimException; +import org.jose4j.jwt.NumericDate; +import org.jose4j.jwt.ReservedClaimNames; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used + * by the broker to perform more extensive validation of the JWT access token that is received + * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's + * token endpoint. + * + * The validation steps performed (primary by the jose4j library) are: + * + * + * + * Basic structural validation of the b64token value as defined in + * https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 Section 2.1 + * + * Basic conversion of the token into an in-memory data structure + * + * Presence of scope, exp, subject, iss, and + * iat claims + * + * + * Signature matching validation against the kid and those provided b + * the OAuth/OIDC provider's JWKS + * + * + */ + +public class ValidatorAccessTokenValidator implements AccessTokenValidator { + +private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class); + +private final JwtConsumer jwtConsumer; + +private final String scopeClaimName; + +private final String subClaimName; + +/** + * Creates a new ValidatorAccessTokenValidator that will be used by the broker for more + * thorough validation of the JWT. + * + * @param clockSkew The optional value (in seconds) to allow for differences + *between the time of the OAuth/OIDC identity provider and + *the broker. If null is provided, the broker + *and the OAUth/OIDC identity provider are assumed to have + *very close clock settings. + * @param expectedAudiences The (optional) set the broker will use to verify that + *the JWT was issued for one of the expected audiences. + *The JWT will be inspected for the standard OAuth + *aud claim and if this value is set, the + *broker will match the value from JWT's aud + *claim to see if there is an exact match. If there is no + *match, the broker will reject the JWT and authentication + *will fail. May be null to not perform an Review comment: Completed the thought so that it now reads: ``` May be null to not perform any check to verify the JWT's aud claim matches any fixed set of known/expected audiences. ``` Thanks for catching that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #11424: Kafka 13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-948846674 @guozhangwang Implementation of the KIP. Note that number of files is big as this renames the cache size config. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733900761 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.MalformedClaimException; +import org.jose4j.jwt.NumericDate; +import org.jose4j.jwt.ReservedClaimNames; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used + * by the broker to perform more extensive validation of the JWT access token that is received + * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's + * token endpoint. + * + * The validation steps performed (primary by the jose4j library) are: + * + * + * + * Basic structural validation of the b64token value as defined in + * https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 Section 2.1 + * + * Basic conversion of the token into an in-memory data structure + * + * Presence of scope, exp, subject, iss, and + * iat claims + * + * + * Signature matching validation against the kid and those provided b + * the OAuth/OIDC provider's JWKS + * + * + */ + +public class ValidatorAccessTokenValidator implements AccessTokenValidator { + +private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class); + +private final JwtConsumer jwtConsumer; + +private final String scopeClaimName; + +private final String subClaimName; + +/** + * Creates a new ValidatorAccessTokenValidator that will be used by the broker for more + * thorough validation of the JWT. + * + * @param clockSkew The optional value (in seconds) to allow for differences + *between the time of the OAuth/OIDC identity provider and + *the broker. If null is provided, the broker + *and the OAUth/OIDC identity provider are assumed to have + *very close clock settings. + * @param expectedAudiences The (optional) set the broker will use to verify that + *the JWT was issued for one of the expected audiences. + *The JWT will be inspected for the standard OAuth + *aud claim and if this value is set, the + *broker will match the value from JWT's aud + *claim to see if there is an exact match. If there is no + *match, the broker will reject the JWT and authentication + *will fail. May be null to not perform an + * @param expectedIssuer The (optional) value for the broker to use to verify that + *the JWT was created by the expected issuer. The JWT will + *be inspected for the standard OAuth iss claim + *and if this value is set, the broker will match it exactly + *against what is in the JWT's iss claim. + *If there is no match, the broker will reject the JWT and + *a
[GitHub] [kafka] vamossagar12 opened a new pull request #11424: Kafka 13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 opened a new pull request #11424: URL: https://github.com/apache/kafka/pull/11424 This PR is an implementation of: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390. The following changes have been made: - Adding a new config input.buffer.max.bytes applicable at a topology level. - Adding new config statestore.cache.max.bytes. - Adding new metric called total-bytes . - The per partition config buffered.records.per.partition is deprecated. - The config cache.max.bytes.buffering is deprecated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733897632 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.MalformedClaimException; +import org.jose4j.jwt.NumericDate; +import org.jose4j.jwt.ReservedClaimNames; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used + * by the broker to perform more extensive validation of the JWT access token that is received + * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's + * token endpoint. + * + * The validation steps performed (primary by the jose4j library) are: + * + * + * + * Basic structural validation of the b64token value as defined in + * https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 Section 2.1 + * + * Basic conversion of the token into an in-memory data structure + * + * Presence of scope, exp, subject, iss, and + * iat claims + * + * + * Signature matching validation against the kid and those provided b + * the OAuth/OIDC provider's JWKS + * + * + */ + +public class ValidatorAccessTokenValidator implements AccessTokenValidator { + +private static final Logger log = LoggerFactory.getLogger(ValidatorAccessTokenValidator.class); + +private final JwtConsumer jwtConsumer; + +private final String scopeClaimName; + +private final String subClaimName; + +/** + * Creates a new ValidatorAccessTokenValidator that will be used by the broker for more + * thorough validation of the JWT. + * + * @param clockSkew The optional value (in seconds) to allow for differences + *between the time of the OAuth/OIDC identity provider and + *the broker. If null is provided, the broker + *and the OAUth/OIDC identity provider are assumed to have + *very close clock settings. + * @param expectedAudiences The (optional) set the broker will use to verify that + *the JWT was issued for one of the expected audiences. + *The JWT will be inspected for the standard OAuth + *aud claim and if this value is set, the + *broker will match the value from JWT's aud + *claim to see if there is an exact match. If there is no + *match, the broker will reject the JWT and authentication + *will fail. May be null to not perform an Review comment: Completed the thought so that it now reads: ``` May be null to not perform any check to verify the JWT's audience matches any fixed set of known/expected audiences. ``` Thanks for catching that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733895025 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.MalformedClaimException; +import org.jose4j.jwt.NumericDate; +import org.jose4j.jwt.ReservedClaimNames; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used + * by the broker to perform more extensive validation of the JWT access token that is received + * from the client, but ultimately from posting the client credentials to the OAuth/OIDC provider's + * token endpoint. + * + * The validation steps performed (primary by the jose4j library) are: + * + * + * + * Basic structural validation of the b64token value as defined in + * https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 Section 2.1 + * + * Basic conversion of the token into an in-memory data structure + * + * Presence of scope, exp, subject, iss, and + * iat claims + * + * + * Signature matching validation against the kid and those provided b Review comment: Thank ou for noticing that 😄 . Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
guozhangwang commented on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-948839504 Hi @vamossagar12 @ableegoldman I took a quick look at the PR and I have a few questions: * The fix itself seems to be applied only to persistent stores (built-in or customized), and not in-memory, assuming the latter has done its duties. But for customized in-memory stores it may not be the case. Is that right? * I'm a it concerned about applying the filter logic at the metered store, for its complexity introduced. Instead, I'm wondering if we could enforce the retention period restriction into the inner store, i.e. 1) the built-in stores, at the `segmented store` layer would enforce this rule based on its observed stream time, and 2) we just assume customized stores themselves would enforce this rule, and hence do not have the second guard at the outer meteredStore layer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733893905 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ValidatorAccessTokenValidator.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.jose4j.jwa.AlgorithmConstraints.DISALLOW_NONE; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.jose4j.jwt.JwtClaims; +import org.jose4j.jwt.MalformedClaimException; +import org.jose4j.jwt.NumericDate; +import org.jose4j.jwt.ReservedClaimNames; +import org.jose4j.jwt.consumer.InvalidJwtException; +import org.jose4j.jwt.consumer.JwtConsumer; +import org.jose4j.jwt.consumer.JwtConsumerBuilder; +import org.jose4j.jwt.consumer.JwtContext; +import org.jose4j.keys.resolvers.VerificationKeyResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ValidatorAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used + * by the broker to perform more extensive validation of the JWT access token that is received Review comment: Yes and no. The `AccessTokenValidator` interface is used by both client and broker, but the `ValidatorAccessTokenValidator` implementation is specifically designed to only be used on the broker while the `LoginAccessTokenValidator` is used as the client implementation. The reason for this is that on the broker we have the ability to bundle utility libraries (like jose4j) that allow `ValidatorAccessTokenValidator` to perform more thorough validation of the JWT. We intentionally want to keep the client dependencies lightweight, so `LoginAccessTokenValidator` only performs basic parsing and sanity checking on the JWT. I made an attempt to document this rationale in the `AccessTokenValidator` interface documentation. Let me know if there's something more specific I should put in there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733888042 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733887473 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733882914 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733864413 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733854104 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733851806 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { +this.configs = configs; + +if (saslMechanism != null && !saslMechanism.trim().isEmpty()) +this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim()); +else +this.prefix = null; +} + +public Map getSslClientConfig(String uriConfigName) { +String urlConfigValue = get(uriConfigName); + +if (urlConfigValue == null || urlConfigValue.trim().isEmpty()) +throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName)); + +URL url; + +try { +url = new URL(urlConfigValue); +} catch (IOException e) { +throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue)); +} + +if (!url.getProtocol().equalsIgnoreCase("https")) { +log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url); +return null; +} + +ConfigDef sslConfigDef = new ConfigDef(); +sslConfigDef.withClientSslSupport(); +AbstractConfig sslClientConfig = new AbstractConfig(sslConfigDef, configs); +return sslClientConfig.values(); +} + +public SSLSocketFactory createSSLSocketFactory(String uriConfigName) { +Map sslClientConfig = getSslClientConfig(uriConfigName); + +if (sslClientConfig == null) { +log.warn("Requesting SSL client socket factory but SSL configs were null"); +return null; +} + +SslFactory sslFactory = new SslFactory(Mode.CLIENT); +sslFactory.configure(sslClientConfig); +SSLSocketFactory socketFactory = ((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext().getSocketFactory(); +log.debug("Created SSLSocketFactory: {}", sslClientConfig); +return socketFactory; +} + +/** + * Validates that, if a value is supplied, is a file that: + * + * + * exists + * has read permission + * points to a file + * + * + * If the value is null or an empty string, it is assumed to be an "empty" value and thus. + * ignored. Any whitespace is trimmed off of the beginning and end. + */ + +public Path validateFile(String name) { +URI uri = validateUri(name); +File file = new File(uri.getRawPath()).getAbsoluteFile(); + +if (!file.exists()) +throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) t
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r733850703 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationUtils is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. + */ + +public class ConfigurationUtils { + +private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + +private final Map configs; + +private final String prefix; + +public ConfigurationUtils(Map configs) { +this(configs, null); +} + +public ConfigurationUtils(Map configs, String saslMechanism) { +this.configs = configs; + +if (saslMechanism != null && !saslMechanism.trim().isEmpty()) +this.prefix = ListenerName.saslMechanismPrefix(saslMechanism.trim()); +else +this.prefix = null; +} + +public Map getSslClientConfig(String uriConfigName) { +String urlConfigValue = get(uriConfigName); + +if (urlConfigValue == null || urlConfigValue.trim().isEmpty()) +throw new ConfigException(String.format("The OAuth configuration option %s is required", uriConfigName)); + +URL url; + +try { +url = new URL(urlConfigValue); +} catch (IOException e) { +throw new ConfigException(String.format("The OAuth configuration option %s was not a valid URL (%s)", uriConfigName, urlConfigValue)); +} + +if (!url.getProtocol().equalsIgnoreCase("https")) { +log.warn("Not creating SSL socket factory as URL for {} ({}) is not SSL-/TLS-based", uriConfigName, url); +return null; +} + +ConfigDef sslConfigDef = new ConfigDef(); +sslConfigDef.withClientSslSupport(); +AbstractConfig sslClientConfig = new AbstractConfig(sslConfigDef, configs); +return sslClientConfig.values(); +} + +public SSLSocketFactory createSSLSocketFactory(String uriConfigName) { +Map sslClientConfig = getSslClientConfig(uriConfigName); + +if (sslClientConfig == null) { +log.warn("Requesting SSL client socket factory but SSL configs were null"); Review comment: I refactored the code a bit, so this has been removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sebbASF opened a new pull request #11423: Update doap_Kafka.rdf
sebbASF opened a new pull request #11423: URL: https://github.com/apache/kafka/pull/11423 Wrong syntax; which is why Kafka does not appear in BigData category on projects.a.o -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432564#comment-17432564 ] stuxnet commented on KAFKA-10413: - Hello, I confirm that we also see the same issue, any news on this ? Thanks > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png > > > GHi, > With CP 5.5, running kafka connect s3 sink on EC2 whith autoscaling enabled, > if a connect instance disappear, or a new one appear, we're seeing unbalanced > consumption, much like mentionned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one kafka connect instance taking most of the load and > consumption not being able to keep on. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13388) Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432563#comment-17432563 ] David Jacot commented on KAFKA-13388: - Interesting... It would be great if you could provide more details (logs, dumps, etc.). There might be a bug somewhere. > Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS > -- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Major > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state. > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. I am seeing that sometimes a reply is not received for the > check api versions request the connection just hangs in CHECKING_API_VERSIONS > state until it is disposed I assume after the idle connection timeout. > I am guessing the connection setup timeout should be still in play for this, > but it is not. > There is a connectingNodes set that is consulted when checking timeouts and > the node is removed > when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ocadaruma opened a new pull request #11422: KAFKA-9648: Add configuration to adjust listen backlog size for Acceptor
ocadaruma opened a new pull request #11422: URL: https://github.com/apache/kafka/pull/11422 https://issues.apache.org/jira/browse/KAFKA-9648 * This PR implements [KIP-764](https://cwiki.apache.org/confluence/display/KAFKA/KIP-764%3A+Configurable+backlog+size+for+creating+Acceptor) - Add new KafkaConfig `socket.listen.backlog.size` which is passed to [ServerSocket#bind](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/net/ServerSocket.html#bind(java.net.SocketAddress,int)) to adjust the max length of the queue of incoming connections. * Please refer the wiki page for the motivation. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on pull request #11345: Allow empty last segment to have missing offset index during recovery
ccding commented on pull request #11345: URL: https://github.com/apache/kafka/pull/11345#issuecomment-948707270 @kowshik I will find some time to work on it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13388) Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432504#comment-17432504 ] David Hoffman commented on KAFKA-13388: --- [~dajac] Thanks for looking into this and replying. We are using 2.7.1 at the moment. I do see that the api version request should timeout with the request.timeout.ms which is the default for our app. I can't turn on debug logging where I am seeing this issue due to the volume of logs. I have some code periodically inspecting the states of connections that dumps some info. I have seen connections that have those expired batches stuck in CHECKING_API_VERSIONS for longer than the request timeout. I will keep digging to see why. > Kafka Producer has no timeout for nodes stuck in CHECKING_API_VERSIONS > -- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Major > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state. > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. I am seeing that sometimes a reply is not received for the > check api versions request the connection just hangs in CHECKING_API_VERSIONS > state until it is disposed I assume after the idle connection timeout. > I am guessing the connection setup timeout should be still in play for this, > but it is not. > There is a connectingNodes set that is consulted when checking timeouts and > the node is removed > when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13391) Failure on Windows due to AccessDeniedAcception when attempting to fsync the parent directory
[ https://issues.apache.org/jira/browse/KAFKA-13391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432399#comment-17432399 ] Luke Chen commented on KAFKA-13391: --- Also happened in HDFS, and the solution is also to skip fsync when in Windows. https://issues.apache.org/jira/browse/HDFS-13586 > Failure on Windows due to AccessDeniedAcception when attempting to fsync the > parent directory > - > > Key: KAFKA-13391 > URL: https://issues.apache.org/jira/browse/KAFKA-13391 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Andy Wilkinson >Assignee: Luke Chen >Priority: Major > > There appears to be a regression in Kafka 3.0 due to [the > changes|https://github.com/apache/kafka/commit/66b0c5c64f2969dc62362b9f169ad1d18f64efe9] > made for KAFKA-3968 that causes a failure on Windows. After upgrading to > 3.0.0, we're seeing failures in Spring Boot's Windows CI such as the > following: > {code} > Caused by: java.nio.file.AccessDeniedException: > C:\Windows\TEMP\spring.kafka.915ab8c1-735c-4c88-8507-8d25fd050621920219824697516859 > > at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) > at > sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115) > at java.nio.channels.FileChannel.open(FileChannel.java:287) > at java.nio.channels.FileChannel.open(FileChannel.java:335) > at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941) > at > kafka.server.BrokerMetadataCheckpoint.liftedTree1$1(BrokerMetadataCheckpoint.scala:214) > at > kafka.server.BrokerMetadataCheckpoint.write(BrokerMetadataCheckpoint.scala:204) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2(KafkaServer.scala:772) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2$adapted(KafkaServer.scala:770) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.server.KafkaServer.checkpointBrokerMetadata(KafkaServer.scala:770) > at kafka.server.KafkaServer.startup(KafkaServer.scala:322) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:175) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:170) > at kafka.utils.TestUtils.createServer(TestUtils.scala) > {code} > While I'm [aware that Windows isn't officially > supported|https://issues.apache.org/jira/browse/KAFKA-12190?focusedCommentId=17264398&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17264398], > I think this problem is likely to be a blocker for anyone who uses Windows > for Kafka-based development work. > I suspect that the attempt to fsync the directory should just be skipped on > Window. Alternatively, the failure could be ignored on Windows. Lucene [added > similar functionality in the > past|https://issues.apache.org/jira/browse/LUCENE-5588] where it looks like > they opted to ignore IOExceptions on Windows rather than skipping the attempt. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13392) Timeout Exception triggering reassign partitions with --bootstrap-server option
[ https://issues.apache.org/jira/browse/KAFKA-13392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yevgeniy Korin updated KAFKA-13392: --- Description: *Scenario when we faced with this issue:* One of three brokers is down. Add another (fourth) broker and try to reassign partitions using '--bootstrap-server' option. *What's failed:* {code:java} /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --reassignment-json-file /tmp/reassignment-20211021130718.json --throttle 1 --execute{code} failed with {code:java} Error: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435) at kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412) at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974) at kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255) at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216) at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: incrementalAlterConfigs{code} *Expected behavio**:* partition reassignment process started. *Workaround:* Trigger partition reassignment process using '--zookeeper' option: {code:java} /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper.my.company:2181/kafka-cluster --reassignment-json-file /tmp/reassignment-20211021130718.json --throttle 1 --execute{code} *Additional info:* We are able to trigger partition reassignment using '--bootstrap-server' option with no exceptions when all four brokers are alive. was: *Scenario when we faced with this issue:* One of three brokers is down. Add another (fourth) broker and try to reassign partitions using `--bootstrap-server` option. *What's failed:* `/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --reassignment-json-file /tmp/reassignment-20211021130718.json --throttle 1 --execute` failed with Error: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435) at kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412) at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974) at kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255) at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216) at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) Caused by: org.apache.kafka.com
[jira] [Created] (KAFKA-13392) Timeout Exception triggering reassign partitions with --bootstrap-server option
Yevgeniy Korin created KAFKA-13392: -- Summary: Timeout Exception triggering reassign partitions with --bootstrap-server option Key: KAFKA-13392 URL: https://issues.apache.org/jira/browse/KAFKA-13392 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 2.8.0 Reporter: Yevgeniy Korin *Scenario when we faced with this issue:* One of three brokers is down. Add another (fourth) broker and try to reassign partitions using `--bootstrap-server` option. *What's failed:* `/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --reassignment-json-file /tmp/reassignment-20211021130718.json --throttle 1 --execute` failed with Error: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435) at kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412) at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974) at kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255) at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216) at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: incrementalAlterConfigs *Expected behavio**:* partition reassignment process started. *Workaround:* Trigger partition reassignment process using `–zookeeper` option: /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper.my.company:2181/kafka-cluster --reassignment-json-file /tmp/reassignment-20211021130718.json --throttle 1 --execute Additional info: We were able to trigger partition reassignment using `--bootstrap-server` option when all four brokers are alive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432355#comment-17432355 ] Nicholas Telford commented on KAFKA-10493: -- Good to hear progress is being made on a solution. I just wanted to add that there is another way to end up with out-of-order data on a stream, even if your single-writer is guaranteeing ordering on the topic: a custom TimestampExtractor, to extract event time from your records. > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > Attachments: KTableOutOfOrderBug.java, out-of-order-table.png > > > On a materialized KTable, out-of-order records for a given key (records which > timestamp are older than the current value in store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13391) Failure on Windows due to AccessDeniedAcception when attempting to fsync the parent directory
[ https://issues.apache.org/jira/browse/KAFKA-13391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-13391: - Assignee: Luke Chen > Failure on Windows due to AccessDeniedAcception when attempting to fsync the > parent directory > - > > Key: KAFKA-13391 > URL: https://issues.apache.org/jira/browse/KAFKA-13391 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Andy Wilkinson >Assignee: Luke Chen >Priority: Major > > There appears to be a regression in Kafka 3.0 due to [the > changes|https://github.com/apache/kafka/commit/66b0c5c64f2969dc62362b9f169ad1d18f64efe9] > made for KAFKA-3968 that causes a failure on Windows. After upgrading to > 3.0.0, we're seeing failures in Spring Boot's Windows CI such as the > following: > {code} > Caused by: java.nio.file.AccessDeniedException: > C:\Windows\TEMP\spring.kafka.915ab8c1-735c-4c88-8507-8d25fd050621920219824697516859 > > at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) > at > sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115) > at java.nio.channels.FileChannel.open(FileChannel.java:287) > at java.nio.channels.FileChannel.open(FileChannel.java:335) > at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941) > at > kafka.server.BrokerMetadataCheckpoint.liftedTree1$1(BrokerMetadataCheckpoint.scala:214) > at > kafka.server.BrokerMetadataCheckpoint.write(BrokerMetadataCheckpoint.scala:204) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2(KafkaServer.scala:772) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2$adapted(KafkaServer.scala:770) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.server.KafkaServer.checkpointBrokerMetadata(KafkaServer.scala:770) > at kafka.server.KafkaServer.startup(KafkaServer.scala:322) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:175) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:170) > at kafka.utils.TestUtils.createServer(TestUtils.scala) > {code} > While I'm [aware that Windows isn't officially > supported|https://issues.apache.org/jira/browse/KAFKA-12190?focusedCommentId=17264398&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17264398], > I think this problem is likely to be a blocker for anyone who uses Windows > for Kafka-based development work. > I suspect that the attempt to fsync the directory should just be skipped on > Window. Alternatively, the failure could be ignored on Windows. Lucene [added > similar functionality in the > past|https://issues.apache.org/jira/browse/LUCENE-5588] where it looks like > they opted to ignore IOExceptions on Windows rather than skipping the attempt. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-13390: - Assignee: Luke Chen > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Assignee: Luke Chen >Priority: Minor > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and four ERROR log lines with messages: > {{ERRROR Failed to write meta.properties due to > (kafka.server.BrokerMetadataCheckpoint)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. > But after the attempted startup the directory contains files, specifically > meta.properties of size 94 bytes, recovery-point-offset-checkpoint and > log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files > (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, > .kafka_cleanshutdown). > Which indicates that Kafka actually has permissions to create these files and > write to them. > P.S. See also > [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13391) Failure on Windows due to AccessDeniedAcception when attempting to fsync the parent directory
[ https://issues.apache.org/jira/browse/KAFKA-13391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432337#comment-17432337 ] Andy Wilkinson commented on KAFKA-13391: This may be a duplicate of KAFKA-13390 which was also opened today. The Stack Overflow question to which it links shows the same stack trace at least. > Failure on Windows due to AccessDeniedAcception when attempting to fsync the > parent directory > - > > Key: KAFKA-13391 > URL: https://issues.apache.org/jira/browse/KAFKA-13391 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Andy Wilkinson >Priority: Major > > There appears to be a regression in Kafka 3.0 due to [the > changes|https://github.com/apache/kafka/commit/66b0c5c64f2969dc62362b9f169ad1d18f64efe9] > made for KAFKA-3968 that causes a failure on Windows. After upgrading to > 3.0.0, we're seeing failures in Spring Boot's Windows CI such as the > following: > {code} > Caused by: java.nio.file.AccessDeniedException: > C:\Windows\TEMP\spring.kafka.915ab8c1-735c-4c88-8507-8d25fd050621920219824697516859 > > at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) > at > sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115) > at java.nio.channels.FileChannel.open(FileChannel.java:287) > at java.nio.channels.FileChannel.open(FileChannel.java:335) > at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941) > at > kafka.server.BrokerMetadataCheckpoint.liftedTree1$1(BrokerMetadataCheckpoint.scala:214) > at > kafka.server.BrokerMetadataCheckpoint.write(BrokerMetadataCheckpoint.scala:204) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2(KafkaServer.scala:772) > at > kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2$adapted(KafkaServer.scala:770) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.server.KafkaServer.checkpointBrokerMetadata(KafkaServer.scala:770) > at kafka.server.KafkaServer.startup(KafkaServer.scala:322) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:175) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:170) > at kafka.utils.TestUtils.createServer(TestUtils.scala) > {code} > While I'm [aware that Windows isn't officially > supported|https://issues.apache.org/jira/browse/KAFKA-12190?focusedCommentId=17264398&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17264398], > I think this problem is likely to be a blocker for anyone who uses Windows > for Kafka-based development work. > I suspect that the attempt to fsync the directory should just be skipped on > Window. Alternatively, the failure could be ignored on Windows. Lucene [added > similar functionality in the > past|https://issues.apache.org/jira/browse/LUCENE-5588] where it looks like > they opted to ignore IOExceptions on Windows rather than skipping the attempt. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13391) Failure on Windows due to AccessDeniedAcception when attempting to fsync the parent directory
Andy Wilkinson created KAFKA-13391: -- Summary: Failure on Windows due to AccessDeniedAcception when attempting to fsync the parent directory Key: KAFKA-13391 URL: https://issues.apache.org/jira/browse/KAFKA-13391 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.0.0 Reporter: Andy Wilkinson There appears to be a regression in Kafka 3.0 due to [the changes|https://github.com/apache/kafka/commit/66b0c5c64f2969dc62362b9f169ad1d18f64efe9] made for KAFKA-3968 that causes a failure on Windows. After upgrading to 3.0.0, we're seeing failures in Spring Boot's Windows CI such as the following: {code} Caused by: java.nio.file.AccessDeniedException: C:\Windows\TEMP\spring.kafka.915ab8c1-735c-4c88-8507-8d25fd050621920219824697516859 at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:83) at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) at sun.nio.fs.WindowsFileSystemProvider.newFileChannel(WindowsFileSystemProvider.java:115) at java.nio.channels.FileChannel.open(FileChannel.java:287) at java.nio.channels.FileChannel.open(FileChannel.java:335) at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:953) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:941) at kafka.server.BrokerMetadataCheckpoint.liftedTree1$1(BrokerMetadataCheckpoint.scala:214) at kafka.server.BrokerMetadataCheckpoint.write(BrokerMetadataCheckpoint.scala:204) at kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2(KafkaServer.scala:772) at kafka.server.KafkaServer.$anonfun$checkpointBrokerMetadata$2$adapted(KafkaServer.scala:770) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) at scala.collection.AbstractIterable.foreach(Iterable.scala:919) at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) at kafka.server.KafkaServer.checkpointBrokerMetadata(KafkaServer.scala:770) at kafka.server.KafkaServer.startup(KafkaServer.scala:322) at kafka.utils.TestUtils$.createServer(TestUtils.scala:175) at kafka.utils.TestUtils$.createServer(TestUtils.scala:170) at kafka.utils.TestUtils.createServer(TestUtils.scala) {code} While I'm [aware that Windows isn't officially supported|https://issues.apache.org/jira/browse/KAFKA-12190?focusedCommentId=17264398&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17264398], I think this problem is likely to be a blocker for anyone who uses Windows for Kafka-based development work. I suspect that the attempt to fsync the directory should just be skipped on Window. Alternatively, the failure could be ignored on Windows. Lucene [added similar functionality in the past|https://issues.apache.org/jira/browse/LUCENE-5588] where it looks like they opted to ignore IOExceptions on Windows rather than skipping the attempt. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mykola Makhin updated KAFKA-13390: -- Priority: Minor (was: Major) > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Priority: Minor > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and four ERROR log lines with messages: > {{ERRROR Failed to write meta.properties due to > (kafka.server.BrokerMetadataCheckpoint)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. > But after the attempted startup the directory contains files, specifically > meta.properties of size 94 bytes, recovery-point-offset-checkpoint and > log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files > (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, > .kafka_cleanshutdown). > Which indicates that Kafka actually has permissions to create these files and > write to them. > P.S. See also > [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mykola Makhin updated KAFKA-13390: -- Description: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and four ERROR log lines with messages: {{ERRROR Failed to write meta.properties due to (kafka.server.BrokerMetadataCheckpoint)}} {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} {{...}} {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)}} {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. But after the attempted startup the directory contains files, specifically meta.properties of size 94 bytes, recovery-point-offset-checkpoint and log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, .kafka_cleanshutdown). Which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] was: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and four ERROR log lines with messages: {{ERRROR Failed to write meta.properties due to (kafka.server.BrokenMetadataCheckpoint)}} {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} {{...}} {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)}} {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. But after the attempted startup the directory contains files, specifically meta.properties of size 94 bytes, recovery-point-offset-checkpoint and log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, .kafka_cleanshutdown). Which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Priority: Major > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and four ERROR log lines with messages: > {{ERRROR Failed to write meta.properties due to > (kafka.server.BrokerMetadataCheckpoint)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. > But after the attempted startup the directory contains files, specifically > meta.properties of size 94 bytes, recovery-point-offset-checkpoint and > log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files > (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, > .kafka_cleanshutdown). > Which indicates that Kafka actually has permissions to create these files and > write to t
[jira] [Updated] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mykola Makhin updated KAFKA-13390: -- Description: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and four ERROR log lines with messages: {{ERRROR Failed to write meta.properties due to (kafka.server.BrokenMetadataCheckpoint)}} {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} {{...}} {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)}} {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. But after the attempted startup the directory contains files, specifically meta.properties of size 94 bytes, recovery-point-offset-checkpoint and log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, .kafka_cleanshutdown). Which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] was: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and four ERROR log lines with messages: {{ERRROR Failed to write meta.properties due to {{(kafka.server.BrokenMetadataCheckpoint) {{java.nio.file.AccessDeniedException {{c:\Users\user\kafka3-data {{...}} {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)}} {{java.nio.file.AccessDeniedException {{c:\Users\user\kafka3-data {{...}} ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel) ... ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel) Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. But after the attempted startup the directory contains files, specifically meta.properties of size 94 bytes, recovery-point-offset-checkpoint and log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, .kafka_cleanshutdown). Which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Priority: Major > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and four ERROR log lines with messages: > {{ERRROR Failed to write meta.properties due to > (kafka.server.BrokenMetadataCheckpoint)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)}} > {{java.nio.file.AccessDeniedException c:\Users\user\kafka3-data}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. > But after the attempted startup the directory contains files, specifically > meta.properties of size 94 bytes, recovery-point-offset-checkpoint and > log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files > (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, > .kafka_cleanshutdown). > Which indicates that Kafka actually has permissions to create these
[jira] [Updated] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mykola Makhin updated KAFKA-13390: -- Description: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and four ERROR log lines with messages: {{ERRROR Failed to write meta.properties due to {{(kafka.server.BrokenMetadataCheckpoint) {{java.nio.file.AccessDeniedException {{c:\Users\user\kafka3-data {{...}} {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)}} {{java.nio.file.AccessDeniedException {{c:\Users\user\kafka3-data {{...}} ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel) ... ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel) Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. But after the attempted startup the directory contains files, specifically meta.properties of size 94 bytes, recovery-point-offset-checkpoint and log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, .kafka_cleanshutdown). Which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] was: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and two ERROR log lines with messages: {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} However, that directory actually contains these files, and they are not empty (have size of 6 bytes though). I've started with empty directory, which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Priority: Major > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and four ERROR log lines with messages: > {{ERRROR Failed to write meta.properties due to > {{(kafka.server.BrokenMetadataCheckpoint) > {{java.nio.file.AccessDeniedException {{c:\Users\user\kafka3-data > {{...}} > {{ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)}} > {{java.nio.file.AccessDeniedException {{c:\Users\user\kafka3-data > {{...}} > ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel) > ... > ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel) > Before Kafka startup log directory ({{c:\Users\user\kafka3-data)}} was empty. > But after the attempted startup the directory contains files, specifically > meta.properties of size 94 bytes, recovery-point-offset-checkpoint and > log-start-offset-checkpoint - both of size 6 bytes, and 4 more 0 sized files > (replication-offset-checkpoint, cleaner-offset-checkpoint, .lock, > .kafka_cleanshutdown). > Which indicates that Kafka actually has permissions to create these files and > write to them. > P.S. See also > [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mykola Makhin updated KAFKA-13390: -- Description: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and two ERROR log lines with messages: {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} However, that directory actually contains these files, and they are not empty (have size of 6 bytes though). I've started with empty directory, which indicates that Kafka actually has permissions to create these files and write to them. P.S. See also [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] was: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and two ERROR log lines with messages: {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} However, that directory actually contains these files, and they are not empty (have size of 6 bytes though) P.S. See also https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Priority: Major > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and two ERROR log lines with messages: > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > > However, that directory actually contains these files, and they are not > empty (have size of 6 bytes though). I've started with empty directory, which > indicates that Kafka actually has permissions to create these files and write > to them. > P.S. See also > [https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13390) Kafka 3 fails to run on Windows
[ https://issues.apache.org/jira/browse/KAFKA-13390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mykola Makhin updated KAFKA-13390: -- Description: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and two ERROR log lines with messages: {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} However, that directory actually contains these files, and they are not empty (have size of 6 bytes though) P.S. See also https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows was: During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and two ERROR log lines with messages: {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} > Kafka 3 fails to run on Windows > --- > > Key: KAFKA-13390 > URL: https://issues.apache.org/jira/browse/KAFKA-13390 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Mykola Makhin >Priority: Major > > During startup Kafka creates some files in logs folder, but then outputs two > java.nio.file.AccessDeniedException exceptions for logs folder (configured in > server.properties via log.dirs), and two ERROR log lines with messages: > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\recovery-point-offset checkpoint > }}{{(kafka.server.LogDirFailureChannel)}} > {{...}} > {{ERROR Error while writing to checkpoint file > c:\Users\user\kafka3-data\log-start-offset-checkpoint > (kafka.server.LogDirFailureChannel)}} > > However, that directory actually contains these files, and they are not empty > (have size of 6 bytes though) > P.S. See also > https://stackoverflow.com/questions/69289641/accessdeniedexception-while-running-apache-kafka-3-on-windows -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13390) Kafka 3 fails to run on Windows
Mykola Makhin created KAFKA-13390: - Summary: Kafka 3 fails to run on Windows Key: KAFKA-13390 URL: https://issues.apache.org/jira/browse/KAFKA-13390 Project: Kafka Issue Type: Bug Affects Versions: 3.0.0 Reporter: Mykola Makhin During startup Kafka creates some files in logs folder, but then outputs two java.nio.file.AccessDeniedException exceptions for logs folder (configured in server.properties via log.dirs), and two ERROR log lines with messages: {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\recovery-point-offset checkpoint }}{{(kafka.server.LogDirFailureChannel)}} {{...}} {{ERROR Error while writing to checkpoint file c:\Users\user\kafka3-data\log-start-offset-checkpoint (kafka.server.LogDirFailureChannel)}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] astubbs commented on pull request #11404: MINOR: update Kafka Streams standby task config
astubbs commented on pull request #11404: URL: https://github.com/apache/kafka/pull/11404#issuecomment-948399673 > @astubbs There is a TOC at the top of the page... Not sure what else we could do? Yeah, but for such _critical_ settings, it's way too subtle. People don't read carefully entire TOCs. Two options (not that I'm a doc designer or anything) - perhaps a) but a `WARNING:: please note read section blah blah` at the top - like an [asciidoc admonitions](https://docs.asciidoctor.org/asciidoc/latest/syntax-quick-reference/#admonitions) b) actually move the section to be the first section, as it's far more important to read than _anything_ else on that page - for example, if you forget to put an app id your app will crash immediately, bu the resiliency settings are not obvious until you have yourself a sev1 in prod. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik edited a comment on pull request #11345: Allow empty last segment to have missing offset index during recovery
kowshik edited a comment on pull request #11345: URL: https://github.com/apache/kafka/pull/11345#issuecomment-948322460 @ccding Are you planning to add a unit test for this PR, and address the recent review comments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #11345: Allow empty last segment to have missing offset index during recovery
kowshik commented on pull request #11345: URL: https://github.com/apache/kafka/pull/11345#issuecomment-948322460 @ccding Are you planning to add a unit test for this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
showuon commented on pull request #11340: URL: https://github.com/apache/kafka/pull/11340#issuecomment-948316660 > And if the asynchronous submission fails all the time, we need to consider how to control a timeout period during multiple calls to consumer.poll. I don't think that would happen. If so, there might be some issues on the brokers. > I think it’s a weird practice to maintain a timeout for asynchronous operations. At the same time, its effect is actually equivalent to only do commitOffset once, setting a timeout MIN (poll timer, the configured timer) for this commit, and letting customers to consider how to better adjust these parameters poll timer, the configured timer. No, the `commitAsync` timeout meaning here is different from the `commitSync`. The `commitSync` with `MIN (poll timer, the configured timer)` will keep retrying if retriable errors, on the other hand, the timeout in `commitAsync`, is the timeout we wait for the response from the broker (and just once). I think @guozhangwang and @hachikuji 's async commit proposal can completely fix the issue you described. And of course, your proposal to have 2 more configs to allow users to adjust sync/async and timeout value is more flexible. In my opinion, I'd prefer to fix in a simpler way, to always have async commit in `onJoinPrepare`. That's my 2 cents. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org