[jira] [Updated] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved
[ https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergio Duran Vegas updated KAFKA-13386:
---
    Description:
The join optimization assumes the serializer is deterministic and invariant across upgrades. If the serialized form changes, this optimization, which is meant to drop invalid/intermediate records, will drop valid records as well. In other situations we have relied on the same property, for example when computing whether an update is a duplicate result or not.

The problem is that some serializers are sadly not deterministic.

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]

{code:java}
// If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
{code}

A solution for this problem would be for the comparison to use the foreign-key reference itself instead of the whole message hash.

The proposed bug fix is to allow the user to choose between the two methods of comparison (whole hash or FK reference). This would fix the problem of dropping valid records in certain cases, while still allowing the user to choose the current optimized way of checking for valid records and dropping intermediate results.

  was:
The join optimization assumes the serializer is deterministic and invariant across upgrades. I can recall other discussions in which we wanted to rely on the same property, for example when computing whether an update is a duplicate result or not.

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]

{code:java}
// If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
{code}

A solution for this problem would be for the comparison to use the foreign-key reference itself instead of the whole message hash.

The proposed bug fix is to allow the user to choose between the two methods of comparison (whole hash or FK reference). This would fix the problem of dropping valid records in certain cases, while still allowing the user to choose the current optimized way of checking for valid records and dropping intermediate results.

> Foreign Key Join filtering out valid records after a code change / schema evolved
> -
>
>                 Key: KAFKA-13386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Sergio Duran Vegas
>            Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
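The failure mode described in this issue can be illustrated with a small, self-contained sketch. It is not Kafka Streams code: the class and method names are hypothetical, the two "serializers" are toy string formats that differ only in field order, and SHA-256 is used purely as a JDK stand-in for whatever hash the join actually computes. The point is only that two encodings of the same logical record hash differently, while the foreign-key reference stays equal.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical illustration; none of these names exist in Kafka Streams.
final class HashVsForeignKeySketch {

    // Stand-in hash of the serialized value (assumption: any byte-level hash
    // shows the same effect, so SHA-256 from the JDK is used here).
    static byte[] hash(byte[] serialized) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serialized);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Toy serializer "before the upgrade": foreign key first.
    static byte[] serializeV1(String fk, String payload) {
        return ("{\"fk\":\"" + fk + "\",\"payload\":\"" + payload + "\"}")
                .getBytes(StandardCharsets.UTF_8);
    }

    // Toy serializer "after the upgrade": same fields, different order.
    static byte[] serializeV2(String fk, String payload) {
        return ("{\"payload\":\"" + payload + "\",\"fk\":\"" + fk + "\"}")
                .getBytes(StandardCharsets.UTF_8);
    }

    // Whole-message comparison: sensitive to any change in the bytes.
    static boolean matchesByHash(byte[] subscribed, byte[] current) {
        return Arrays.equals(hash(subscribed), hash(current));
    }

    // Foreign-key comparison: only sensitive to the reference itself.
    static boolean matchesByForeignKey(String subscribedFk, String currentFk) {
        return subscribedFk.equals(currentFk);
    }

    public static void main(String[] args) {
        byte[] before = serializeV1("customer-7", "x");
        byte[] after = serializeV2("customer-7", "x");
        // Same logical record, different bytes: the hash check rejects it.
        System.out.println("hash match: " + matchesByHash(before, after));
        // The foreign key did not change, so this check accepts it.
        System.out.println("fk match:   " + matchesByForeignKey("customer-7", "customer-7"));
    }
}
```

The trade-off the reporter mentions still applies: comparing only the foreign key would no longer discard intermediate results whose value changed but whose reference did not, which is why the proposal makes the comparison mode a user choice.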
[jira] [Updated] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved
[ https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergio Duran Vegas updated KAFKA-13386:
---
    Summary: Foreign Key Join filtering out valid records after a code change / schema evolved  (was: Foreign Key Join filtering valid records after a code change / schema evolved)

> Foreign Key Join filtering out valid records after a code change / schema evolved
> -
>
>                 Key: KAFKA-13386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Sergio Duran Vegas
>            Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant across upgrades. I can recall other discussions in which we wanted to rely on the same property, for example when computing whether an update is a duplicate result or not.
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>
> {code:java}
> // If this value doesn't match the current value from the original table, it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>
> A solution for this problem would be for the comparison to use the foreign-key reference itself instead of the whole message hash.
>
> The proposed bug fix is to allow the user to choose between the two methods of comparison (whole hash or FK reference). This would fix the problem of dropping valid records in certain cases, while still allowing the user to choose the current optimized way of checking for valid records and dropping intermediate results.
[jira] [Created] (KAFKA-13386) Foreign Key Join filtering valid records after a code change / schema evolved
Sergio Duran Vegas created KAFKA-13386:
--

             Summary: Foreign Key Join filtering valid records after a code change / schema evolved
                 Key: KAFKA-13386
                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.6.2
            Reporter: Sergio Duran Vegas

The join optimization assumes the serializer is deterministic and invariant across upgrades. I can recall other discussions in which we wanted to rely on the same property, for example when computing whether an update is a duplicate result or not.

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]

{code:java}
// If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
{code}

A solution for this problem would be for the comparison to use the foreign-key reference itself instead of the whole message hash.

The proposed bug fix is to allow the user to choose between the two methods of comparison (whole hash or FK reference). This would fix the problem of dropping valid records in certain cases, while still allowing the user to choose the current optimized way of checking for valid records and dropping intermediate results.
[GitHub] [kafka] vamossagar12 edited a comment on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 edited a comment on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-946918369

Thanks Sophie for that.

@showuon, I think your concern about persistent stores and custom stores is still valid. The reason I don't want to add the getObservedStreamTime method to SessionStore or WindowStore is that I feel it doesn't belong in those interfaces: how observed stream time is tracked is an internal detail. Having said that, if we want this behaviour to also be available for custom stores (which is why we chose to add it to MeteredStores), then those custom stores need to implement the PersistentStore classes, which is not the way it works today, right?

One approach could be to think about custom state stores separately and have this merged if this looks fine. Custom state stores will need more thought because of the way the state stores are structured or wrapped. Also, today the persistent state stores provided by Kafka Streams don't enforce retention time, which IMO is a bigger problem than custom state stores.

That's my personal opinion and I am open to suggestions. Let me know if that makes sense.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
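For readers unfamiliar with the term, the "observed stream time" discussed in this comment can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in the pull request: the class name, the method names, and the exact boundary condition (whether a record exactly at the retention limit counts as expired) are all hypothetical.

```java
// Hypothetical sketch of tracking observed stream time and enforcing
// retention against it; not taken from Kafka Streams.
final class RetentionSketch {

    private final long retentionMs;

    // Highest record timestamp seen so far; -1 means nothing observed yet.
    private long observedStreamTime = -1L;

    RetentionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Advances observed stream time, then reports whether the record would
    // be kept (true) or skipped as already outside the retention period.
    boolean put(long timestampMs) {
        observedStreamTime = Math.max(observedStreamTime, timestampMs);
        return !isExpired(timestampMs);
    }

    // Assumption: a record is expired when it is strictly older than
    // observedStreamTime - retentionMs.
    boolean isExpired(long timestampMs) {
        return timestampMs < observedStreamTime - retentionMs;
    }

    long observedStreamTime() {
        return observedStreamTime;
    }

    public static void main(String[] args) {
        RetentionSketch store = new RetentionSketch(100L);
        System.out.println(store.put(1000L)); // advances stream time to 1000
        System.out.println(store.put(950L));  // within retention of 1000
        System.out.println(store.put(899L));  // older than 1000 - 100
    }
}
```

Because the tracking lives entirely inside the store in this sketch, nothing needs to be added to a public interface, which is the design concern raised in the comment.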
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732341161

## File path: clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenValidatorFactoryTest extends OAuthBearerTest {
+
+    @Test
+    public void testConfigureThrowsExceptionOnAccessTokenValidatorInit() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() {
+            @Override
+            public void init() throws IOException {
+                throw new IOException("My init had an error!");
+            }
+            @Override
+            public String retrieve() {
+                return "dummy";
+            }
+        };
+
+        Map configs = getSaslConfigs();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+
+        assertThrowsWithMessage(
+            KafkaException.class, () -> handler.configure(accessTokenRetriever, accessTokenValidator), "encountered an error when initializing");
+    }
+
+    @Test
+    public void testConfigureThrowsExceptionOnAccessTokenValidatorClose() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() {
+            @Override
+            public void close() throws IOException {
+                throw new IOException("My close had an error!");
+            }
+            @Override
+            public String retrieve() {
+                return "dummy";
+            }
+        };
+
+        Map configs = getSaslConfigs();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+        handler.configure(accessTokenRetriever, accessTokenValidator);
+
+        // Basically asserting this doesn't throw an exception :(
+        handler.close();
+    }
+
+    private OAuthBearerLoginCallbackHandler createHandler(AccessTokenRetriever accessTokenRetriever, Map configs) {

Review comment:
This function is never used?
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732339345

## File path: clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenValidatorFactoryTest extends OAuthBearerTest {
+
+    @Test
+    public void testConfigureThrowsExceptionOnAccessTokenValidatorInit() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() {
+            @Override
+            public void init() throws IOException {
+                throw new IOException("My init had an error!");
+            }
+            @Override
+            public String retrieve() {
+                return "dummy";
+            }
+        };
+
+        Map configs = getSaslConfigs();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+
+        assertThrowsWithMessage(
+            KafkaException.class, () -> handler.configure(accessTokenRetriever, accessTokenValidator), "encountered an error when initializing");
+    }
+
+    @Test
+    public void testConfigureThrowsExceptionOnAccessTokenValidatorClose() {
+        OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() {
+            @Override
+            public void close() throws IOException {
+                throw new IOException("My close had an error!");
+            }
+            @Override
+            public String retrieve() {
+                return "dummy";
+            }
+        };
+
+        Map configs = getSaslConfigs();
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs);
+        handler.configure(accessTokenRetriever, accessTokenValidator);
+
+        // Basically asserting this doesn't throw an exception :(

Review comment:
Dummy question: why don't we want to throw an exception when close fails?
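The asymmetry the reviewer asks about (a failing init surfaces as an exception, a failing close does not) can be reduced to the sketch below. It uses only the JDK; `Resource`, `Handler`, and `lastCloseError` are hypothetical names introduced for illustration, and the sketch takes no position on why the pull request chose this behaviour. One common argument for it, not one stated in this thread, is that close often runs during shutdown or cleanup, where a secondary exception would mask the original problem.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical illustration of "fail on init, log on close"; JDK only.
final class InitCloseSketch {

    // A resource that can be initialized and closed; init defaults to a no-op.
    interface Resource extends Closeable {
        default void init() throws IOException {
        }
    }

    static final class Handler {
        private Resource resource;

        // Kept only so callers (and tests) can see that close failed quietly.
        String lastCloseError;

        // A failure here is fatal: the handler is unusable, so rethrow.
        void configure(Resource resource) {
            try {
                resource.init();
            } catch (IOException e) {
                throw new IllegalStateException("encountered an error when initializing", e);
            }
            this.resource = resource;
        }

        // A failure here is recorded and reported, but not propagated.
        void close() {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (IOException e) {
                lastCloseError = e.getMessage();
                System.err.println("close failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        // The lambda implements close(), the only abstract method of Resource.
        handler.configure(() -> {
            throw new IOException("My close had an error!");
        });
        handler.close(); // does not throw; the error is only reported
    }
}
```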
[GitHub] [kafka] cmccabe opened a new pull request #11417: KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness
cmccabe opened a new pull request #11417:
URL: https://github.com/apache/kafka/pull/11417

Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests which inherit from this class can test both ZK and KRaft mode. Test cases which do this can specify the modes they support by including a ParameterizedTest annotation before each test case, like the following:

    @ParameterizedTest
    @ValueSource(strings = Array("zk", "kraft"))
    def testValidCreateTopicsRequests(quorum: String): Unit = { ... }

For each value that is specified here (zk, kraft), the test case will be run once in the appropriate mode. So the test shown above is run twice. This allows integration tests to be incrementally converted over to support KRaft mode, rather than rewritten to support it. As you might expect, test cases which do not specify a quorum argument will continue to run only in ZK mode.

JUnit5 makes the quorum annotation visible in the TestInfo object which each @BeforeEach function in a test can optionally take. Therefore, this PR converts over the setUp function of the quorum base class, plus every derived class, to take a TestInfo argument. The TestInfo object gets "passed up the chain" to the base class, where it determines which quorum type we create (zk or kraft).

The general approach taken here is to make as much as possible work with KRaft, but to leave some things as ZK-only when appropriate. For example, a test that explicitly requests an AdminZkClient object will get an exception if it is running in KRaft mode. Similarly, tests which explicitly request KafkaServer rather than KafkaBroker will get an exception when running in KRaft mode.

As a proof of concept, this PR converts over MetricsTest to support KRaft.
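The dispatch described in this pull request can be sketched in plain Java without JUnit. This is only a rough model of the behaviour in the description, under the assumption that the quorum arrives as a plain string: `QuorumHarnessSketch`, `parse`, and `zkClient` are hypothetical names, and the string returned by `zkClient` merely stands in for a ZK-only object such as AdminZkClient.

```java
// Hypothetical model of choosing a quorum mode per test and rejecting
// ZK-only requests in KRaft mode; not the actual QuorumTestHarness.
final class QuorumHarnessSketch {

    enum Quorum { ZK, KRAFT }

    // Test cases that do not name a quorum keep running in ZK mode.
    static Quorum parse(String quorumParam) {
        if (quorumParam == null) {
            return Quorum.ZK;
        }
        switch (quorumParam) {
            case "zk":
                return Quorum.ZK;
            case "kraft":
                return Quorum.KRAFT;
            default:
                throw new IllegalArgumentException("Unknown quorum: " + quorumParam);
        }
    }

    private final Quorum quorum;

    QuorumHarnessSketch(String quorumParam) {
        this.quorum = parse(quorumParam);
    }

    Quorum quorum() {
        return quorum;
    }

    // Stand-in for handing out a ZK-only client: fails fast in KRaft mode.
    String zkClient() {
        if (quorum != Quorum.ZK) {
            throw new IllegalStateException("ZK client requested while running in KRaft mode");
        }
        return "zk-client";
    }

    public static void main(String[] args) {
        // Mirrors running one parameterized test once per listed value.
        for (String mode : new String[] {"zk", "kraft"}) {
            QuorumHarnessSketch harness = new QuorumHarnessSketch(mode);
            System.out.println(mode + " -> " + harness.quorum());
        }
    }
}
```

Failing fast on the ZK-only accessor is what lets a test suite be converted one test at a time: unconverted tests keep their default mode, and a converted test that still touches a ZK-only object is flagged immediately.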
[GitHub] [kafka] ableegoldman commented on a change in pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field
ableegoldman commented on a change in pull request #11405:
URL: https://github.com/apache/kafka/pull/11405#discussion_r732310453

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ##
@@ -2203,7 +2203,7 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
         task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null);

         assertThrows(
-            TimeoutException.class,
+            StreamsException.class,

Review comment:
Good point -- done (also did for `StandbyTaskTest`)
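The change in the expected exception type follows from the idea in the pull request title: whatever a task throws is wrapped in one exception type that also carries the task's id. A minimal sketch of that wrapping pattern is shown here, using only the JDK. `TaskAwareException` and `runTask` are hypothetical stand-ins and do not reproduce the StreamsException API.

```java
// Hypothetical sketch of wrapping task failures in a single exception type
// that records which task failed; not the Kafka Streams implementation.
final class TaskExceptionSketch {

    static final class TaskAwareException extends RuntimeException {
        private final String taskId;

        TaskAwareException(String message, Throwable cause, String taskId) {
            super(message, cause);
            this.taskId = taskId;
        }

        String taskId() {
            return taskId;
        }
    }

    // Runs the task body and makes sure any failure reaches the caller as a
    // TaskAwareException with the original exception preserved as the cause.
    static void runTask(String taskId, Runnable body) {
        try {
            body.run();
        } catch (TaskAwareException e) {
            throw e; // already wrapped, do not wrap twice
        } catch (RuntimeException e) {
            throw new TaskAwareException("Error in task " + taskId, e, taskId);
        }
    }

    public static void main(String[] args) {
        try {
            runTask("0_1", () -> {
                throw new IllegalStateException("task timed out");
            });
        } catch (TaskAwareException e) {
            System.out.println(e.taskId() + ": " + e.getCause().getMessage());
        }
    }
}
```

A test written against such a wrapper asserts on the wrapper type and, where it matters, inspects the cause for the original exception, which is the shape of the change in the diff above.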
[GitHub] [kafka] cmccabe merged pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe merged pull request #11410:
URL: https://github.com/apache/kafka/pull/11410
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430275#comment-17430275 ]

Eugen Dück edited comment on KAFKA-13289 at 10/19/21, 10:47 PM:

We are running into similar issues (6.0.1-ccs for broker and kafka-streams library, i.e. kafka 2.6.1)
 * lots of "Skipping record for expired segment." warnings in AbstractRocksDBSegmentedBytesStore
 * at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried to remove the re-keying part from Matthew's code, and as far as I can tell, the problem still persists, so it would look like it is not related to re-partitioning.

Btw. the problem shows even when doing just 10 instead of 1000 messages per topic. Find my fork of Matthew's code here: [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{{[INFO] ---}}
{{[INFO] T E S T S}}
{{[INFO] ---}}
{{[INFO] Running ins14809.Ins14809Test}}
{{leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 2:left, 8:left]}}
{{rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 9:right, 8:right, 6:right]}}
{{# Actual results}}
{{We want to see every number X below end with an entry that says [X,left/X,right]}}
{{but in practice we often see only [X,left/null] meaning the data was not joined.}}
{{This seems to coincide with kafka streams writing...}}
{{`WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment`}}
{{...to its logs, in spite of the fact that the source message timestamps were in order when}}
{{kafka streams got them.}}
{{0 [0:left/null, 0:left/0:right]}}
{{1 [1:left/1:right]}}
{{2 [2:left/2:right]}}
{{3 [3:left/null, 3:left/3:right]}}
{{4 [4:left/null, 4:left/4:right]}}
{{5 [5:left/5:right]}}
{{6 [6:left/null, 6:left/6:right]}}
{{7 [7:left/7:right]}}
{{8 [8:left/8:right]}}
{{9 [9:left/9:right]}}
{{[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 43.267 s - in ins14809.Ins14809Test}}

> Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
> ---
>
>                 Key: KAFKA-13289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13289
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Matthew Sheppard
>            Priority: Minor
>
> When pushing bulk data through a kafka-steams
[GitHub] [kafka] cmccabe commented on pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe commented on pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#issuecomment-947161702

I reran `ConnectorRestartApiIntegrationTest` and didn't see a failure locally. Will commit
[GitHub] [kafka] kirktrue commented on pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#issuecomment-947122444

Thanks for the feedback, @YiDing-Duke!
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732259427

## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+    private CloseableVerificationKeyResolver verificationKeyResolver;
+
+    private AccessTokenValidator accessTokenValidator;
+
+    private boolean isConfigured = false;
+
+    @Override
+    public void configure(Map configs, String saslMechanism, List jaasConfigEntries) {
+        CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism);
+        AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver);
+        configure(verificationKeyResolver, accessTokenValidator);
+    }
+
+    public void configure(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+        this.verificationKeyResolver = verificationKeyResolver;
+        this.accessTokenValidator = accessTokenValidator;
+
+        try {
+            verificationKeyResolver.init();
+        } catch (Exception e) {
+            throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e);
+        }
+
+        isConfigured = true;
+    }
+
+    @Override
+    public void close() {
+        if (verificationKeyResolver != null) {
+            try {
+                verificationKeyResolver.close();
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        checkConfigured();
+
+        for (Callback callback : callbacks) {
+            if (callback instanceof OAuthBearerValidatorCallback) {
+                handle((OAuthBearerValidatorCallback) callback);
+            } else if (callback instanceof OAuthBearerExtensionsValidatorCallback) {
+                OAuthBearerExtensionsValidatorCallback extensionsCallback = (OAuthBearerExtensionsValidatorCallback) callback;
+                extensionsCallback.inputExtensions().map().forEach((extensionName, v) -> extensionsCallback.valid(extensionName));

Review comment:
Correct.
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732259126 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class); + +private CloseableVerificationKeyResolver verificationKeyResolver; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism); +AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver); +configure(verificationKeyResolver, accessTokenValidator); +} + +public void configure(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) { +this.verificationKeyResolver = verificationKeyResolver; +this.accessTokenValidator = accessTokenValidator; + +try { +verificationKeyResolver.init(); +} catch (Exception e) { +throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e); +} + +isConfigured = true; Review 
comment: Unit tests will call `init` directly, which is OK. Changed the flag to `isInitialized` so it fits in better now.
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732258747 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class); + +private CloseableVerificationKeyResolver verificationKeyResolver; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism); +AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver); +configure(verificationKeyResolver, accessTokenValidator); +} + +public void configure(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) { Review comment: Sure! I changed it to `init` here too.
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732258641 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class); + +public static final String CLIENT_ID_CONFIG = "clientId"; +public static final String CLIENT_SECRET_CONFIG = "clientSecret"; +public static final String SCOPE_CONFIG = "scope"; + +public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " + +"client ID to uniquely identify the service account to use for authentication for " + +"this client. 
The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " + +"client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " + +"account and identifies the service account to use for authentication for " + +"this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " + +"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " + +"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + +"include with the login request."; + +private static final String EXTENSION_PREFIX = "extension_"; + +private Map moduleOptions; + +private AccessTokenRetriever accessTokenRetriever; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) +throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + +if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) +throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size())); + +moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions()); +AccessTokenRetriever accessTokenRetriever =
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732258376 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class); + +public static final String CLIENT_ID_CONFIG = "clientId"; +public static final String CLIENT_SECRET_CONFIG = "clientSecret"; +public static final String SCOPE_CONFIG = "scope"; + +public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " + +"client ID to uniquely identify the service account to use for authentication for " + +"this client. 
The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " + +"client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " + +"account and identifies the service account to use for authentication for " + +"this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " + +"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " + +"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + +"include with the login request."; + +private static final String EXTENSION_PREFIX = "extension_"; + +private Map moduleOptions; + +private AccessTokenRetriever accessTokenRetriever; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) +throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + +if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) +throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size())); + +moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions()); +AccessTokenRetriever accessTokenRetriever =
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732258128 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retry encapsulates the mechanism to perform a retry and then exponential + * backoff using provided wait times between attempts. 
+ * + * @param <R> Result type + */ + +public class Retry<R> { + +private static final Logger log = LoggerFactory.getLogger(Retry.class); + +private final Time time; + +private final long retryBackoffMs; + +private final long retryBackoffMaxMs; + +public Retry(Time time, long retryBackoffMs, long retryBackoffMaxMs) { +this.time = time; +this.retryBackoffMs = retryBackoffMs; +this.retryBackoffMaxMs = retryBackoffMaxMs; + +if (this.retryBackoffMs < 0) +throw new IllegalArgumentException(String.format("retryBackoffMs value %s must be non-negative", retryBackoffMs)); + +if (this.retryBackoffMaxMs < 0) +throw new IllegalArgumentException(String.format("retryBackoffMaxMs %s value must be non-negative", retryBackoffMaxMs)); + +if (this.retryBackoffMaxMs < this.retryBackoffMs) +throw new IllegalArgumentException(String.format("retryBackoffMaxMs %s is less than retryBackoffMs %s", retryBackoffMaxMs, retryBackoffMs)); +} + +public R execute(Retryable<R> retryable) throws IOException { +int currAttempt = 0; +long end = time.milliseconds() + retryBackoffMaxMs; +IOException error = null; + +while (time.milliseconds() <= end) { +currAttempt++; + +try { +return retryable.call(); +} catch (IOException e) { +if (error == null) +error = e; + +long waitMs = retryBackoffMs * (long) Math.pow(2, currAttempt - 1); +long diff = end - time.milliseconds(); +waitMs = Math.min(waitMs, diff); + +if (waitMs <= 0) Review comment: Added logging for error.
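For readers following the backoff logic in the quoted diff, here is a minimal, self-contained sketch of the same idea: the wait doubles with each attempt and is capped by the time remaining in the overall retry window. The class name `BackoffSketch`, the nested `Retryable` interface, and the use of `System.currentTimeMillis()` and `Thread.sleep` in place of Kafka's `Time` abstraction are assumptions made for illustration. This is not the code from the PR.

```java
import java.io.IOException;

final class BackoffSketch {

    /** Hypothetical stand-in for the PR's Retryable interface. */
    interface Retryable<R> {
        R call() throws IOException;
    }

    /**
     * Wait before the next attempt: retryBackoffMs * 2^(attempt - 1),
     * capped at the time remaining in the retry window.
     */
    static long waitMs(long retryBackoffMs, int attempt, long remainingMs) {
        long exponential = retryBackoffMs * (long) Math.pow(2, attempt - 1);
        return Math.min(exponential, remainingMs);
    }

    /** Retries the call until it succeeds or the window of retryBackoffMaxMs elapses. */
    static <R> R execute(Retryable<R> retryable, long retryBackoffMs, long retryBackoffMaxMs) throws IOException {
        long end = System.currentTimeMillis() + retryBackoffMaxMs;
        int attempt = 0;
        IOException first = null;

        while (System.currentTimeMillis() <= end) {
            attempt++;
            try {
                return retryable.call();
            } catch (IOException e) {
                // Remember the first failure so the caller sees the original cause.
                if (first == null)
                    first = e;

                long wait = waitMs(retryBackoffMs, attempt, end - System.currentTimeMillis());
                if (wait <= 0)
                    break;

                try {
                    Thread.sleep(wait);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        throw first != null ? first : new IOException("Retry window elapsed before any attempt was made");
    }
}
```

With a base of 100 ms, the waits would be 100, 200, 400 ms and so on until the remaining window becomes the smaller value.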
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732257932 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. 
There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
kirktrue commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732251460 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java ## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} that will + * communicate with an OAuth/OIDC provider directly via HTTP to post client credentials + * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG}) + * to a publicized token endpoint URL + * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}). + * + * @see AccessTokenRetriever + * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG + * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG + * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG + * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + */ + +public class HttpAccessTokenRetriever implements AccessTokenRetriever { + +private static final Logger log = LoggerFactory.getLogger(HttpAccessTokenRetriever.class); + +private static final Set UNRETRYABLE_HTTP_CODES; + +public static final String AUTHORIZATION_HEADER = "Authorization"; + +static { +// This does not have to be an exhaustive list. 
There are other HTTP codes that +// are defined in different RFCs (e.g. https://datatracker.ietf.org/doc/html/rfc6585) +// that we won't worry about yet. The worst case if a status code is missing from +// this set is that the request will be retried. +UNRETRYABLE_HTTP_CODES = new HashSet<>(); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED); +UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION); +} + +private final String clientId; + +private final String clientSecret; + +private final String scope; + +private final SSLSocketFactory sslSocketFactory; + +private final String tokenEndpointUri; + +private final long loginRetryBackoffMs; + +private final long loginRetryBackoffMaxMs;
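The comment in the quoted static block says the set need not be exhaustive, because a status code missing from it is simply retried. A minimal sketch of how such a set might drive the retry decision follows. The class name `RetryClassificationSketch`, the method `shouldRetry`, and the shortened list of codes are assumptions for illustration, not taken from the PR.

```java
import java.net.HttpURLConnection;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class RetryClassificationSketch {

    // Abbreviated, hypothetical subset of the codes listed in the diff.
    private static final Set<Integer> UNRETRYABLE_HTTP_CODES = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(
            HttpURLConnection.HTTP_BAD_REQUEST,
            HttpURLConnection.HTTP_UNAUTHORIZED,
            HttpURLConnection.HTTP_FORBIDDEN,
            HttpURLConnection.HTTP_NOT_FOUND)));

    /**
     * Decides whether a failed request with the given status code is worth retrying.
     * Codes absent from the set default to "retry", so an incomplete set costs
     * extra attempts rather than a missed recovery.
     */
    static boolean shouldRetry(int responseCode) {
        return !UNRETRYABLE_HTTP_CODES.contains(responseCode);
    }
}
```

Under this sketch a 400 response fails fast, while a 503 (not in the set) is retried.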
[GitHub] [kafka] vvcephei commented on a change in pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field
vvcephei commented on a change in pull request #11405: URL: https://github.com/apache/kafka/pull/11405#discussion_r732245277 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -2203,7 +2203,7 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() { task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null); assertThrows( -TimeoutException.class, +StreamsException.class, Review comment: Up to you, but if it's semantically important for this to wrap a TimeoutException, then maybe we should assert the cause is TimeoutException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
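The check suggested in this review comment can be sketched as follows. This is a minimal, self-contained illustration and not the actual test: `WrapperException` is a hypothetical stand-in for `StreamsException`, `java.util.concurrent.TimeoutException` stands in for Kafka's own `TimeoutException`, and `expectThrows` is a small helper that imitates the shape of JUnit 5's `assertThrows` so the example runs without a test framework.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for StreamsException; not the Kafka class.
class WrapperException extends RuntimeException {
    WrapperException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class CauseAssertionSketch {

    // Minimal imitation of JUnit 5's assertThrows: runs the action and returns
    // the thrown exception if it has the expected type, otherwise fails.
    static <T extends Throwable> T expectThrows(Class<T> expected, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    // Hypothetical code under test: a timeout that is wrapped before it is rethrown.
    static void timeOutAndWrap() {
        throw new WrapperException("task timed out", new TimeoutException("deadline exceeded"));
    }

    public static void main(String[] args) {
        WrapperException thrown = expectThrows(WrapperException.class, CauseAssertionSketch::timeOutAndWrap);
        // The additional check proposed in the review: verify the wrapped cause too.
        if (!(thrown.getCause() instanceof TimeoutException)) {
            throw new AssertionError("Expected the cause to be a TimeoutException");
        }
        System.out.println("Wrapper and cause both verified");
    }
}
```

With JUnit 5 the same idea would normally be written by keeping the return value of `assertThrows` and asserting on `getCause()`.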
[GitHub] [kafka] vvcephei commented on pull request #11411: MINOR: Clarify acceptable recovery lag config doc
vvcephei commented on pull request #11411: URL: https://github.com/apache/kafka/pull/11411#issuecomment-947102353 Thanks, @ableegoldman !
[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)
C0urante commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r73227 ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +/** + * Provided to source tasks to allow them to define their own producer transaction boundaries when + * exactly-once support is enabled. + */ +public interface TransactionContext { + +/** + * Request a transaction commit after the next batch of records from {@link SourceTask#poll()} + * is processed. + */ +void commitTransaction(); + +/** + * Request a transaction commit after a source record is processed. The source record will be the + * last record in the committed transaction. + * @param record the record to commit the transaction after. Review comment: Ack, added "may not be null". ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +/** + * Provided to source tasks to allow them to define their own producer transaction boundaries when + * exactly-once support is enabled. + */ +public interface TransactionContext { + +/** + * Request a transaction commit after the next batch of records from {@link SourceTask#poll()} + * is processed. + */ +void commitTransaction(); + +/** + * Request a transaction commit after a source record is processed. The source record will be the + * last record in the committed transaction. + * @param record the record to commit the transaction after. + */ +void commitTransaction(SourceRecord record); + +/** + * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of + * the records in that transaction will be discarded and will not appear in a committed transaction. + * However, offsets for that transaction will still be committed. If the data should be reprocessed, + * the task should not invoke this method and should instead throw an exception. + */ +void abortTransaction(); + +/** + * Requests a transaction abort after a source record is processed. The source record will be the + * last record in the aborted transaction. 
All of the records in that transaction will be discarded + * and will not appear in a committed transaction. However, offsets for that transaction will still + * be committed. If the data should be reprocessed, the task should not invoke this method and + * should instead throw an exception. + * @param record the record to abort the transaction after. Review comment: Ack, added "may not be null".
[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)
C0urante commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r732232701 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ## @@ -163,13 +181,24 @@ public synchronized boolean beginFlush() { } // And submit the data -log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush); +log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), flushed); } -return backingStore.set(offsetsSerialized, (error, result) -> { -boolean isCurrent = handleFinishWrite(flushId, error, result); -if (isCurrent && callback != null) { -callback.onCompletion(error, result); +return primaryBackingStore.set(offsetsSerialized, (primaryError, primaryResult) -> { +boolean isCurrent = handleFinishWrite(flushId, primaryError, primaryResult); +if (isCurrent) { +if (callback != null) { +callback.onCompletion(primaryError, primaryResult); +} +if (secondaryBackingStore != null && primaryError == null) { +secondaryBackingStore.set(offsetsSerialized, (secondaryError, secondaryResult) -> { +if (secondaryError != null) { +log.warn("Failed to write offsets ({}) to secondary backing store", flushed, secondaryError); +} else { +log.debug("Successfully flushed offsets ({}) to secondary backing store", flushed); Review comment: Good catch; I believe there's also a case in the existing case where offset commit messages that are logged in a producer callback are also missing the MDC context. I've addressed both cases. ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. 
*/ public abstract class SourceTask implements Task { +/** + * Review comment: Ack, removed. ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { +/** + * + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * Review comment: Ack, removed.
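The control flow of the `OffsetStorageWriter` diff quoted in this message (write to the primary store, then attempt the secondary store only if the primary write succeeded, and only log a secondary failure) can be sketched as below. This is a simplified illustration under stated assumptions: `OffsetStoreSketch`, `DualWriteSketch` and the `warnings` list are hypothetical names, offsets are reduced to a plain string, and the `isCurrent`/flush-id bookkeeping from the real change is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified store: the callback receives null on success or the failure otherwise.
interface OffsetStoreSketch {
    void set(String offsets, Consumer<Throwable> callback);
}

final class DualWriteSketch {

    // Stands in for log.warn(...) so the behaviour can be observed without a logging library.
    final List<String> warnings = new ArrayList<>();

    // Writes to the primary store first. The caller's callback only ever sees the primary
    // result; the secondary write is attempted only after primary success, and a secondary
    // failure is recorded as a warning instead of being reported to the caller.
    void flush(String offsets, OffsetStoreSketch primary, OffsetStoreSketch secondary,
               Consumer<Throwable> callback) {
        primary.set(offsets, primaryError -> {
            if (callback != null) {
                callback.accept(primaryError);
            }
            if (secondary != null && primaryError == null) {
                secondary.set(offsets, secondaryError -> {
                    if (secondaryError != null) {
                        warnings.add("Failed to write offsets to secondary store: "
                                + secondaryError.getMessage());
                    }
                });
            }
        });
    }

    public static void main(String[] args) {
        DualWriteSketch writer = new DualWriteSketch();
        OffsetStoreSketch healthyPrimary = (offsets, cb) -> cb.accept(null);
        OffsetStoreSketch failingSecondary = (offsets, cb) -> cb.accept(new RuntimeException("store unavailable"));
        writer.flush("partition-0=42", healthyPrimary, failingSecondary,
                error -> System.out.println("primary error: " + error));
        System.out.println("warnings: " + writer.warnings);
    }
}
```

The point of the design is that a secondary-store failure never turns a successful primary flush into a failed one.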
[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)
C0urante commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r732231502 ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + +/** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support, + * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the + * connector cannot. + */ +public ExactlyOnceSupport exactlyOnceSupport(Map connectorConfig) { +return null; Review comment: > Should we just have another enum for UNKNOWN and make this more explicit than "null"? This was actually [suggested in the discussion thread](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3cCAMdOrUX4CvPsb+yjfTenHyRTtE=2aaw-_-_b2vbd+pvqzy7...@mail.gmail.com%3e): > what do you think about a new "exactlyOnce()" method to the SourceConnector class that can return a new ExactlyOnce enum with options of "SUPPORTED", "UNSUPPORTED", and "UNKNOWN", with a default implementation that returns "UNKNOWN"? And [decided against](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3ccadxunmbsypos0lej8kxw9eapcxc7wbtgxqdqhrpu6qrbjwi...@mail.gmail.com%3e): > The problem with having an explicit UNKNOWN case is we really want connector developers to _not_ use it. That could mean it's deprecated from the start. 
Alternatively we could omit it from the enum and use null to mean unknown (we'd have to check for a null result anyway), with the contract for the method being that it should return non-null. Of course, this doesn't remove the ambiguous case, but avoids the need to eventually remove UNKNOWN in the future. (What I found especially convincing in the snippet above were the points that 1) we don't want people to return `UNKNOWN` from this method, and 2) no matter what, we're going to have to check for `null` anyways.) > Also, it seems like it would make sense to document that this method should be overridden by Connector developers, but has a default for backward compatibility. Ack, can do. > And it should state more clearly what should be returned for the various options. I've taken a shot at this, not sure how much clearer it can get but if you have thoughts let me know. ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { +/** + * + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * + */ +public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + +public enum TransactionBoundary { Review comment: Ack, done. 
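The point that the caller has to handle `null` regardless of whether an `UNKNOWN` constant exists can be illustrated with a small sketch. The names below (`ExactlyOnceSupportSketch`, `ConnectorSketch`, `canRunExactlyOnce`) are hypothetical and only mirror the shape of the API under discussion; how the Connect runtime actually performs this check is not shown in this thread.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical mirror of the enum discussed above, with only the two constants named in the Javadoc.
enum ExactlyOnceSupportSketch { SUPPORTED, UNSUPPORTED }

// Hypothetical connector base class whose default mirrors the quoted diff: it returns null.
class ConnectorSketch {
    ExactlyOnceSupportSketch exactlyOnceSupport(Map<String, String> connectorConfig) {
        return null;
    }
}

final class ExactlyOnceCheckSketch {

    // Caller-side check: only an explicit SUPPORTED counts. A null result (the
    // "unknown" case) and UNSUPPORTED are treated the same way.
    static boolean canRunExactlyOnce(ConnectorSketch connector, Map<String, String> config) {
        ExactlyOnceSupportSketch support = connector.exactlyOnceSupport(config);
        return support == ExactlyOnceSupportSketch.SUPPORTED;
    }

    public static void main(String[] args) {
        ConnectorSketch legacy = new ConnectorSketch();
        ConnectorSketch upgraded = new ConnectorSketch() {
            @Override
            ExactlyOnceSupportSketch exactlyOnceSupport(Map<String, String> connectorConfig) {
                return ExactlyOnceSupportSketch.SUPPORTED;
            }
        };
        Map<String, String> config = Collections.emptyMap();
        System.out.println("legacy: " + canRunExactlyOnce(legacy, config));
        System.out.println("upgraded: " + canRunExactlyOnce(upgraded, config));
    }
}
```

Because the comparison is against `SUPPORTED` only, adding an `UNKNOWN` constant would not remove the need for the null-safe check; it would just add a third value that connector authors are not supposed to return.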
## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { +/** + * + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * + */ +public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + +public enum TransactionBoundary { +POLL, +INTERVAL, +CONNECTOR; + +public static final TransactionBoundary DEFAULT = POLL; + +public static List options() { Review comment: I wanted a convenient way to bring everything to lowercase, which is more standard for properties like this (see how [values for the consumer `isolation.level` property are
[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)
C0urante commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r732231269 ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java ## @@ -38,4 +38,30 @@ * Get the OffsetStorageReader for this SourceTask. */ OffsetStorageReader offsetStorageReader(); + +/** + * Get a {@link TransactionContext} that can be used to define producer transaction boundaries + * when exactly-once support is enabled for the connector. + * + * This method was added in Apache Kafka 3.0. Source tasks that use this method but want to + * maintain backward compatibility so they can also be deployed to older Connect runtimes + * should guard the call to this method with a try-catch block, since calling this method will result in a + * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the source connector is deployed to + * Connect runtimes older than Kafka 3.0. For example: + * + * TransactionContext transactionContext; + * try { + * transactionContext = context.transactionContext(); + * } catch (NoSuchMethodError | NoClassDefFoundError e) { + * transactionContext = null; + * } + * + * + * @return the transaction context, or null if the user does not want the connector to define + * its own transaction boundaries Review comment: No objections to modifying Javadocs here instead of in a KIP (especially since you made that clear on the mailing list). It's easier to review these sorts of details in a PR IMO anyways. Can make the change to refer to the connector configuration. Agree that "user" is ambiguous. ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + +/** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. 
+ * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support, + * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the + * connector cannot. + */ +public ExactlyOnceSupport exactlyOnceSupport(Map connectorConfig) { +return null; +} + +/** + * Signals whether the connector can define its own transaction boundaries with the proposed + * configuration. Developers must override this method if they wish to add connector-defined + * transaction boundary support; if they do not, users will be unable to create instances of + * this connector that use connector-defined transaction boundaries. The default implementation + * will return {@code UNSUPPORTED}. Review comment: Can do. I think a reference to the existing `validate` method may help clarify things (especially since this method will be used in almost exactly the same way); LMK what you think. ## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + +/** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. Review comment: Ack, done.
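The backward-compatibility guard described in the `SourceTaskContext#transactionContext` Javadoc quoted in this message can be sketched in isolation. `BackwardCompatGuardSketch` and `getOrNull` are hypothetical names, and a lambda that throws `NoSuchMethodError` merely simulates what the JVM does when code compiled against a newer API runs on an older runtime. Note that the Javadoc prose links `NoSuchMethodException`, while its sample code catches `NoSuchMethodError`; the latter is the linkage error raised in this situation, so that is what the sketch catches.

```java
import java.util.function.Supplier;

final class BackwardCompatGuardSketch {

    // Invokes an accessor that may not exist on older runtimes. If the call fails
    // with a linkage error for a missing method or class, null is returned so the
    // caller can fall back to behaviour that does not need the newer API.
    static <T> T getOrNull(Supplier<T> accessor) {
        try {
            return accessor.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Newer runtime: the accessor exists and returns a value.
        String present = getOrNull(() -> "transaction-context");
        // Older runtime, simulated: the method is missing at link time.
        String missing = BackwardCompatGuardSketch.<String>getOrNull(() -> {
            throw new NoSuchMethodError("transactionContext");
        });
        System.out.println("present: " + present);
        System.out.println("missing: " + missing);
    }
}
```

A task using this pattern would treat a `null` result the same way as a runtime that does not offer connector-defined transaction boundaries.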
[GitHub] [kafka] ableegoldman commented on pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field
ableegoldman commented on pull request #11405: URL: https://github.com/apache/kafka/pull/11405#issuecomment-947065671 All tests passed, except for two unrelated flaky failures of: `kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT`
[GitHub] [kafka] ableegoldman commented on pull request #11411: MINOR: Clarify acceptable recovery lag config doc
ableegoldman commented on pull request #11411: URL: https://github.com/apache/kafka/pull/11411#issuecomment-947064999 Thanks for the update. I've had some users hit the same misunderstanding that I'm guessing sparked this.
[GitHub] [kafka] ableegoldman commented on a change in pull request #11411: MINOR: Clarify acceptable recovery lag config doc
ableegoldman commented on a change in pull request #11411: URL: https://github.com/apache/kafka/pull/11411#discussion_r732203571 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -329,8 +329,10 @@ /** {@code acceptable.recovery.lag} */ public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG = "acceptable.recovery.lag"; -private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up for an active task." + - "Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0."; +private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough." + Review comment: ```suggestion private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough" + ```
[GitHub] [kafka] dielhennr commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController
dielhennr commented on a change in pull request #10772: URL: https://github.com/apache/kafka/pull/10772#discussion_r732112282 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java ## @@ -26,28 +26,36 @@ import java.util.Objects; public final class QuorumControllerMetrics implements ControllerMetrics { -private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName( -"KafkaController", "ActiveControllerCount"); -private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName( -"ControllerEventManager", "EventQueueTimeMs"); -private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName( -"ControllerEventManager", "EventQueueProcessingTimeMs"); -private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName( -"KafkaController", "GlobalTopicCount"); -private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName( -"KafkaController", "GlobalPartitionCount"); -private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName( -"KafkaController", "OfflinePartitionsCount"); -private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName( -"KafkaController", "PreferredReplicaImbalanceCount"); - +private final static MetricName ACTIVE_CONTROLLER_COUNT = new MetricName( Review comment: This was a mistake when I merged trunk into my branch. I reverted it.
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732092804 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class); + +private CloseableVerificationKeyResolver verificationKeyResolver; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism); +AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver); +configure(verificationKeyResolver, accessTokenValidator); +} + +public void configure(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) { +this.verificationKeyResolver = verificationKeyResolver; +this.accessTokenValidator = accessTokenValidator; + +try { +verificationKeyResolver.init(); +} catch (Exception e) { +throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e); +} + +isConfigured = true; +} + 
+@Override +public void close() { +if (verificationKeyResolver != null) { +try { +verificationKeyResolver.close(); +} catch (Exception e) { +log.error(e.getMessage(), e); +} +} +} + +@Override +public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { +checkConfigured(); + +for (Callback callback : callbacks) { +if (callback instanceof OAuthBearerValidatorCallback) { +handle((OAuthBearerValidatorCallback) callback); +} else if (callback instanceof OAuthBearerExtensionsValidatorCallback) { +OAuthBearerExtensionsValidatorCallback extensionsCallback = (OAuthBearerExtensionsValidatorCallback) callback; + extensionsCallback.inputExtensions().map().forEach((extensionName, v) -> extensionsCallback.valid(extensionName)); Review comment: So for now the server-side extension validation is a no-op, and we just mark every extension as valid? Will the real validation then happen in our ce-kafka server-side authentication?
[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe commented on a change in pull request #11410: URL: https://github.com/apache/kafka/pull/11410#discussion_r732092038 ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -346,6 +367,97 @@ object TestUtils extends Logging { config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version) } + def createAdminClient[B <: KafkaBroker]( + brokers: Seq[B], + adminConfig: Properties): Admin = { +val adminClientProperties = if (adminConfig.isEmpty) { + val newConfig = new Properties() + newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers)) + newConfig +} else { + adminConfig +} +Admin.create(adminClientProperties) + } + + def createTopicWithAdmin[B <: KafkaBroker]( Review comment: There are a lot of differences, though: IntegrationTestUtils.createTopic doesn't take a list of brokers, requires you to create an admin client externally, and won't work if the topic already exists. So it can't really be used as a drop-in replacement for `TestUtils#createTopic`, which is the intention here. For now I think we should just accept the duplication, although it's not ideal.
[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe commented on a change in pull request #11410: URL: https://github.com/apache/kafka/pull/11410#discussion_r732090271 ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -346,6 +367,97 @@ object TestUtils extends Logging { config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version) } + def createAdminClient[B <: KafkaBroker]( + brokers: Seq[B], + adminConfig: Properties): Admin = { +val adminClientProperties = if (adminConfig.isEmpty) { + val newConfig = new Properties() + newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers)) + newConfig +} else { + adminConfig +} +Admin.create(adminClientProperties) + } + + def createTopicWithAdmin[B <: KafkaBroker]( + topic: String, + numPartitions: Int = 1, + replicationFactor: Int = 1, + brokers: Seq[B], + topicConfig: Properties = new Properties, + adminConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = { +val adminClient = createAdminClient(brokers, adminConfig) +try { + val configsMap = new java.util.HashMap[String, String]() + topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString)) + try { +adminClient.createTopics(Collections.singletonList(new NewTopic( + topic, numPartitions, replicationFactor.toShort).configs(configsMap))).all().get() + } catch { +case e: ExecutionException => if (e.getCause != null && + e.getCause.isInstanceOf[TopicExistsException] && + topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic, numPartitions, replicationFactor)) { +} else { Review comment: ok
[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe commented on a change in pull request #11410: URL: https://github.com/apache/kafka/pull/11410#discussion_r732089124 ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1640,6 +1800,37 @@ object TestUtils extends Logging { }, s"Timed out waiting for brokerId $brokerId to come online") } + def getReplicaAssignmentForTopics[B <: KafkaBroker]( + topicNames: Seq[String], + brokers: Seq[B], + adminConfig: Properties = new Properties): Map[TopicPartition, Seq[Int]] = { +val adminClient = createAdminClient(brokers, adminConfig) +val results = new mutable.HashMap[TopicPartition, Seq[Int]] +try { + adminClient.describeTopics(topicNames.toList.asJava).topicNameValues().forEach { +case (topicName, future) => + try { +val description = future.get() +description.partitions().forEach { + case partition => +val topicPartition = new TopicPartition(topicName, partition.partition()) +results.put(topicPartition, partition.replicas().asScala.map(_.id)) +} + } catch { +case e: ExecutionException => if (e.getCause != null && + e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) { + // ignore Review comment: I believe this is consistent with the behavior we have in the ZK case (which is not in TestUtils, but in KafkaZkClient or somewhere, as I recall)... I will add a JavaDoc comment.
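The "ignore unknown topics" behaviour discussed in this comment comes down to unwrapping the `ExecutionException` thrown by `Future.get()` and skipping the entry when the cause is the expected exception type. A minimal Java sketch of that pattern follows; the original helper is Scala, and `UnknownTopicSketchException`, `IgnoreMissingTopicsSketch` and the integer result values are hypothetical stand-ins rather than Kafka types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for UnknownTopicOrPartitionException.
class UnknownTopicSketchException extends RuntimeException {
    UnknownTopicSketchException(String message) {
        super(message);
    }
}

final class IgnoreMissingTopicsSketch {

    // Collects the result of every future that completes normally. A future that
    // failed because the topic is unknown is skipped; any other failure is rethrown.
    static Map<String, Integer> collect(Map<String, Future<Integer>> futures) {
        Map<String, Integer> results = new LinkedHashMap<>();
        for (Map.Entry<String, Future<Integer>> entry : futures.entrySet()) {
            try {
                results.put(entry.getKey(), entry.getValue().get());
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof UnknownTopicSketchException)) {
                    throw new IllegalStateException(e.getCause());
                }
                // Unknown topic: intentionally ignored, so it is simply absent from the result.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> missing = new CompletableFuture<>();
        missing.completeExceptionally(new UnknownTopicSketchException("no such topic"));

        Map<String, Future<Integer>> futures = new LinkedHashMap<>();
        futures.put("present", CompletableFuture.completedFuture(3));
        futures.put("absent", missing);

        System.out.println(collect(futures).keySet());
    }
}
```

Callers therefore cannot tell a missing topic from one that was never requested, which is presumably why documenting the behaviour in a JavaDoc comment was proposed.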
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732088812 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class); + +private CloseableVerificationKeyResolver verificationKeyResolver; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism); +AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver); +configure(verificationKeyResolver, accessTokenValidator); +} + +public void configure(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) { +this.verificationKeyResolver = verificationKeyResolver; +this.accessTokenValidator = accessTokenValidator; + +try { +verificationKeyResolver.init(); +} catch (Exception e) { +throw new KafkaException("The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver", e); +} + +isConfigured = true; Review 
comment: nit: should we move the flag setting to the end of main configure() function?
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732088424 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class); + +private CloseableVerificationKeyResolver verificationKeyResolver; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +CloseableVerificationKeyResolver verificationKeyResolver = VerificationKeyResolverFactory.create(configs, saslMechanism); +AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs, saslMechanism, verificationKeyResolver); +configure(verificationKeyResolver, accessTokenValidator); +} + +public void configure(CloseableVerificationKeyResolver verificationKeyResolver, AccessTokenValidator accessTokenValidator) { Review comment: ditto: should we use init() function name?
[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe commented on a change in pull request #11410: URL: https://github.com/apache/kafka/pull/11410#discussion_r732087662 ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1220,37 +1348,42 @@ object TestUtils extends Logging { } } - - def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = { + def verifyTopicDeletion[B <: KafkaBroker]( + zkClient: KafkaZkClient, + topic: String, + numPartitions: Int, + brokers: Seq[B]): Unit = { val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) -// wait until admin path for delete topic is deleted, signaling completion of topic deletion -waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic), - "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic)) -waitUntilTrue(() => !zkClient.topicExists(topic), - "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic)) +if (zkClient != null) { Review comment: It provides some additional information in the ZK case. For example, it might help diagnose a situation where the ZK controller never creates the znode under delete_topics. We can get rid of it eventually but I was trying to make a minimal change...
[jira] [Commented] (KAFKA-13373) ValueTransformerWithKeySupplier doesn't work with store()
[ https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430650#comment-17430650 ] Matthias J. Sax commented on KAFKA-13373: - I did not look into it too deeply, but I guess it must be related to our internal wrappers. To unify runtime code, we wrap different user interfaces such that the runtime only works with a reduced surface area and the wrappers take care of translating between the interfaces. My suspicion is that some wrapper is not forwarding a `stores()` call correctly. If that's true, the fix itself should be simple – the tricky part is only to find the right place in the code... > ValueTransformerWithKeySupplier doesn't work with store() > - > > Key: KAFKA-13373 > URL: https://issues.apache.org/jira/browse/KAFKA-13373 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Anatoly Tsyganenko >Priority: Minor > Labels: newbie > > I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like > this: > > {code:java} > public final class CustomSupplier implements > ValueTransformerWithKeySupplier, JsonNode, JsonNode> { > private final String storeName = "my-store"; > public Set> stores() { > final Deserializer jsonDeserializer = new > JsonDeserializer(); > final Serializer jsonSerializer = new JsonSerializer(); > final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, > jsonDeserializer); > final Serde stringSerde = Serdes.String(); > final StoreBuilder> store > = > Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), > stringSerde, jsonSerde).withLoggingDisabled(); > return Collections.singleton(store); > } > @Override > public ValueTransformerWithKey, JsonNode, JsonNode> > get() { > return new ValueTransformerWithKey, JsonNode, > JsonNode>() { > private ProcessorContext context; > private TimestampedKeyValueStore store; > @Override > public void init(final ProcessorContext context) { > this.store =
context.getStateStore(storeName); > this.context = context; > } > // > }{code} > > But I get the following error for the line "this.store = context.getStateStore(storeName);" > in init(): > {code:java} > Caused by: org.apache.kafka.streams.errors.StreamsException: Processor > KTABLE-TRANSFORMVALUES-08 has no access to StateStore my-store as the > store is not connected to the processor. If you add stores manually via > '.addStateStore()' make sure to connect the added store to the processor by > providing the processor name to '.addStateStore()' or connect them via > '.connectProcessorAndStateStores()'. DSL users need to provide the store name > to '.process()', '.transform()', or '.transformValues()' to connect the store > to the corresponding operator, or they can provide a StoreBuilder by > implementing the stores() method on the Supplier itself. If you do not add > stores manually, please file a bug report at > https://issues.apache.org/jira/projects/KAFKA.{code} > > The same code works perfectly with Transform or when I add the store to the builder. > It looks like something goes wrong when ConnectedStoreProvider and > ValueTransformerWithKeySupplier are used together. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
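The suspicion in the comment above is that one of the internal wrappers does not forward `stores()`. The thread does not identify the wrapper, so the following is only a minimal sketch of that failure mode. `StoreProvider`, `ValueSupplier`, `RuntimeSupplier` and both wrapper classes are hypothetical stand-ins invented for illustration; none of them are Kafka Streams types.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams types.
interface StoreProvider {
    // By default a supplier declares no stores.
    default Set<String> stores() {
        return Collections.emptySet();
    }
}

// The interface a user implements.
interface ValueSupplier extends StoreProvider {
    String get();
}

// The reduced interface the runtime works with.
interface RuntimeSupplier extends StoreProvider {
    String get();
}

// Adapts the user interface but does not override stores(),
// so callers silently see the default empty set.
final class ForgetfulWrapper implements RuntimeSupplier {
    private final ValueSupplier delegate;

    ForgetfulWrapper(final ValueSupplier delegate) {
        this.delegate = delegate;
    }

    @Override
    public String get() {
        return delegate.get();
    }
}

// Same adapter, but stores() is forwarded to the wrapped supplier.
final class ForwardingWrapper implements RuntimeSupplier {
    private final ValueSupplier delegate;

    ForwardingWrapper(final ValueSupplier delegate) {
        this.delegate = delegate;
    }

    @Override
    public String get() {
        return delegate.get();
    }

    @Override
    public Set<String> stores() {
        return delegate.stores();
    }
}

final class StoresForwardingDemo {
    // A user supplier that declares one store name.
    static ValueSupplier userSupplier() {
        return new ValueSupplier() {
            @Override
            public String get() {
                return "transformer";
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("my-store");
            }
        };
    }

    public static void main(final String[] args) {
        System.out.println(new ForgetfulWrapper(userSupplier()).stores());  // prints []
        System.out.println(new ForwardingWrapper(userSupplier()).stores()); // prints [my-store]
    }
}
```

If the real cause has this shape, the store would never be connected to the processor, which would be consistent with the "has no access to StateStore" error quoted in the report.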
[GitHub] [kafka] hachikuji commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode
hachikuji commented on a change in pull request #11410: URL: https://github.com/apache/kafka/pull/11410#discussion_r732060868 ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -346,6 +367,97 @@ object TestUtils extends Logging { config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version) } + def createAdminClient[B <: KafkaBroker]( + brokers: Seq[B], + adminConfig: Properties): Admin = { +val adminClientProperties = if (adminConfig.isEmpty) { + val newConfig = new Properties() + newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers)) + newConfig +} else { + adminConfig +} +Admin.create(adminClientProperties) + } + + def createTopicWithAdmin[B <: KafkaBroker]( Review comment: There seems to be some duplication with `IntegrationTestUtils.createTopic`. ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -346,6 +367,97 @@ object TestUtils extends Logging { config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version) } + def createAdminClient[B <: KafkaBroker]( + brokers: Seq[B], + adminConfig: Properties): Admin = { +val adminClientProperties = if (adminConfig.isEmpty) { Review comment: Perhaps instead of checking if the config is empty, we can check if the bootstrap servers property is defined? 
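The last suggestion above (check whether the bootstrap servers property is defined rather than whether the whole config is empty) could look roughly like the sketch below. It is written in Java against plain `java.util.Properties` rather than as a patch to the Scala `TestUtils`; the class and method names are hypothetical, and the key string is the value of `AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG`.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of TestUtils.
final class AdminConfigDefaults {
    // Same string as AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG in the Kafka clients library.
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    // Returns a copy of adminConfig and fills in the broker list only when the
    // caller did not set bootstrap servers. Other caller-supplied settings are
    // kept either way, unlike an isEmpty check, which only supplies the broker
    // list when no settings at all were passed.
    static Properties withBootstrapDefault(final Properties adminConfig, final String brokerList) {
        final Properties resolved = new Properties();
        resolved.putAll(adminConfig);
        if (!resolved.containsKey(BOOTSTRAP_SERVERS)) {
            resolved.put(BOOTSTRAP_SERVERS, brokerList);
        }
        return resolved;
    }

    public static void main(final String[] args) {
        final Properties onlyTimeout = new Properties();
        onlyTimeout.put("request.timeout.ms", "5000");
        // The timeout is preserved and the broker list is added.
        System.out.println(withBootstrapDefault(onlyTimeout, "localhost:9092"));
    }
}
```

With the isEmpty check, a caller who passes only a timeout would get an admin config with no bootstrap servers at all; the per-key check avoids that case.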
## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1640,6 +1800,37 @@ object TestUtils extends Logging { }, s"Timed out waiting for brokerId $brokerId to come online") } + def getReplicaAssignmentForTopics[B <: KafkaBroker]( + topicNames: Seq[String], + brokers: Seq[B], + adminConfig: Properties = new Properties): Map[TopicPartition, Seq[Int]] = { +val adminClient = createAdminClient(brokers, adminConfig) +val results = new mutable.HashMap[TopicPartition, Seq[Int]] +try { + adminClient.describeTopics(topicNames.toList.asJava).topicNameValues().forEach { +case (topicName, future) => + try { +val description = future.get() +description.partitions().forEach { + case partition => +val topicPartition = new TopicPartition(topicName, partition.partition()) +results.put(topicPartition, partition.replicas().asScala.map(_.id)) +} + } catch { +case e: ExecutionException => if (e.getCause != null && + e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) { + // ignore Review comment: Wondering if it would be better to let this propagate instead of returning an empty result. At least maybe we can mention the behavior in a doc comment. 
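The trade-off raised in the comment above, swallowing the unknown-topic error versus letting it propagate, can be sketched as two small helpers. This is an illustration only: `UnknownTopicException` is a hypothetical stand-in for `UnknownTopicOrPartitionException`, and the futures are plain `java.util.concurrent` futures rather than the ones returned by the admin client.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for UnknownTopicOrPartitionException.
final class UnknownTopicException extends RuntimeException {
    UnknownTopicException(final String message) {
        super(message);
    }
}

final class DescribeResultHandling {
    // Lenient variant: an unknown topic yields an empty result, which the
    // caller cannot distinguish from "topic has no data" without a doc comment.
    static Optional<String> getOrEmpty(final Future<String> future) {
        try {
            return Optional.of(future.get());
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicException) {
                return Optional.empty();
            }
            throw new IllegalStateException(e.getCause());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Strict variant: the wrapped cause is unwrapped and rethrown to the caller.
    static String getOrThrow(final Future<String> future) {
        try {
            return future.get();
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicException) {
                throw (UnknownTopicException) e.getCause();
            }
            throw new IllegalStateException(e.getCause());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Builds a future that has already failed with the stand-in exception.
    static Future<String> failed(final String topic) {
        final CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new UnknownTopicException("Unknown topic: " + topic));
        return future;
    }

    public static void main(final String[] args) {
        System.out.println(getOrEmpty(failed("missing")).isPresent()); // prints false
        try {
            getOrThrow(failed("missing"));
        } catch (final UnknownTopicException e) {
            System.out.println(e.getMessage()); // prints Unknown topic: missing
        }
    }
}
```

Which variant suits `getReplicaAssignmentForTopics` depends on how its callers use the result; the sketch only shows that the lenient form hides the error unless the behavior is documented.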
## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1220,37 +1348,42 @@ object TestUtils extends Logging { } } - - def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = { + def verifyTopicDeletion[B <: KafkaBroker]( + zkClient: KafkaZkClient, + topic: String, + numPartitions: Int, + brokers: Seq[B]): Unit = { val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) -// wait until admin path for delete topic is deleted, signaling completion of topic deletion -waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic), - "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic)) -waitUntilTrue(() => !zkClient.topicExists(topic), - "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic)) +if (zkClient != null) { Review comment: I wonder if we need this logic. It seems not necessary for kraft clusters, so why do we need it for zk clusters? ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -346,6 +367,97 @@ object TestUtils extends Logging { config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version) } + def createAdminClient[B <: KafkaBroker]( + brokers: Seq[B], + adminConfig: Properties): Admin = { +val adminClientProperties = if (adminConfig.isEmpty) { + val newConfig = new Properties() + newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers)) + newConfig +} else { + adminConfig +} +Admin.create(adminClientProperties) + } + + def createTopicWithAdmin[B <: KafkaBroker]( + topic: String, + numPartitions: Int = 1, + replicationFactor: Int = 1, + brokers: Seq[B], + topicConfig: Properties = new Properties, + adminConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = { +val adminClient =
[GitHub] [kafka] vamossagar12 edited a comment on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 edited a comment on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-946918369 Thanks Sophie for that. @showuon, I think your concern about persistent stores and custom stores is still valid. The reason I don't want to add the getObservedStreamTime method to SessionStore or WindowStore is that I feel it doesn't belong on those interfaces. How observed stream time is tracked is an internal detail. Having said that, if we want this behaviour to also be available for custom stores (which is why we chose to add it to MeteredStores), then those custom stores need to implement PersistentStore classes, which is not the way it works today, right? One approach could be to think about custom state stores separately and have this merged if this looks fine. That's because I think custom state stores will need more thought, given the way the state stores are structured and wrapped. Otherwise, I am open to suggestions!
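To make the "internal detail" point concrete, here is a minimal sketch of a store that tracks observed stream time privately and uses it to hide records that have fallen out of retention. `RetentionTrackingStore` is a hypothetical class, not the WindowStore or SessionStore API and not the code in this pull request. The choice of -1 as the initial value and the exact expiry boundary (strictly greater than observed stream time minus retention) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical illustration; not a Kafka Streams store implementation.
final class RetentionTrackingStore {
    private final long retentionMs;
    private final TreeMap<Long, String> valuesByTimestamp = new TreeMap<>();

    // Highest record timestamp seen so far; -1 means "nothing observed yet" (assumed convention).
    // It is deliberately private: callers never query it through the store interface.
    private long observedStreamTime = -1L;

    RetentionTrackingStore(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(final long timestamp, final String value) {
        // Stream time only moves forward, so out-of-order records do not lower it.
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        valuesByTimestamp.put(timestamp, value);
    }

    // Returns only the values whose timestamp is still inside the retention window,
    // in timestamp order. Assumed boundary: timestamp > observedStreamTime - retentionMs.
    List<String> fetchLive() {
        final long expiredUpTo = observedStreamTime - retentionMs;
        return new ArrayList<>(valuesByTimestamp.tailMap(expiredUpTo, false).values());
    }

    public static void main(final String[] args) {
        final RetentionTrackingStore store = new RetentionTrackingStore(10L);
        store.put(0L, "a");
        store.put(5L, "b");
        store.put(12L, "c");
        System.out.println(store.fetchLive()); // prints [b, c]
    }
}
```

A custom store would have to carry equivalent bookkeeping itself to get the same behaviour, which is the open question raised in the comment.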
[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-946918369 Thanks Sophie for that. @showuon, I think your concern about persistent stores and custom stores is still valid. The reason I don't want to add the getObservedStreamTime method to SessionStore or WindowStore is that I feel it doesn't belong on those interfaces. How observed stream time is tracked is an internal detail. Having said that, if we want this behaviour to also be available for custom stores (which is why we chose to add it to MeteredStores), then those custom stores need to implement PersistentStore classes, which is not the way it works today, right? One approach could be to think about custom state stores separately and have this merged if this looks fine. Otherwise, I am open to suggestions!
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732068749 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class); + +public static final String CLIENT_ID_CONFIG = "clientId"; +public static final String CLIENT_SECRET_CONFIG = "clientSecret"; +public static final String SCOPE_CONFIG = "scope"; + +public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " + +"client ID to uniquely identify the service account to use for authentication for " + +"this client. 
The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " + +"client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " + +"account and identifies the service account to use for authentication for " + +"this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " + +"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " + +"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + +"include with the login request."; + +private static final String EXTENSION_PREFIX = "extension_"; + +private Map moduleOptions; + +private AccessTokenRetriever accessTokenRetriever; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) +throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + +if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) +throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size())); + +moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions()); +AccessTokenRetriever accessTokenRetriever =
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r732068187 ## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.sasl.SaslException; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHandler { + +private static final Logger log = LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class); + +public static final String CLIENT_ID_CONFIG = "clientId"; +public static final String CLIENT_SECRET_CONFIG = "clientSecret"; +public static final String SCOPE_CONFIG = "scope"; + +public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " + +"client ID to uniquely identify the service account to use for authentication for " + +"this client. 
The value must be paired with a corresponding " + CLIENT_SECRET_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity provider-issued " + +"client secret serves a similar function as a password to the " + CLIENT_ID_CONFIG + " " + +"account and identifies the service account to use for authentication for " + +"this client. The value must be paired with a corresponding " + CLIENT_ID_CONFIG + " " + +"value and is provided to the OAuth provider using the OAuth " + +"clientcredentials grant type."; + +public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " + +"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need to specify an " + +"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + +"include with the login request."; + +private static final String EXTENSION_PREFIX = "extension_"; + +private Map moduleOptions; + +private AccessTokenRetriever accessTokenRetriever; + +private AccessTokenValidator accessTokenValidator; + +private boolean isConfigured = false; + +@Override +public void configure(Map configs, String saslMechanism, List jaasConfigEntries) { +if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) +throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + +if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) +throw new IllegalArgumentException(String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size())); + +moduleOptions = Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions()); +AccessTokenRetriever accessTokenRetriever =
[jira] [Created] (KAFKA-13385) In the KRPC request header, translate null clientID to empty
Colin McCabe created KAFKA-13385:
Summary: In the KRPC request header, translate null clientID to empty
Key: KAFKA-13385
URL: https://issues.apache.org/jira/browse/KAFKA-13385
Project: Kafka
Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe
[jira] [Updated] (KAFKA-13373) ValueTransformerWithKeySupplier doesn't work with store()
[ https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victoria Xia updated KAFKA-13373: - Labels: newbie (was: ) > ValueTransformerWithKeySupplier doesn't work with store() > - > > Key: KAFKA-13373 > URL: https://issues.apache.org/jira/browse/KAFKA-13373 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Anatoly Tsyganenko >Priority: Minor > Labels: newbie > > I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like > this: > > {code:java} > public final class CustomSupplier implements > ValueTransformerWithKeySupplier, JsonNode, JsonNode> { > private final String storeName = "my-store"; > public Set> stores() { > final Deserializer jsonDeserializer = new > JsonDeserializer(); > final Serializer jsonSerializer = new JsonSerializer(); > final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, > jsonDeserializer); > final Serde stringSerde = Serdes.String(); > final StoreBuilder> store > = > Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), > stringSerde, jsonSerde).withLoggingDisabled(); > return Collections.singleton(store); > } > @Override > public ValueTransformerWithKey, JsonNode, JsonNode> > get() { > return new ValueTransformerWithKey, JsonNode, > JsonNode>() { > private ProcessorContext context; > private TimestampedKeyValueStore store; > @Override > public void init(final ProcessorContext context) { > this.store = context.getStateStore(storeName); > this.context = context; > } > // > }{code} > > But got next error for line "this.store = context.getStateStore(storeName);" > in init(): > {code:java} > Caused by: org.apache.kafka.streams.errors.StreamsException: Processor > KTABLE-TRANSFORMVALUES-08 has no access to StateStore my-store as the > store is not connected to the processor. 
If you add stores manually via > '.addStateStore()' make sure to connect the added store to the processor by > providing the processor name to '.addStateStore()' or connect them via > '.connectProcessorAndStateStores()'. DSL users need to provide the store name > to '.process()', '.transform()', or '.transformValues()' to connect the store > to the corresponding operator, or they can provide a StoreBuilder by > implementing the stores() method on the Supplier itself. If you do not add > stores manually, please file a bug report at > https://issues.apache.org/jira/projects/KAFKA.{code} > > The same code works perfectly with Transform or when I add the store to the builder. > It looks like something goes wrong when ConnectedStoreProvider and > ValueTransformerWithKeySupplier are used together.
[GitHub] [kafka] cmccabe commented on pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController
cmccabe commented on pull request #10772: URL: https://github.com/apache/kafka/pull/10772#issuecomment-946887188 thanks for this, @dielhennr. I left some comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController
cmccabe commented on a change in pull request #10772: URL: https://github.com/apache/kafka/pull/10772#discussion_r732038559 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java ## @@ -26,28 +26,36 @@ import java.util.Objects; public final class QuorumControllerMetrics implements ControllerMetrics { -private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName( -"KafkaController", "ActiveControllerCount"); -private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName( -"ControllerEventManager", "EventQueueTimeMs"); -private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName( -"ControllerEventManager", "EventQueueProcessingTimeMs"); -private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName( -"KafkaController", "GlobalTopicCount"); -private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName( -"KafkaController", "GlobalPartitionCount"); -private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName( -"KafkaController", "OfflinePartitionsCount"); -private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName( -"KafkaController", "PreferredReplicaImbalanceCount"); - +private final static MetricName ACTIVE_CONTROLLER_COUNT = new MetricName( Review comment: why are we changing from `getMetricName` to `new MetricName`? Should this be a separate PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController
cmccabe commented on a change in pull request #10772: URL: https://github.com/apache/kafka/pull/10772#discussion_r732036343 ## File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java ## @@ -249,6 +256,10 @@ public void replay(RegisterBrokerRecord record) { features.put(feature.name(), new VersionRange( feature.minSupportedVersion(), feature.maxSupportedVersion())); } + +if (record.fenced()) { + controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1); Review comment: We need to update this based on what the previous registration was, if any. If the previous registration was also fenced, we do not want to increase the fenced broker count. It is also possible for the new registration to start as unfenced. Please look at the record definitions. It would be useful to have a helper function that took as an argument the previous registration (or null) and the new registration (or null), and updated the metrics accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
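The helper suggested in the review comment above could look roughly like the following. This is a minimal, stdlib-only sketch: `Registration`, `BrokerCounts`, and `updateCounts` are hypothetical names and not the actual Kafka controller classes. It assumes each registration is either fenced or unfenced (active) and that `null` stands for "no registration".

```java
// Hypothetical sketch; these are not the real Kafka controller classes.
final class BrokerCountSketch {
    /** Stand-in for a broker registration; only the fenced flag matters here. */
    static final class Registration {
        final boolean fenced;
        Registration(boolean fenced) { this.fenced = fenced; }
    }

    /** Stand-in for the fenced/active broker gauges. */
    static final class BrokerCounts {
        int fenced;
        int active;
    }

    /**
     * Adjusts the counts for a transition from prev to next.
     * Either argument may be null: prev == null means a brand-new broker,
     * next == null means the broker was unregistered.
     */
    static void updateCounts(BrokerCounts counts, Registration prev, Registration next) {
        if (prev != null) {
            // Undo the contribution of the previous registration.
            if (prev.fenced) counts.fenced--; else counts.active--;
        }
        if (next != null) {
            // Apply the contribution of the new registration.
            if (next.fenced) counts.fenced++; else counts.active++;
        }
    }

    public static void main(String[] args) {
        BrokerCounts counts = new BrokerCounts();
        updateCounts(counts, null, new Registration(true));                    // new broker, fenced
        updateCounts(counts, new Registration(true), new Registration(true));  // re-registered, still fenced
        updateCounts(counts, new Registration(true), new Registration(false)); // broker becomes unfenced
        System.out.println("fenced=" + counts.fenced + " active=" + counts.active);
    }
}
```

Because the previous registration is always subtracted before the new one is added, re-registering an already fenced broker leaves the fenced count unchanged, which is the case the comment is concerned about.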
[GitHub] [kafka] cmccabe commented on pull request #11410: MINOR: Make TestUtils usable for KRaft mode
cmccabe commented on pull request #11410: URL: https://github.com/apache/kafka/pull/11410#issuecomment-946860744 Test failures seem like flakes (they pass locally)
[GitHub] [kafka] cmccabe opened a new pull request #11416: MINOR: Improve createTopics and incrementalAlterConfigs in KRaft
cmccabe opened a new pull request #11416: URL: https://github.com/apache/kafka/pull/11416 For CreateTopics, fix a bug where, if one topic creation in a batch failed, every topic in the batch would fail with the same error code. Make the error message for TOPIC_ALREADY_EXISTS consistent with the ZK-based code by including the topic name. For IncrementalAlterConfigs, before we allow topic configurations to be set, we should check that they are valid. (This also applies to newly created topics.) IncrementalAlterConfigs should ignore non-null payloads for DELETE operations. Previously we would return an error in these cases. However, this is not compatible with the old ZK-based code, which ignores the payload in these cases.
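The per-topic behaviour described for CreateTopics can be illustrated with a small stdlib-only sketch. `createTopics`, the result strings, and the message wording are hypothetical and not the KRaft controller code. The sketch also assumes that topic names within one batch are distinct.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the KRaft controller code.
final class CreateTopicsSketch {
    /**
     * Processes a batch of topic names against the set of existing topics and
     * returns one result per requested topic, so a failure for one topic does
     * not leak into the results of the others.
     */
    static Map<String, String> createTopics(Set<String> existing, List<String> requested) {
        Map<String, String> results = new LinkedHashMap<>();
        for (String name : requested) {
            if (existing.contains(name)) {
                // Include the topic name in the message, as the PR description asks.
                results.put(name, "TOPIC_ALREADY_EXISTS: Topic '" + name + "' already exists.");
            } else {
                existing.add(name);
                results.put(name, "NONE");
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Set<String> existing = new LinkedHashSet<>(List.of("foo"));
        // "foo" fails on its own; "bar" is still created.
        System.out.println(createTopics(existing, List.of("foo", "bar")));
    }
}
```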
[jira] [Commented] (KAFKA-13374) [Docs] - All reads from the leader of the partition even after KIP-392?
[ https://issues.apache.org/jira/browse/KAFKA-13374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430590#comment-17430590 ] Robin Moffatt commented on KAFKA-13374: --- Looks good to me as a layperson - thanks for picking this up [~showuon]. > [Docs] - All reads from the leader of the partition even after KIP-392? > --- > > Key: KAFKA-13374 > URL: https://issues.apache.org/jira/browse/KAFKA-13374 > Project: Kafka > Issue Type: Bug >Reporter: Robin Moffatt >Assignee: Luke Chen >Priority: Trivial > > On `https://kafka.apache.org/documentation/#design_replicatedlog` it says > > All reads and writes go to the leader of the partition. > However with KIP-392 I didn't think this was the case any more. If so, the > doc should be updated to clarify. -- This message was sent by Atlassian Jira (v8.3.4#803005)
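For context on why the quoted sentence is out of date: to my understanding of KIP-392, writes still go to the leader, while a consumer may fetch from a follower when the broker uses a rack-aware replica selector and the consumer declares its rack. The sketch below only builds the relevant settings with `java.util.Properties`. The class and method names and the rack id are placeholders; the property keys are the standard `broker.rack`, `replica.selector.class`, and `client.rack` settings.

```java
import java.util.Properties;

// Hypothetical helper names; the rack id passed in is a placeholder.
final class FollowerFetchConfigSketch {
    /** Broker side: advertise a rack and enable rack-aware replica selection. */
    static Properties brokerProps(String rack) {
        Properties props = new Properties();
        props.setProperty("broker.rack", rack);
        props.setProperty("replica.selector.class",
            "org.apache.kafka.common.replica.RackAwareReplicaSelector");
        return props;
    }

    /** Consumer side: declare the client's rack so a nearby replica can be chosen. */
    static Properties consumerProps(String rack) {
        Properties props = new Properties();
        props.setProperty("client.rack", rack);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(brokerProps("rack-a"));
        System.out.println(consumerProps("rack-a"));
    }
}
```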
[GitHub] [kafka] jeqo closed pull request #10507: KAFKA-8410: Migrating stateful operators to new Processor API
jeqo closed pull request #10507: URL: https://github.com/apache/kafka/pull/10507
[jira] [Updated] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted
[ https://issues.apache.org/jira/browse/KAFKA-13384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander updated KAFKA-13384: -- Description: We found a misbehavior on our Kafka cluster - version: 2.6.2 (Commit:da65af02e5856e34)) _*FailedPartitionsCount*_ metric is not updated if a partition log file was corrupted Steps to reproduce the problem: # corrupt a partition log file # restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed {code:java} 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread){code} But the value of _*FailedPartitionsCount*_ metric will be 0 (see attached screenshot) was: We found a misbehavior on our Kafka cluster (version: 2.6.2 (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a partition log file was corrupted Steps to reproduce the problem: # corrupt a partition log file # restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed {code:java} 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread){code} But the value of `FailedPartitionsCount` metric will be 0 (see attached screenshot) > FailedPartitionsCount metric is not updated if a partition log file was > corrupted > - > > Key: KAFKA-13384 > URL: https://issues.apache.org/jira/browse/KAFKA-13384 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.2 > Environment: OS: > NAME="Amazon Linux AMI" > VERSION="2018.03" > ID="amzn" > ID_LIKE="rhel fedora" > VERSION_ID="2018.03" > PRETTY_NAME="Amazon Linux AMI 2018.03" > CPE_NAME="cpe:/o:amazon:linux:2018.03:ga" > HOME_URL="http://aws.amazon.com/amazon-linux-ami/; > Kafka version: > 2.6.2 
(Commit:da65af02e5856e34) >Reporter: Alexander >Priority: Major > Attachments: Screenshot 2021-10-19 at 15.28.33.png > > > We found a misbehavior on our Kafka cluster - version: 2.6.2 > (Commit:da65af02e5856e34)) _*FailedPartitionsCount*_ metric is not updated if > a partition log file was corrupted > Steps to reproduce the problem: > # corrupt a partition log file > # restart Kafka process > After that, you will get a correct log which tells that Kafka marked > corrupted partitions as failed > {code:java} > 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher > replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as > failed > (kafka.server.ReplicaFetcherThread){code} > But the value of _*FailedPartitionsCount*_ metric will be 0 (see attached > screenshot) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted
[ https://issues.apache.org/jira/browse/KAFKA-13384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander updated KAFKA-13384: -- Description: We found a misbehavior on our Kafka cluster (version: 2.6.2 (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a partition log file was corrupted Steps to reproduce the problem: # corrupt a partition log file # restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed {code:java} 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread){code} But the value of `FailedPartitionsCount` metric will be 0 (see attached screenshot) was: We found a misbehavior on our Kafka cluster (version: 2.6.2 (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a partition log file was corrupted Steps to reproduce the problem: # corrupt a partition log file # restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed {code:java} 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread){code} But the value of `FailedPartitionsCount` metric will be 0 (see attached screenshot) > FailedPartitionsCount metric is not updated if a partition log file was > corrupted > - > > Key: KAFKA-13384 > URL: https://issues.apache.org/jira/browse/KAFKA-13384 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.2 > Environment: OS: > NAME="Amazon Linux AMI" > VERSION="2018.03" > ID="amzn" > ID_LIKE="rhel fedora" > VERSION_ID="2018.03" > PRETTY_NAME="Amazon Linux AMI 2018.03" > CPE_NAME="cpe:/o:amazon:linux:2018.03:ga" > HOME_URL="http://aws.amazon.com/amazon-linux-ami/; > Kafka version: > 2.6.2 
(Commit:da65af02e5856e34) >Reporter: Alexander >Priority: Major > Attachments: Screenshot 2021-10-19 at 15.28.33.png > > > We found a misbehavior on our Kafka cluster (version: 2.6.2 > (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if > a partition log file was corrupted > Steps to reproduce the problem: > # corrupt a partition log file > # restart Kafka process > After that, you will get a correct log which tells that Kafka marked > corrupted partitions as failed > > {code:java} > 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher > replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as > failed > (kafka.server.ReplicaFetcherThread){code} > > But the value of `FailedPartitionsCount` metric will be 0 (see attached > screenshot) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted
[ https://issues.apache.org/jira/browse/KAFKA-13384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander updated KAFKA-13384: -- Description: We found a misbehavior on our Kafka cluster (version: 2.6.2 (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a partition log file was corrupted Steps to reproduce the problem: # corrupt a partition log file # restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed {code:java} 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread){code} But the value of `FailedPartitionsCount` metric will be 0 (see attached screenshot) was: We found a misbehavior on our Kafka cluster (version: 2.6.2 (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a partition log file was corrupted Steps to reproduce the problem: 1. corrupt a partition log file 2. 
restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed ``` 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread) ``` But the value of `FailedPartitionsCount` metric will be 0 (see attached screenshot) > FailedPartitionsCount metric is not updated if a partition log file was > corrupted > - > > Key: KAFKA-13384 > URL: https://issues.apache.org/jira/browse/KAFKA-13384 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.2 > Environment: OS: > NAME="Amazon Linux AMI" > VERSION="2018.03" > ID="amzn" > ID_LIKE="rhel fedora" > VERSION_ID="2018.03" > PRETTY_NAME="Amazon Linux AMI 2018.03" > CPE_NAME="cpe:/o:amazon:linux:2018.03:ga" > HOME_URL="http://aws.amazon.com/amazon-linux-ami/; > Kafka version: > 2.6.2 (Commit:da65af02e5856e34) >Reporter: Alexander >Priority: Major > Attachments: Screenshot 2021-10-19 at 15.28.33.png > > > We found a misbehavior on our Kafka cluster (version: 2.6.2 > (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if > a partition log file was corrupted > Steps to reproduce the problem: > # corrupt a partition log file > # restart Kafka process > After that, you will get a correct log which tells that Kafka marked > corrupted partitions as failed > > {code:java} > 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher > replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as > failed > (kafka.server.ReplicaFetcherThread){code} > > > But the value of `FailedPartitionsCount` metric will be 0 (see attached > screenshot) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted
Alexander created KAFKA-13384: - Summary: FailedPartitionsCount metric is not updated if a partition log file was corrupted Key: KAFKA-13384 URL: https://issues.apache.org/jira/browse/KAFKA-13384 Project: Kafka Issue Type: Bug Affects Versions: 2.6.2 Environment: OS: NAME="Amazon Linux AMI" VERSION="2018.03" ID="amzn" ID_LIKE="rhel fedora" VERSION_ID="2018.03" PRETTY_NAME="Amazon Linux AMI 2018.03" CPE_NAME="cpe:/o:amazon:linux:2018.03:ga" HOME_URL="http://aws.amazon.com/amazon-linux-ami/; Kafka version: 2.6.2 (Commit:da65af02e5856e34) Reporter: Alexander Attachments: Screenshot 2021-10-19 at 15.28.33.png We found a misbehavior on our Kafka cluster (version: 2.6.2 (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a partition log file was corrupted Steps to reproduce the problem: 1. corrupt a partition log file 2. restart Kafka process After that, you will get a correct log which tells that Kafka marked corrupted partitions as failed ``` 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed (kafka.server.ReplicaFetcherThread) ``` But the value of `FailedPartitionsCount` metric will be 0 (see attached screenshot) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jeqo opened a new pull request #11415: chore: ignore bin on new modules
jeqo opened a new pull request #11415: URL: https://github.com/apache/kafka/pull/11415 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10543) Convert KTable joins to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430507#comment-17430507 ] Jorge Esteban Quilcate Otoya commented on KAFKA-10543: -- WIP: [https://github.com/apache/kafka/pull/11412] > Convert KTable joins to new PAPI > > > Key: KAFKA-10543 > URL: https://issues.apache.org/jira/browse/KAFKA-10543 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10540) Convert KStream aggregations to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya resolved KAFKA-10540. -- Resolution: Fixed https://github.com/apache/kafka/pull/11315 > Convert KStream aggregations to new PAPI > > > Key: KAFKA-10540 > URL: https://issues.apache.org/jira/browse/KAFKA-10540 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10539) Convert KStreamImpl joins to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya resolved KAFKA-10539. -- Resolution: Fixed https://github.com/apache/kafka/pull/11356 > Convert KStreamImpl joins to new PAPI > - > > Key: KAFKA-10539 > URL: https://issues.apache.org/jira/browse/KAFKA-10539 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] satishd commented on pull request #11414: MINOR: Renamed a few record definition files with the existing convention.
satishd commented on pull request #11414: URL: https://github.com/apache/kafka/pull/11414#issuecomment-946647417 @junrao This is a minor PR that renames files to follow the existing file name conventions and throws a proper error message when reading LeaderEpochCache with an unsupported version.
[GitHub] [kafka] satishd opened a new pull request #11414: MINOR: Renamed a few record definition files with the existing convention.
satishd opened a new pull request #11414: URL: https://github.com/apache/kafka/pull/11414 MINOR: Renamed a few record definition files with the existing convention. - Throwing an error message while reading LeaderEpochCheckpoint file with an unsupported version. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13374) [Docs] - All reads from the leader of the partition even after KIP-392?
[ https://issues.apache.org/jira/browse/KAFKA-13374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430448#comment-17430448 ] Luke Chen commented on KAFKA-13374: --- Thank you for reporting and confirming, [~rmoff] [~jlaskowski]. I've submitted a PR, and it is ready for review: [https://github.com/apache/kafka/pull/11408] Comments are welcome. Thank you. > [Docs] - All reads from the leader of the partition even after KIP-392? > --- > > Key: KAFKA-13374 > URL: https://issues.apache.org/jira/browse/KAFKA-13374 > Project: Kafka > Issue Type: Bug >Reporter: Robin Moffatt >Assignee: Luke Chen >Priority: Trivial > > On `https://kafka.apache.org/documentation/#design_replicatedlog` it says > > All reads and writes go to the leader of the partition. > However with KIP-392 I didn't think this was the case any more. If so, the > doc should be updated to clarify.
[jira] [Updated] (KAFKA-13043) Add Admin API for batched offset fetch requests (KIP-709)
[ https://issues.apache.org/jira/browse/KAFKA-13043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13043: Fix Version/s: (was: 3.1.0) 3.2.0 > Add Admin API for batched offset fetch requests (KIP-709) > - > > Key: KAFKA-13043 > URL: https://issues.apache.org/jira/browse/KAFKA-13043 > Project: Kafka > Issue Type: New Feature > Components: admin >Affects Versions: 3.1.0, 3.0.0 >Reporter: Rajini Sivaram >Assignee: Sanjana Kaundinya >Priority: Major > Fix For: 3.2.0 > > > Protocol changes and broker-side changes to process batched > OffsetFetchRequests were added under KAFKA-12234. This ticket is to add Admin > API changes to use this feature. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13043) Add Admin API for batched offset fetch requests (KIP-709)
[ https://issues.apache.org/jira/browse/KAFKA-13043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430444#comment-17430444 ] David Jacot commented on KAFKA-13043: - Moving this feature to 3.2.0 as [~skaundinya] does not have cycles to finish it for 3.1.0. > Add Admin API for batched offset fetch requests (KIP-709) > - > > Key: KAFKA-13043 > URL: https://issues.apache.org/jira/browse/KAFKA-13043 > Project: Kafka > Issue Type: New Feature > Components: admin >Affects Versions: 3.1.0, 3.0.0 >Reporter: Rajini Sivaram >Assignee: Sanjana Kaundinya >Priority: Major > Fix For: 3.1.0 > > > Protocol changes and broker-side changes to process batched > OffsetFetchRequests were added under KAFKA-12234. This ticket is to add Admin > API changes to use this feature. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13370) Offset commit failure percentage metric is not computed correctly (regression)
[ https://issues.apache.org/jira/browse/KAFKA-13370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430429#comment-17430429 ] Luke Chen commented on KAFKA-13370: --- [~20100g], thanks for reporting the issue. The PR is ready now: [https://github.com/apache/kafka/pull/11413] Comments are welcome. Thanks. > Offset commit failure percentage metric is not computed correctly (regression) > -- > > Key: KAFKA-13370 > URL: https://issues.apache.org/jira/browse/KAFKA-13370 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, metrics >Affects Versions: 2.8.0 > Environment: Confluent Platform Helm Chart (v6.2.0) >Reporter: Vincent Giroux >Assignee: Luke Chen >Priority: Minor > Fix For: 2.8.0 > > > There seems to have been a regression in the way the offset-commit-* metrics > are calculated for *source* Kafka Connect connectors since version 2.8.0. > Before this version, any timeout or interruption while trying to commit > offsets for source connectors (e.g. MM2 MirrorSourceConnector) would get > correctly flagged as an offset commit failure (i.e. the > *offset-commit-failure-percentage* metric would be non-zero). Since > version 2.8.0, these errors are considered as successes. > After digging through the code, the commit where this bug was introduced > appears to be this one: > [https://github.com/apache/kafka/commit/047ad654da7903f3903760b0e6a6a58648ca7715] > I believe removing the boolean *success* argument in the *recordCommit* > method of the *WorkerTask* class (argument deemed redundant because of the > presence of the Throwable *error* argument) and only considering the presence > of a non-null error to determine if a commit is a success or failure might be > a mistake. This is because in the *commitOffsets* method of the > *WorkerSourceTask* class, there are multiple cases where an exception object > is either not available or is not passed to the *recordCommitFailure* method, > e.g.: > * *Timeout #1* : > [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L519] > > * *Timeout #2* : > [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L584] > > * *Interruption* : > [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L529] > > * *Unserializable offset* : > [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L562] > >
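The failure mode described in the report can be reproduced in miniature. The sketch below is stdlib-only and hypothetical: `CommitMetricsSketch`, `recordCommit`, and `failureRatio` are illustrative names, not the real `WorkerTask` metrics code. It assumes, as the report suggests, that a commit is classified purely by whether an error object is present.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; not the real Kafka Connect WorkerTask metrics code.
final class CommitMetricsSketch {
    int successes;
    int failures;

    /** Classifies a commit purely by whether an error object is present. */
    void recordCommit(Throwable error) {
        if (error == null) {
            successes++;
        } else {
            failures++;
        }
    }

    /** Fraction of recorded commits that were classified as failures. */
    double failureRatio() {
        int total = successes + failures;
        return total == 0 ? 0.0 : (double) failures / total;
    }

    public static void main(String[] args) {
        CommitMetricsSketch metrics = new CommitMetricsSketch();
        // A timeout path that has no exception at hand and passes null
        // is silently counted as a success.
        metrics.recordCommit(null);
        System.out.println(metrics.failureRatio());
        // Supplying an explicit error makes the failure visible.
        metrics.recordCommit(new TimeoutException("flush timed out"));
        System.out.println(metrics.failureRatio());
    }
}
```

Under this assumption, every failure path has to construct and pass an error object, otherwise the failure metric stays at zero.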
[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests
showuon commented on pull request #11413: URL: https://github.com/apache/kafka/pull/11413#issuecomment-946513668 @chia7712 , could you please take a look? Thanks.
[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests
showuon commented on a change in pull request #11413: URL: https://github.com/apache/kafka/pull/11413#discussion_r731649801 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ## @@ -1580,9 +1652,22 @@ private void assertPollMetrics(int minimumPollCountExpected) { assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d); double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count"); double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max"); -assertEquals(0, activeCount, 0.01d); -if (minimumPollCountExpected > 0) { -assertEquals(RECORDS.size(), activeCountMax, 0.01d); + +if (isWriteCompleted) { +assertEquals(0, activeCount, 0.01d); +if (minimumPollCountExpected > 0) { +assertEquals(RECORDS.size(), activeCountMax, 0.01d); +} +} + +double failurePercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-failure-percentage"); +double successPercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-success-percentage"); + +if (!isCommitSucceeded) { +assertTrue(failurePercentage > 0); +} else { +assertTrue(failurePercentage == 0); +assertTrue(successPercentage > 0); Review comment: Added verification of the `offset-commit-failure-percentage` and `offset-commit-success-percentage` metrics.
[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests
showuon commented on a change in pull request #11413: URL: https://github.com/apache/kafka/pull/11413#discussion_r731648129 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ## @@ -129,6 +129,7 @@ // is used in the right place. private static final byte[] SERIALIZED_KEY = "converted-key".getBytes(); private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes(); +private static final long STOP_TIME_OUT = 1; Review comment: Stopping the worker task sometimes takes more than 1 second, so this increases the timeout to 10 seconds to make the test reliable.
[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731647369

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ##
@@ -550,16 +552,16 @@ public boolean commitOffsets() {
         // Now we can actually flush the offsets to user storage.
         Future flushFuture = offsetWriter.doFlush((error, result) -> {
             if (error != null) {
+                // Very rare case: offsets were unserializable and we finished immediately, unable to store
+                // any data
                 log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
+                finishFailedFlush();
+                recordCommitFailure(time.milliseconds() - started, error);
             } else {

Review comment:
`doFlush` returns `null` after invoking the callback with the `error` attached. Handle the failed flush here, since this is where we know which error was thrown.
[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731645529

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ##
@@ -514,9 +514,11 @@ public boolean commitOffsets() {
                 // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
                 // we can stop flushing immediately
                 if (isCancelled() || timeoutMs <= 0) {
-                    log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
+                    log.error("{} Failed to flush, task is cancelled, or timed out while waiting for producer " +
+                            "to flush outstanding {} messages", this, outstandingMessages.size());

Review comment:
Side fix: the failure can be caused either by a timeout or by the task being cancelled. Mention both in the log message.
[GitHub] [kafka] showuon opened a new pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests
showuon opened a new pull request #11413:
URL: https://github.com/apache/kafka/pull/11413

In https://github.com/apache/kafka/pull/9642, we removed the unnecessary `success` parameter and now use `error` to determine whether the commit succeeded or failed. However, there are some cases where we previously passed `success` as `false` without an `error` value. I think we should always pass the `error` value on failure. This PR fixes that and adds tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
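The convention described in this PR (a `null` error means success, so every failure path must supply an error) can be illustrated with a minimal, self-contained sketch. This is not the `WorkerSourceTask` code: `CommitCallback`, `CommitRecorder`, and `commit` are hypothetical names invented for the illustration, and the two failure paths only loosely mirror the cancelled and timed-out cases mentioned in the review comments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class CommitResultSketch {

    /** Hypothetical callback: a null error is the only signal of success. */
    interface CommitCallback {
        void onCompletion(Throwable error);
    }

    /** Hypothetical recorder standing in for commit success/failure metrics. */
    static class CommitRecorder implements CommitCallback {
        int successes = 0;
        final List<Throwable> failures = new ArrayList<>();

        @Override
        public void onCompletion(Throwable error) {
            if (error == null) {
                successes++;
            } else {
                failures.add(error);
            }
        }
    }

    /**
     * Hypothetical commit routine. Each failure path passes a non-null error,
     * so a failed commit can never be mistaken for a successful one.
     */
    static boolean commit(boolean cancelled, long timeoutMs, CommitCallback callback) {
        if (cancelled) {
            callback.onCompletion(new IllegalStateException("task is cancelled"));
            return false;
        }
        if (timeoutMs <= 0) {
            callback.onCompletion(new TimeoutException("timed out while waiting for flush"));
            return false;
        }
        callback.onCompletion(null);
        return true;
    }

    public static void main(String[] args) {
        CommitRecorder recorder = new CommitRecorder();
        commit(true, 100L, recorder);   // failure: cancelled
        commit(false, 0L, recorder);    // failure: timed out
        commit(false, 100L, recorder);  // success
        System.out.println("successes=" + recorder.successes
                + ", failures=" + recorder.failures.size());
    }
}
```

If a failure path called `onCompletion(null)` instead, the recorder would count it as a success, which is the kind of mismatch the PR description is concerned with.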
[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430335#comment-17430335 ] Aditya Upadhyaya commented on KAFKA-12559: -- [~ableegoldman] I'd like to pick up this task if no one is currently working on it. Let me know your thoughts. > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Martin Sundeqvist >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocksdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}} > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap on the off-heap memory without > getting their hands dirty with rocksdb. 
> I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1] -- This message was sent by Atlassian Jira (v8.3.4#803005)
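To make the two proposed knobs concrete, here is a minimal arithmetic sketch of how a total off-heap bound could be divided. It is only an illustration of the proposal, not an existing Kafka Streams API: the class and method names are hypothetical, and it assumes (the issue text does not say) that the ratio is the fraction given to the write buffers, with the remainder going to the block cache and index/filter blocks.

```java
public class OffHeapSplitSketch {

    /** Proposed default for rocksdb.max.bytes.off.heap: -1 means unbounded. */
    static final long UNBOUNDED = -1L;

    /**
     * Splits a total off-heap budget into {writeBufferBytes, blockCacheBytes}.
     * Returns null when the budget is unbounded, i.e. no cap is applied.
     *
     * Assumption: memtableRatio is the share for write buffers (memtables);
     * the issue leaves the exact meaning of the ratio open.
     */
    static long[] split(long maxBytesOffHeap, double memtableRatio) {
        if (maxBytesOffHeap == UNBOUNDED) {
            return null;
        }
        if (maxBytesOffHeap < 0) {
            throw new IllegalArgumentException("max off-heap bytes must be -1 or non-negative");
        }
        if (memtableRatio < 0.0 || memtableRatio > 1.0) {
            throw new IllegalArgumentException("ratio must be within [0, 1]");
        }
        long writeBufferBytes = (long) (maxBytesOffHeap * memtableRatio);
        long blockCacheBytes = maxBytesOffHeap - writeBufferBytes;
        return new long[] {writeBufferBytes, blockCacheBytes};
    }

    public static void main(String[] args) {
        long[] parts = split(1024L, 0.5);
        System.out.println("write buffers=" + parts[0] + ", block cache=" + parts[1]);
    }
}
```

Computing the block cache share as the remainder keeps the two parts summing exactly to the configured bound even when the ratio does not divide the total evenly.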
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430275#comment-17430275 ]

Eugen Dück edited comment on KAFKA-13289 at 10/19/21, 6:33 AM:
---

We are running into similar issues (6.0.1-ccs for broker and kafka-streams library, i.e. kafka 2.6.1)
* lots of "Skipping record for expired segment." warnings in AbstractRocksDBSegmentedBytesStore
* at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried to remove the re-keying part from Matthew's code, and as far as I can tell, the problem still persists, so it would look like it is not related to re-partitioning. Btw. the problem shows even when doing just 10 instead of 1000 messages per topic.

Find my fork of Matthew's code here: [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{{[INFO] ---}}
{{[INFO] T E S T S}}
{{[INFO] ---}}
{{[INFO] Running ins14809.Ins14809Test}}
{{leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 2:left, 8:left]}}
{{rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 9:right, 8:right, 6:right]}}
{{# Actual results}}
{{We want to see every number X below end with an entry that says [X,left/X,right]}}
{{but in practice we often see only [X,left/null] meaning the data was not joined.}}
{{This seems to coincide with kafka streams writing...}}
{{`WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment`}}
{{...to its logs, in spite of the fact that the source message timestamps were in order when}}
{{kafka streams got them.}}
{{0 [0:left/null, 0:left/0:right]}}
{{1 [1:left/1:right]}}
{{2 [2:left/2:right]}}
{{3 [3:left/null, 3:left/3:right]}}
{{4 [4:left/null, 4:left/4:right]}}
{{5 [5:left/5:right]}}
{{6 [6:left/null, 6:left/6:right]}}
{{7 [7:left/7:right]}}
{{8 [8:left/8:right]}}
{{9 [9:left/9:right]}}
{{[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 43.267 s - in ins14809.Ins14809Test}}

> Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Matthew Sheppard
> Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following
[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r731524967

## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retry encapsulates the mechanism to perform a retry and then exponential
+ * backoff using provided wait times between attempts.
+ *
+ * @param <R> Result type
+ */
+
+public class Retry<R> {
+
+    private static final Logger log = LoggerFactory.getLogger(Retry.class);
+
+    private final Time time;
+
+    private final long retryBackoffMs;
+
+    private final long retryBackoffMaxMs;
+
+    public Retry(Time time, long retryBackoffMs, long retryBackoffMaxMs) {
+        this.time = time;
+        this.retryBackoffMs = retryBackoffMs;
+        this.retryBackoffMaxMs = retryBackoffMaxMs;
+
+        if (this.retryBackoffMs < 0)
+            throw new IllegalArgumentException(String.format("retryBackoffMs value %s must be non-negative", retryBackoffMs));
+
+        if (this.retryBackoffMaxMs < 0)
+            throw new IllegalArgumentException(String.format("retryBackoffMaxMs %s value must be non-negative", retryBackoffMaxMs));
+
+        if (this.retryBackoffMaxMs < this.retryBackoffMs)
+            throw new IllegalArgumentException(String.format("retryBackoffMaxMs %s is less than retryBackoffMs %s", retryBackoffMaxMs, retryBackoffMs));
+    }
+
+    public R execute(Retryable<R> retryable) throws IOException {
+        int currAttempt = 0;
+        long end = time.milliseconds() + retryBackoffMaxMs;
+        IOException error = null;
+
+        while (time.milliseconds() <= end) {
+            currAttempt++;
+
+            try {
+                return retryable.call();
+            } catch (IOException e) {
+                if (error == null)
+                    error = e;
+
+                long waitMs = retryBackoffMs * (long) Math.pow(2, currAttempt - 1);
+                long diff = end - time.milliseconds();
+                waitMs = Math.min(waitMs, diff);
+
+                if (waitMs <= 0)

Review comment:
When the retry times out, should we log this error so that we can identify non-retryable error patterns to add to the list in the future?
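To make the backoff arithmetic in the hunk above easier to follow, here is a small standalone sketch of the wait schedule it produces. It is not part of the PR: `BackoffSketch` and `schedule` are hypothetical names, and the sketch assumes every attempt fails instantly, so the only time that passes is the wait itself (a simulated clock replaces `Time`).

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {

    /**
     * Returns the successive waits (in ms) for an operation that always fails,
     * using the same formula as the hunk above: the wait doubles on each
     * attempt and is capped by the time remaining until the overall deadline.
     */
    static List<Long> schedule(long retryBackoffMs, long retryBackoffMaxMs) {
        List<Long> waits = new ArrayList<>();
        long now = 0L;                      // simulated clock
        long end = now + retryBackoffMaxMs; // overall deadline
        int currAttempt = 0;

        while (now <= end) {
            currAttempt++;
            long waitMs = retryBackoffMs * (long) Math.pow(2, currAttempt - 1);
            long diff = end - now;
            waitMs = Math.min(waitMs, diff);

            if (waitMs <= 0)
                break; // no time left, so the retry loop gives up

            waits.add(waitMs);
            now += waitMs; // "sleep" by advancing the simulated clock
        }
        return waits;
    }

    public static void main(String[] args) {
        // With a 100 ms base and a 1000 ms budget the waits are 100, 200, 400, 300.
        System.out.println(schedule(100L, 1000L));
    }
}
```

Note that `retryBackoffMaxMs` acts here as a total time budget rather than as a cap on each individual wait, which is why the last wait is truncated to the 300 ms that remain.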