[jira] [Updated] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved

2021-10-19 Thread Sergio Duran Vegas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Duran Vegas updated KAFKA-13386:
---
Description: 
The join optimization assumes the serializer is deterministic and invariant 
across upgrades; if the serialized form changes, the optimization will drop 
records as if they were invalid or intermediate. We have relied on the same 
property in other situations, for example when computing whether an update is a 
duplicate result or not.

 

The problem is that some serializers are sadly not deterministic.

 

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]

 
{code:java}
// If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
{code}
 

A solution to this problem would be for the comparison to use the foreign-key 
reference itself instead of the whole message hash.

 

The proposed fix is to let the user choose between the two comparison methods 
(whole-message hash or foreign-key reference). This would fix the problem of 
dropping valid records in certain cases, while still letting the user opt into 
the current optimized way of validating records and dropping intermediate 
results.
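
For illustration only, a hedged sketch of the two comparison modes (the class 
and method names are hypothetical, and the assumption that the subscription 
response carries the extracted foreign key is mine, not the actual patch):

{code:java}
import java.util.Arrays;

// Illustrative sketch only; names are hypothetical, not Kafka's API.
final class SubscriptionResponseCheck {

    // Current behaviour: compare a hash of the serialized value. If the serializer is not
    // deterministic, or the schema evolved between writing and resolving the subscription,
    // two logically equal values can hash differently and a valid result is dropped as stale.
    static boolean isCurrentByHash(byte[] messageHash, byte[] currentHash) {
        return Arrays.equals(messageHash, currentHash);
    }

    // Proposed alternative: compare only the extracted foreign-key reference, which is stable
    // across serializer/schema changes as long as the foreign key itself did not change.
    static boolean isCurrentByForeignKey(String messageForeignKey, String currentForeignKey) {
        return messageForeignKey != null && messageForeignKey.equals(currentForeignKey);
    }
}
{code}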

 

 

 

  was:
The join optimization assumes the serializer is deterministic and invariant 
across upgrades. I can recall other discussions in which we wanted to rely on 
the same property, for example when computing whether an update is a duplicate 
result or not.

 

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]

 
{code:java}
// If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
{code}
 

A solution to this problem would be for the comparison to use the foreign-key 
reference itself instead of the whole message hash.

 

The proposed fix is to let the user choose between the two comparison methods 
(whole-message hash or foreign-key reference). This would fix the problem of 
dropping valid records in certain cases, while still letting the user opt into 
the current optimized way of validating records and dropping intermediate 
results.

 

 

 


> Foreign Key Join filtering out valid records after a code change / schema 
> evolved
> -
>
> Key: KAFKA-13386
> URL: https://issues.apache.org/jira/browse/KAFKA-13386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Sergio Duran Vegas
>Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades; if the serialized form changes, the optimization will drop 
> records as if they were invalid or intermediate. We have relied on the same 
> property in other situations, for example when computing whether an update is 
> a duplicate result or not.
>  
> The problem is that some serializers are sadly not deterministic.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> // If this value doesn't match the current value from the original table, it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>  
> A solution to this problem would be for the comparison to use the foreign-key 
> reference itself instead of the whole message hash.
>  
> The proposed fix is to let the user choose between the two comparison methods 
> (whole-message hash or foreign-key reference). This would fix the problem of 
> dropping valid records in certain cases, while still letting the user opt into 
> the current optimized way of validating records and dropping intermediate 
> results.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved

2021-10-19 Thread Sergio Duran Vegas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Duran Vegas updated KAFKA-13386:
---
Summary: Foreign Key Join filtering out valid records after a code change / 
schema evolved  (was: Foreign Key Join filtering valid records after a code 
change / schema evolved)

> Foreign Key Join filtering out valid records after a code change / schema 
> evolved
> -
>
> Key: KAFKA-13386
> URL: https://issues.apache.org/jira/browse/KAFKA-13386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Sergio Duran Vegas
>Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades. I can recall other discussions in which we wanted to rely on 
> the same property, for example when computing whether an update is a 
> duplicate result or not.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> // If this value doesn't match the current value from the original table, it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>  
> A solution to this problem would be for the comparison to use the foreign-key 
> reference itself instead of the whole message hash.
>  
> The proposed fix is to let the user choose between the two comparison methods 
> (whole-message hash or foreign-key reference). This would fix the problem of 
> dropping valid records in certain cases, while still letting the user opt into 
> the current optimized way of validating records and dropping intermediate 
> results.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13386) Foreign Key Join filtering valid records after a code change / schema evolved

2021-10-19 Thread Sergio Duran Vegas (Jira)
Sergio Duran Vegas created KAFKA-13386:
--

 Summary: Foreign Key Join filtering valid records after a code 
change / schema evolved
 Key: KAFKA-13386
 URL: https://issues.apache.org/jira/browse/KAFKA-13386
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.2
Reporter: Sergio Duran Vegas


The join optimization assumes the serializer is deterministic and invariant 
across upgrades. I can recall other discussions in which we wanted to rely on 
the same property, for example when computing whether an update is a duplicate 
result or not.

 

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]

 
{code:java}
// If this value doesn't match the current value from the original table, it is stale and should be discarded.
if (java.util.Arrays.equals(messageHash, currentHash)) {
{code}
 

A solution to this problem would be for the comparison to use the foreign-key 
reference itself instead of the whole message hash.

 

The proposed fix is to let the user choose between the two comparison methods 
(whole-message hash or foreign-key reference). This would fix the problem of 
dropping valid records in certain cases, while still letting the user opt into 
the current optimized way of validating records and dropping intermediate 
results.

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vamossagar12 edited a comment on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-10-19 Thread GitBox


vamossagar12 edited a comment on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-946918369


   Thanks Sophie for that.
   @showuon, I think your concern about persistent stores and custom stores is 
still valid. The reason I don't want to add the getObservedStreamTime method to 
SessionStore or WindowStore is that I feel it doesn't belong on those 
interfaces: how observed stream time is tracked is an internal implementation 
detail.
   
   Having said that, if we want this behaviour to also be available for custom 
stores (which is why we chose to add it to the MeteredStores), then those 
custom stores would need to implement the PersistentStore classes, which is not 
the way it works today, right? 
   
   One approach could be to think about custom state stores separately and get 
this merged if it looks fine. Custom state stores will need more thought 
because of the way the state stores are structured and wrapped. Also, today the 
persistent state stores provided by Kafka Streams don't enforce retention time, 
which IMO is a bigger problem than custom state stores. That's my personal 
opinion and I am open to suggestions. Let me know if that makes sense.
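   
   To make the discussion concrete, a rough, hypothetical sketch (class and 
method names are illustrative, not the PR's API) of how a wrapping store could 
track observed stream time internally and enforce strict retention on reads:
   
   class StrictRetentionWindowStoreSketch {
       private final long retentionMs;
       private long observedStreamTime = Long.MIN_VALUE; // internal detail, not exposed on WindowStore/SessionStore
   
       StrictRetentionWindowStoreSketch(long retentionMs) {
           this.retentionMs = retentionMs;
       }
   
       // Called on every write so the wrapper can advance its view of stream time.
       void onPut(long recordTimestampMs) {
           observedStreamTime = Math.max(observedStreamTime, recordTimestampMs);
       }
   
       // On fetch, anything older than observedStreamTime - retention is treated as expired,
       // even if the underlying persistent or custom store still physically holds it.
       boolean isExpired(long windowStartTimestampMs) {
           return windowStartTimestampMs < observedStreamTime - retentionMs;
       }
   }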
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732341161



##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenValidatorFactoryTest extends OAuthBearerTest {
+
+@Test
+public void testConfigureThrowsExceptionOnAccessTokenValidatorInit() {
+OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() 
{
+@Override
+public void init() throws IOException {
+throw new IOException("My init had an error!");
+}
+@Override
+public String retrieve() {
+return "dummy";
+}
+};
+
+Map<String, ?> configs = getSaslConfigs();
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs);
+
+assertThrowsWithMessage(
+KafkaException.class, () -> 
handler.configure(accessTokenRetriever, accessTokenValidator), "encountered an 
error when initializing");
+}
+
+@Test
+public void testConfigureThrowsExceptionOnAccessTokenValidatorClose() {
+OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() 
{
+@Override
+public void close() throws IOException {
+throw new IOException("My close had an error!");
+}
+@Override
+public String retrieve() {
+return "dummy";
+}
+};
+
+Map<String, ?> configs = getSaslConfigs();
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs);
+handler.configure(accessTokenRetriever, accessTokenValidator);
+
+// Basically asserting this doesn't throw an exception :(
+handler.close();
+}
+
+private OAuthBearerLoginCallbackHandler createHandler(AccessTokenRetriever 
accessTokenRetriever, Map configs) {

Review comment:
   This function is never used?

##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenValidatorFactoryTest extends OAuthBearerTest {
+
+@Test
+public void testConfigureThrowsExceptionOnAccessTokenValidatorInit() {
+OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() 
{
+@Override
+public void init() throws IOException {
+throw new IOException("My init had an error!");
+}
+@Override
+  

[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732339345



##
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/AccessTokenValidatorFactoryTest.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+public class AccessTokenValidatorFactoryTest extends OAuthBearerTest {
+
+@Test
+public void testConfigureThrowsExceptionOnAccessTokenValidatorInit() {
+OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() 
{
+@Override
+public void init() throws IOException {
+throw new IOException("My init had an error!");
+}
+@Override
+public String retrieve() {
+return "dummy";
+}
+};
+
+Map<String, ?> configs = getSaslConfigs();
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs);
+
+assertThrowsWithMessage(
+KafkaException.class, () -> 
handler.configure(accessTokenRetriever, accessTokenValidator), "encountered an 
error when initializing");
+}
+
+@Test
+public void testConfigureThrowsExceptionOnAccessTokenValidatorClose() {
+OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+AccessTokenRetriever accessTokenRetriever = new AccessTokenRetriever() 
{
+@Override
+public void close() throws IOException {
+throw new IOException("My close had an error!");
+}
+@Override
+public String retrieve() {
+return "dummy";
+}
+};
+
+Map<String, ?> configs = getSaslConfigs();
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs);
+handler.configure(accessTokenRetriever, accessTokenValidator);
+
+// Basically asserting this doesn't throw an exception :(

Review comment:
   dummy question: why don't we want to throw an exception when close fails?
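   
   (For context, the pattern the test above asserts is roughly the following 
hedged sketch, not the PR's exact code: failures while releasing resources are 
reported but not propagated, so shutdown of the surrounding components is not 
interrupted mid-cleanup.)
   
   import java.io.Closeable;
   import java.io.IOException;
   
   // Hypothetical sketch (names are illustrative) of the "log, don't throw" close pattern.
   class QuietCloser {
       static void closeQuietly(Closeable closeable, String name) {
           try {
               closeable.close();
           } catch (IOException e) {
               System.err.printf("Error closing %s: %s%n", name, e); // swallowed: shutdown continues
           }
       }
   }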




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #11417: KAFKA-13340: Change ZooKeeperTestHarness to QuorumTestHarness

2021-10-19 Thread GitBox


cmccabe opened a new pull request #11417:
URL: https://github.com/apache/kafka/pull/11417


   Change ZooKeeperTestHarness to QuorumTestHarness so that integration tests 
which inherit from this
   class can test both ZK and KRaft mode. Test cases which do this can specify 
the modes they support
   by including a ParameterizedTest annotation before each test case, like the 
following:
   
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testValidCreateTopicsRequests(quorum: String): Unit = { ... }
   
   For each value that is specified here (zk, kraft), the test case will be run 
once in the appropriate
   mode. So the test shown above is run twice. This allows integration tests to 
be incrementally
   converted over to support KRaft mode, rather than rewritten to support it. 
As you might expect, test
   cases which do not specify a quorum argument will continue to run only in ZK 
mode.
   
   JUnit5 makes the quorum annotation visible in the TestInfo object which each 
@BeforeEach function in
   a test can optionally take. Therefore, this PR converts over the setUp 
function of the quorum base
   class, plus every derived class, to take a TestInfo argument. The TestInfo 
object gets "passed up
   the chain" to the base class, where it determines which quorum type we 
create (zk or kraft).
   
   The general approach taken here is to make as much as possible work with 
KRaft, but to leave some
   things as ZK-only when appropriate. For example, a test that explicitly 
requests an AdminZkClient
   object will get an exception if it is running in KRaft mode. Similarly, 
tests which explicitly
   request KafkaServer rather than KafkaBroker will get an exception when 
running in KRaft mode.
   
   As a proof of concept, this PR converts over MetricsTest to support KRaft.
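   
   For illustration, a hedged Java sketch of the JUnit5 mechanism described 
above (the real harness and tests are in Scala; the class name here is 
hypothetical):
   
   import org.junit.jupiter.api.BeforeEach;
   import org.junit.jupiter.api.TestInfo;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;
   
   class QuorumParameterizedTestSketch {
   
       private String quorumToStart = "zk"; // default when a test declares no quorum parameter
   
       @BeforeEach
       void setUp(TestInfo testInfo) {
           // The display name of a parameterized invocation contains the parameter value,
           // so the (base-class) setUp can decide whether to start a ZK or a KRaft quorum.
           if (testInfo.getDisplayName().contains("kraft")) {
               quorumToStart = "kraft";
           }
       }
   
       @ParameterizedTest
       @ValueSource(strings = {"zk", "kraft"})
       void testValidCreateTopicsRequests(String quorum) {
           // Runs once per value above: once against ZooKeeper, once against KRaft.
       }
   }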


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field

2021-10-19 Thread GitBox


ableegoldman commented on a change in pull request #11405:
URL: https://github.com/apache/kafka/pull/11405#discussion_r732310453



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -2203,7 +2203,7 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
 task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), 
null);
 
 assertThrows(
-TimeoutException.class,
+StreamsException.class,

Review comment:
   Good point -- done (also did for `StandbyTaskTest`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe merged pull request #11410:
URL: https://github.com/apache/kafka/pull/11410


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2021-10-19 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430275#comment-17430275
 ] 

Eugen Dück edited comment on KAFKA-13289 at 10/19/21, 10:47 PM:


We are running into similar issues (6.0.1-ccs for broker and kafka-streams 
library, i.e. kafka 2.6.1)
 * lots of "Skipping record for expired segment." warnings in 
AbstractRocksDBSegmentedBytesStore
 * at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried to remove the 
re-keying part from Matthew's code, and as far as I can tell, the problem still 
persists, so it would look like it is not related to re-partitioning. Btw. the 
problem shows even when doing just 10 instead of 1000 messages per topic. Find 
my fork of Matthew's code here: 
 [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{{[INFO] ---}}
 {{[INFO] T E S T S}}
 {{[INFO] ---}}
 {{[INFO] Running ins14809.Ins14809Test}}

{{leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 
2:left, 8:left]}}
 {{rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 
9:right, 8:right, 6:right]}}

{{# Actual results}}

{{We want to see every number X below end with an entry that says 
[X,left/X,right]}}
 {{but in practice we often see only [X,left/null] meaning the data was not 
joined.}}
 {{This seems to coincide with kafka streams writing...}}
 {{`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment`}}
 {{...to its logs, in spite of the fact that the source message timestamps were 
in order when}}
 {{kafka streams got them.}}

0 [0:left/null, 0:left/0:right]
1 [1:left/1:right]
2 [2:left/2:right]
3 [3:left/null, 3:left/3:right]
4 [4:left/null, 4:left/4:right]
5 [5:left/5:right]
6 [6:left/null, 6:left/6:right]
7 [7:left/7:right]
8 [8:left/8:right]
9 [9:left/9:right]

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 
43.267 s - in ins14809.Ins14809Test
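
For reference, a minimal hedged sketch (not taken from the linked reproduction 
project; topic names and window sizes are arbitrary) of the kind of windowed 
stream-stream join involved. The "Skipping record for expired segment" warning 
is logged when a record lands in a join-store segment older than the store's 
retention relative to the stream time it has already observed, which then shows 
up as the [X:left/null] results above:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class JoinTopologySketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");
        KStream<String, String> right = builder.stream("right-topic");

        // Window size plus grace period bound the join store's retention; records arriving
        // further behind observed stream time than that are skipped with the warning above.
        left.join(right,
                  (l, r) -> l + "/" + r,
                  JoinWindows.of(Duration.ofSeconds(10)).grace(Duration.ofMinutes(5)))
            .to("joined-topic");

        return builder.build();
    }
}
{code}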


was (Author: eugendueck):
We are running into similar issues (6.0.1-ccs for broker and kafka-streams 
library, i.e. kafka 2.6.1)
 * lots of "Skipping record for expired segment." warnings in 
AbstractRocksDBSegmentedBytesStore
 * at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried to remove the 
re-keying part from Matthew's code, and as far as I can tell, the problem still 
persists, so it would look like it is not related to re-partitioning. Btw. the 
problem shows even when doing just 10 instead of 1000 messages per topic. Find 
my fork of Matthew's code here: 
 [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{{[INFO] ---}}
 {{[INFO] T E S T S}}
 {{[INFO] ---}}
 {{[INFO] Running ins14809.Ins14809Test}}

{{leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 
2:left, 8:left]}}
 {{rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 
9:right, 8:right, 6:right]}}

{{# Actual results}}

{{We want to see every number X below end with an entry that says 
[X,left/X,right]}}
 {{but in practice we often see only [X,left/null] meaning the data was not 
joined.}}
 {{This seems to coincide with kafka streams writing...}}
 {{`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment`}}
 {{...to its logs, in spite of the fact that the source message timestamps were 
in order when}}
 {{kafka streams got them.}}

0 [0:left/null, 0:left/0:right]
1 [1:left/1:right]
2 [2:left/2:right]
3 [3:left/null, 3:left/3:right]
4 [4:left/null, 4:left/4:right]
5 [5:left/5:right]
6 [6:left/null, 6:left/6:right]
7 [7:left/7:right]
8 [8:left/8:right]
9 [9:left/9:right]

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 
43.267 s - in ins14809.Ins14809Test

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams 

[GitHub] [kafka] cmccabe commented on pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe commented on pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#issuecomment-947161702


   I reran `ConnectorRestartApiIntegrationTest` and didn't see a failure 
locally. Will commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#issuecomment-947122444


   Thanks for the feedback, @YiDing-Duke!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732259427



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+private CloseableVerificationKeyResolver verificationKeyResolver;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, saslMechanism);
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs, saslMechanism, 
verificationKeyResolver);
+configure(verificationKeyResolver, accessTokenValidator);
+}
+
+public void configure(CloseableVerificationKeyResolver 
verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+this.verificationKeyResolver = verificationKeyResolver;
+this.accessTokenValidator = accessTokenValidator;
+
+try {
+verificationKeyResolver.init();
+} catch (Exception e) {
+throw new KafkaException("The OAuth validator configuration 
encountered an error when initializing the VerificationKeyResolver", e);
+}
+
+isConfigured = true;
+}
+
+@Override
+public void close() {
+if (verificationKeyResolver != null) {
+try {
+verificationKeyResolver.close();
+} catch (Exception e) {
+log.error(e.getMessage(), e);
+}
+}
+}
+
+@Override
+public void handle(Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+checkConfigured();
+
+for (Callback callback : callbacks) {
+if (callback instanceof OAuthBearerValidatorCallback) {
+handle((OAuthBearerValidatorCallback) callback);
+} else if (callback instanceof 
OAuthBearerExtensionsValidatorCallback) {
+OAuthBearerExtensionsValidatorCallback extensionsCallback = 
(OAuthBearerExtensionsValidatorCallback) callback;
+
extensionsCallback.inputExtensions().map().forEach((extensionName, v) -> 
extensionsCallback.valid(extensionName));

Review comment:
   Correct.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732259126



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+private CloseableVerificationKeyResolver verificationKeyResolver;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, saslMechanism);
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs, saslMechanism, 
verificationKeyResolver);
+configure(verificationKeyResolver, accessTokenValidator);
+}
+
+public void configure(CloseableVerificationKeyResolver 
verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+this.verificationKeyResolver = verificationKeyResolver;
+this.accessTokenValidator = accessTokenValidator;
+
+try {
+verificationKeyResolver.init();
+} catch (Exception e) {
+throw new KafkaException("The OAuth validator configuration 
encountered an error when initializing the VerificationKeyResolver", e);
+}
+
+isConfigured = true;

Review comment:
   Unit tests will call `init` directly, which is OK. Changed the flag to 
`isInitialized` so it fits in better now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732258747



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+private CloseableVerificationKeyResolver verificationKeyResolver;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, saslMechanism);
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs, saslMechanism, 
verificationKeyResolver);
+configure(verificationKeyResolver, accessTokenValidator);
+}
+
+public void configure(CloseableVerificationKeyResolver 
verificationKeyResolver, AccessTokenValidator accessTokenValidator) {

Review comment:
   Sure! I changed it to `init` here too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732258641



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+public static final String CLIENT_ID_CONFIG = "clientId";
+public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+public static final String SCOPE_CONFIG = "scope";
+
+public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client ID to uniquely identify the service account to use for 
authentication for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_SECRET_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client secret serves a similar function as a password to the " + 
CLIENT_ID_CONFIG + " " +
+"account and identifies the service account to use for authentication 
for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_ID_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login 
request to the " +
+"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need 
to specify an " +
+"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide 
the value to " +
+"include with the login request.";
+
+private static final String EXTENSION_PREFIX = "extension_";
+
+private Map moduleOptions;
+
+private AccessTokenRetriever accessTokenRetriever;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+if 
(!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+throw new IllegalArgumentException(String.format("Unexpected SASL 
mechanism: %s", saslMechanism));
+
+if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || 
jaasConfigEntries.get(0) == null)
+throw new IllegalArgumentException(String.format("Must supply 
exactly 1 non-null JAAS mechanism configuration (size was %d)", 
jaasConfigEntries.size()));
+
+moduleOptions = 
Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+AccessTokenRetriever accessTokenRetriever = 

[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732258376



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+public static final String CLIENT_ID_CONFIG = "clientId";
+public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+public static final String SCOPE_CONFIG = "scope";
+
+public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client ID to uniquely identify the service account to use for 
authentication for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_SECRET_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client secret serves a similar function as a password to the " + 
CLIENT_ID_CONFIG + " " +
+"account and identifies the service account to use for authentication 
for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_ID_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login 
request to the " +
+"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need 
to specify an " +
+"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide 
the value to " +
+"include with the login request.";
+
+private static final String EXTENSION_PREFIX = "extension_";
+
+private Map moduleOptions;
+
+private AccessTokenRetriever accessTokenRetriever;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+if 
(!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+throw new IllegalArgumentException(String.format("Unexpected SASL 
mechanism: %s", saslMechanism));
+
+if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || 
jaasConfigEntries.get(0) == null)
+throw new IllegalArgumentException(String.format("Must supply 
exactly 1 non-null JAAS mechanism configuration (size was %d)", 
jaasConfigEntries.size()));
+
+moduleOptions = 
Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+AccessTokenRetriever accessTokenRetriever = 

[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732258128



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retry encapsulates the mechanism to perform a retry and then exponential
+ * backoff using provided wait times between attempts.
+ *
+ * @param <R> Result type
+ */
+
+public class Retry<R> {
+
+private static final Logger log = LoggerFactory.getLogger(Retry.class);
+
+private final Time time;
+
+private final long retryBackoffMs;
+
+private final long retryBackoffMaxMs;
+
+public Retry(Time time, long retryBackoffMs, long retryBackoffMaxMs) {
+this.time = time;
+this.retryBackoffMs = retryBackoffMs;
+this.retryBackoffMaxMs = retryBackoffMaxMs;
+
+if (this.retryBackoffMs < 0)
+throw new IllegalArgumentException(String.format("retryBackoffMs 
value %s must be non-negative", retryBackoffMs));
+
+if (this.retryBackoffMaxMs < 0)
+throw new 
IllegalArgumentException(String.format("retryBackoffMaxMs %s value must be 
non-negative", retryBackoffMaxMs));
+
+if (this.retryBackoffMaxMs < this.retryBackoffMs)
+throw new 
IllegalArgumentException(String.format("retryBackoffMaxMs %s is less than 
retryBackoffMs %s", retryBackoffMaxMs, retryBackoffMs));
+}
+
+public R execute(Retryable<R> retryable) throws IOException {
+int currAttempt = 0;
+long end = time.milliseconds() + retryBackoffMaxMs;
+IOException error = null;
+
+while (time.milliseconds() <= end) {
+currAttempt++;
+
+try {
+return retryable.call();
+} catch (IOException e) {
+if (error == null)
+error = e;
+
+long waitMs = retryBackoffMs * (long) Math.pow(2, currAttempt 
- 1);
+long diff = end - time.milliseconds();
+waitMs = Math.min(waitMs, diff);
+
+if (waitMs <= 0)

Review comment:
   Added logging for error.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732257932



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} 
that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client 
credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link 
OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+private static final Logger log = 
LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+private static final Set<Integer> UNRETRYABLE_HTTP_CODES;
+
+public static final String AUTHORIZATION_HEADER = "Authorization";
+
+static {
+// This does not have to be an exhaustive list. There are other HTTP 
codes that
+// are defined in different RFCs (e.g. 
https://datatracker.ietf.org/doc/html/rfc6585)
+// that we won't worry about yet. The worst case if a status code is 
missing from
+// this set is that the request will be retried.
+UNRETRYABLE_HTTP_CODES = new HashSet<>();
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+}
+
+private final String clientId;
+
+private final String clientSecret;
+
+private final String scope;
+
+private final SSLSocketFactory sslSocketFactory;
+
+private final String tokenEndpointUri;
+
+private final long loginRetryBackoffMs;
+
+private final long loginRetryBackoffMaxMs;

[GitHub] [kafka] kirktrue commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


kirktrue commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732251460



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/HttpAccessTokenRetriever.java
##
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HttpAccessTokenRetriever is an {@link AccessTokenRetriever} 
that will
+ * communicate with an OAuth/OIDC provider directly via HTTP to post client 
credentials
+ * ({@link OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG}/{@link 
OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG})
+ * to a publicized token endpoint URL
+ * ({@link SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI}).
+ *
+ * @see AccessTokenRetriever
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_ID_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#CLIENT_SECRET_CONFIG
+ * @see OAuthBearerLoginCallbackHandler#SCOPE_CONFIG
+ * @see SaslConfigs#SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI
+ */
+
+public class HttpAccessTokenRetriever implements AccessTokenRetriever {
+
+private static final Logger log = 
LoggerFactory.getLogger(HttpAccessTokenRetriever.class);
+
+private static final Set&lt;Integer&gt; UNRETRYABLE_HTTP_CODES;
+
+public static final String AUTHORIZATION_HEADER = "Authorization";
+
+static {
+// This does not have to be an exhaustive list. There are other HTTP 
codes that
+// are defined in different RFCs (e.g. 
https://datatracker.ietf.org/doc/html/rfc6585)
+// that we won't worry about yet. The worst case if a status code is 
missing from
+// this set is that the request will be retried.
+UNRETRYABLE_HTTP_CODES = new HashSet<>();
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_REQUEST);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNAUTHORIZED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PAYMENT_REQUIRED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_FORBIDDEN);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_FOUND);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_BAD_METHOD);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_ACCEPTABLE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PROXY_AUTH);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_CONFLICT);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_GONE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_LENGTH_REQUIRED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_PRECON_FAILED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_ENTITY_TOO_LARGE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_REQ_TOO_LONG);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_UNSUPPORTED_TYPE);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+UNRETRYABLE_HTTP_CODES.add(HttpURLConnection.HTTP_VERSION);
+}
+
+private final String clientId;
+
+private final String clientSecret;
+
+private final String scope;
+
+private final SSLSocketFactory sslSocketFactory;
+
+private final String tokenEndpointUri;
+
+private final long loginRetryBackoffMs;
+
+private final long loginRetryBackoffMaxMs;
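
   For context, a rough sketch of how the fields above (the unretryable-code set 
and the two backoff settings) could fit together in a retry loop. This is 
illustrative only: the real class uses Kafka's Retry and Time utilities, and 
readResponseBody() is a made-up helper.

```java
private String postWithRetry(String tokenEndpointUri,
                             long loginRetryBackoffMs,
                             long loginRetryBackoffMaxMs) throws IOException, InterruptedException {
    long backoffMs = loginRetryBackoffMs;
    while (true) {
        HttpURLConnection con = (HttpURLConnection) new URL(tokenEndpointUri).openConnection();
        int responseCode = con.getResponseCode();
        if (responseCode == HttpURLConnection.HTTP_OK)
            return readResponseBody(con);                     // hypothetical helper
        if (UNRETRYABLE_HTTP_CODES.contains(responseCode))
            throw new IOException("Unretryable response code: " + responseCode);
        Thread.sleep(backoffMs);                              // simplified backoff
        backoffMs = Math.min(backoffMs * 2, loginRetryBackoffMaxMs);
    }
}
```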

[GitHub] [kafka] vvcephei commented on a change in pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field

2021-10-19 Thread GitBox


vvcephei commented on a change in pull request #11405:
URL: https://github.com/apache/kafka/pull/11405#discussion_r732245277



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -2203,7 +2203,7 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() {
 task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), 
null);
 
 assertThrows(
-TimeoutException.class,
+StreamsException.class,

Review comment:
   Up to you, but if it's semantically important for this to wrap a 
TimeoutException, then maybe we should assert the cause is TimeoutException.
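
   A minimal sketch of that stricter assertion; the exact call inside the lambda 
is elided in the quoted diff, so treat the arguments as illustrative:

```java
// Sketch only: the call under test is whatever the elided lambda invokes.
final StreamsException thrown = assertThrows(
    StreamsException.class,
    () -> task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null)
);
// The suggested stricter check: the wrapper should carry the original cause.
assertTrue(thrown.getCause() instanceof TimeoutException);
```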




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11411: MINOR: Clarify acceptable recovery lag config doc

2021-10-19 Thread GitBox


vvcephei commented on pull request #11411:
URL: https://github.com/apache/kafka/pull/11411#issuecomment-947102353


   Thanks, @ableegoldman !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-10-19 Thread GitBox


C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r73227



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer 
transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+/**
+ * Request a transaction commit after the next batch of records from 
{@link SourceTask#poll()}
+ * is processed.
+ */
+void commitTransaction();
+
+/**
+ * Request a transaction commit after a source record is processed. The 
source record will be the
+ * last record in the committed transaction.
+ * @param record the record to commit the transaction after.

Review comment:
   Ack, added "may not be null".

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer 
transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+/**
+ * Request a transaction commit after the next batch of records from 
{@link SourceTask#poll()}
+ * is processed.
+ */
+void commitTransaction();
+
+/**
+ * Request a transaction commit after a source record is processed. The 
source record will be the
+ * last record in the committed transaction.
+ * @param record the record to commit the transaction after.
+ */
+void commitTransaction(SourceRecord record);
+
+/**
+ * Requests a transaction abort after the next batch of records from {@link 
SourceTask#poll()}. All of
+ * the records in that transaction will be discarded and will not appear 
in a committed transaction.
+ * However, offsets for that transaction will still be committed. If the 
data should be reprocessed,
+ * the task should not invoke this method and should instead throw an 
exception.
+ */
+void abortTransaction();
+
+/**
+ * Requests a transaction abort after a source record is processed. The 
source record will be the
+ * last record in the aborted transaction. All of the records in that 
transaction will be discarded
+ * and will not appear in a committed transaction. However, offsets for 
that transaction will still
+ * be committed. If the data should be reprocessed, the task should not 
invoke this method and
+ * should instead throw an exception.
+ * @param record the record to abort the transaction after.

Review comment:
   Ack, added "may not be null".
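
   For readers following along, a rough sketch of how a task's poll() might 
drive these methods; fetchFromSourceSystem() and batchLooksCorrupt() are 
made-up helpers, not part of the API:

```java
@Override
public List<SourceRecord> poll() {
    List<SourceRecord> records = fetchFromSourceSystem();    // hypothetical helper
    TransactionContext transactionContext = context.transactionContext();
    if (transactionContext != null && !records.isEmpty()) {
        SourceRecord last = records.get(records.size() - 1);
        if (batchLooksCorrupt(records)) {                    // hypothetical helper
            // Discards the batch but still commits its offsets; throw instead
            // if the data should be reprocessed (per the Javadoc above).
            transactionContext.abortTransaction(last);
        } else {
            transactionContext.commitTransaction(last);
        }
    }
    return records;
}
```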




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-10-19 Thread GitBox


C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r732232701



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##
@@ -163,13 +181,24 @@ public synchronized boolean beginFlush() {
 }
 
 // And submit the data
-log.debug("Submitting {} entries to backing store. The offsets 
are: {}", offsetsSerialized.size(), toFlush);
+log.debug("Submitting {} entries to backing store. The offsets 
are: {}", offsetsSerialized.size(), flushed);
 }
 
-return backingStore.set(offsetsSerialized, (error, result) -> {
-boolean isCurrent = handleFinishWrite(flushId, error, result);
-if (isCurrent && callback != null) {
-callback.onCompletion(error, result);
+return primaryBackingStore.set(offsetsSerialized, (primaryError, 
primaryResult) -> {
+boolean isCurrent = handleFinishWrite(flushId, primaryError, 
primaryResult);
+if (isCurrent) {
+if (callback != null) {
+callback.onCompletion(primaryError, primaryResult);
+}
+if (secondaryBackingStore != null && primaryError == null) {
+secondaryBackingStore.set(offsetsSerialized, 
(secondaryError, secondaryResult) -> {
+if (secondaryError != null) {
+log.warn("Failed to write offsets ({}) to 
secondary backing store", flushed, secondaryError);
+} else {
+log.debug("Successfully flushed offsets ({}) to 
secondary backing store", flushed);

Review comment:
   Good catch; I believe there's also a case in the existing code where 
offset commit messages that are logged in a producer callback are missing 
the MDC context. I've addressed both cases.
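
   For illustration, one generic way to carry SLF4J MDC context into a producer 
callback; this is only a sketch and not necessarily how Connect's logging 
context handling is implemented:

```java
final Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
producer.send(record, (metadata, exception) -> {
    Map<String, String> previous = MDC.getCopyOfContextMap();
    if (mdcSnapshot != null)
        MDC.setContextMap(mdcSnapshot);      // restore the submitting thread's context
    try {
        if (exception != null)
            log.warn("Failed to write offset commit record", exception);
        else
            log.trace("Wrote offset commit record to {}", metadata);
    } finally {
        if (previous != null)
            MDC.setContextMap(previous);
        else
            MDC.clear();
    }
});
```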

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+/**
+ * 

Review comment:
   Ack, removed.

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+/**
+ * 
+ * The configuration key that determines how source tasks will define 
transaction boundaries
+ * when exactly-once support is enabled.
+ * 

Review comment:
   Ack, removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-10-19 Thread GitBox


C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r732231502



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##
@@ -28,4 +30,31 @@
 protected SourceConnectorContext context() {
 return (SourceConnectorContext) context;
 }
+
+/**
+ * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+ * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+ * The default implementation will return {@code null}.
+ * @param connectorConfig the configuration that will be used for the 
connector.
+ * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can 
provide exactly-once support,
+ * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code 
null}, it is assumed that the
+ * connector cannot.
+ */
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; 
connectorConfig) {
+return null;

Review comment:
   > Should we just have another enum for UNKNOWN and make this more 
explicit than "null"?
   
   This was actually [suggested in the discussion 
thread](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3cCAMdOrUX4CvPsb+yjfTenHyRTtE=2aaw-_-_b2vbd+pvqzy7...@mail.gmail.com%3e):
   
   > what do you think about a new "exactlyOnce()" method to the 
SourceConnector class that can return a new ExactlyOnce enum with options of 
"SUPPORTED", "UNSUPPORTED", and "UNKNOWN", with a default implementation that 
returns "UNKNOWN"?
   
   And [decided 
against](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3ccadxunmbsypos0lej8kxw9eapcxc7wbtgxqdqhrpu6qrbjwi...@mail.gmail.com%3e):
   
   > The problem with having an explicit UNKNOWN case is we really want 
connector developers to _not_ use it. That could mean it's deprecated from the 
start. Alternatively we could omit it from the enum and use null to mean 
unknown (we'd have to check for a null result anyway), with the contract for 
the method being that it should return non-null. Of course, this doesn't remove 
the ambiguous case, but avoids the need to eventually remove UNKNOWN in the 
future.
   
   (What I found especially convincing in the snippet above were the points 
that 1) we don't want people to return `UNKNOWN` from this method, and 2) no 
matter what, we're going to have to check for `null` anyways.)
   
   
   > Also, it seems like it would make sense to document that this method 
should be overridden by Connector developers, but has a default for backward 
compatibility.
   
   Ack, can do.
   
   > And it should state more clearly what should be returned for the various 
options.
   
   I've taken a shot at this, not sure how much clearer it can get but if you 
have thoughts let me know.
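
   To make the shape of the hook concrete, a self-contained sketch of the 
decision logic; the enum and method names mirror the quoted diff, while the 
"deterministic.offsets" config key is invented for the example:

```java
import java.util.Map;

public final class ExactlyOnceSupportExample {

    enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

    // Stand-in for the SourceConnector#exactlyOnceSupport(Map) hook quoted above.
    static ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        boolean deterministicOffsets = "true".equals(connectorConfig.get("deterministic.offsets"));
        // Returning null would mean "support unknown"; the thread above explains
        // why there is deliberately no UNKNOWN enum constant.
        return deterministicOffsets ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
    }
}
```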

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+/**
+ * 
+ * The configuration key that determines how source tasks will define 
transaction boundaries
+ * when exactly-once support is enabled.
+ * 
+ */
+public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+public enum TransactionBoundary {

Review comment:
   Ack, done.

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+/**
+ * 
+ * The configuration key that determines how source tasks will define 
transaction boundaries
+ * when exactly-once support is enabled.
+ * 
+ */
+public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+public enum TransactionBoundary {
+POLL,
+INTERVAL,
+CONNECTOR;
+
+public static final TransactionBoundary DEFAULT = POLL;
+
+public static List&lt;String&gt; options() {

Review comment:
   I wanted a convenient way to bring everything to lowercase, which is 
more standard for properties like this (see how [values for the consumer 
`isolation.level` property are 
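
   A minimal sketch of the lowercasing helper being discussed, assuming a 
simple enum; the names mirror the quoted diff, but the snippet is illustrative 
only:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public enum TransactionBoundary {
    POLL, INTERVAL, CONNECTOR;

    public static final TransactionBoundary DEFAULT = POLL;

    // Lowercase the enum names so the documented config values read like other
    // Kafka properties (e.g. the consumer's isolation.level values).
    public static List<String> options() {
        return Arrays.stream(values())
                .map(boundary -> boundary.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
    }
}
```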

[GitHub] [kafka] C0urante commented on a change in pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-10-19 Thread GitBox


C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r732231269



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
##
@@ -38,4 +38,30 @@
  * Get the OffsetStorageReader for this SourceTask.
  */
 OffsetStorageReader offsetStorageReader();
+
+/**
+ * Get a {@link TransactionContext} that can be used to define producer 
transaction boundaries
+ * when exactly-once support is enabled for the connector.
+ *
+ * This method was added in Apache Kafka 3.0. Source tasks that use 
this method but want to
+ * maintain backward compatibility so they can also be deployed to older 
Connect runtimes
+ * should guard the call to this method with a try-catch block, since 
calling this method will result in a
+ * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the 
source connector is deployed to
+ * Connect runtimes older than Kafka 3.0. For example:
+ * 
+ * TransactionContext transactionContext;
+ * try {
+ * transactionContext = context.transactionContext();
+ * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+ * transactionContext = null;
+ * }
+ * 
+ *
+ * @return the transaction context, or null if the user does not want the 
connector to define
+ * its own transaction boundaries

Review comment:
   No objections to modifying Javadocs here instead of in a KIP (especially 
since you made that clear on the mailing list). It's easier to review these 
sorts of details in a PR IMO anyways.
   
   Can make the change to refer to the connector configuration. Agree that 
"user" is ambiguous.

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##
@@ -28,4 +30,31 @@
 protected SourceConnectorContext context() {
 return (SourceConnectorContext) context;
 }
+
+/**
+ * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+ * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+ * The default implementation will return {@code null}.
+ * @param connectorConfig the configuration that will be used for the 
connector.
+ * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can 
provide exactly-once support,
+ * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code 
null}, it is assumed that the
+ * connector cannot.
+ */
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; 
connectorConfig) {
+return null;
+}
+
+/**
+ * Signals whether the connector can define its own transaction boundaries 
with the proposed
+ * configuration. Developers must override this method if they wish to add 
connector-defined
+ * transaction boundary support; if they do not, users will be unable to 
create instances of
+ * this connector that use connector-defined transaction boundaries. The 
default implementation
+ * will return {@code UNSUPPORTED}.

Review comment:
   Can do. I think a reference to the existing `validate` method may help 
clarify things (especially since this method will be used in almost exactly the 
same way); LMK what you think.

##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##
@@ -28,4 +30,31 @@
 protected SourceConnectorContext context() {
 return (SourceConnectorContext) context;
 }
+
+/**
+ * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+ * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+ * The default implementation will return {@code null}.

Review comment:
   Ack, done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11405: KAFKA-12648: Wrap all exceptions thrown to handler as StreamsException & add TaskId field

2021-10-19 Thread GitBox


ableegoldman commented on pull request #11405:
URL: https://github.com/apache/kafka/pull/11405#issuecomment-947065671


   All tests passed, but for two unrelated flaky test failures of:
`kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, 
Name=testTopicPartition, Security=PLAINTEXT`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #11411: MINOR: Clarify acceptable recovery lag config doc

2021-10-19 Thread GitBox


ableegoldman commented on pull request #11411:
URL: https://github.com/apache/kafka/pull/11411#issuecomment-947064999


   Thanks for the update. I've had some users hit upon the same 
misunderstanding that I'm guessing sparked this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11411: MINOR: Clarify acceptable recovery lag config doc

2021-10-19 Thread GitBox


ableegoldman commented on a change in pull request #11411:
URL: https://github.com/apache/kafka/pull/11411#discussion_r732203571



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -329,8 +329,10 @@
 
 /** {@code acceptable.recovery.lag} */
 public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG = 
"acceptable.recovery.lag";
-private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum 
acceptable lag (number of offsets to catch up) for a client to be considered 
caught-up for an active task." +
-  "Should 
correspond to a recovery time of well under a minute for a given workload. Must 
be at least 0.";
+private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum 
acceptable lag (number of offsets to catch up) for a client to be considered 
caught-up enough." +

Review comment:
   ```suggestion
   private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum 
acceptable lag (number of offsets to catch up) for a client to be considered 
caught-up enough" +
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController

2021-10-19 Thread GitBox


dielhennr commented on a change in pull request #10772:
URL: https://github.com/apache/kafka/pull/10772#discussion_r732112282



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
##
@@ -26,28 +26,36 @@
 import java.util.Objects;
 
 public final class QuorumControllerMetrics implements ControllerMetrics {
-private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
-"KafkaController", "ActiveControllerCount");
-private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName(
-"ControllerEventManager", "EventQueueTimeMs");
-private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = 
getMetricName(
-"ControllerEventManager", "EventQueueProcessingTimeMs");
-private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
-"KafkaController", "GlobalTopicCount");
-private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
-"KafkaController", "GlobalPartitionCount");
-private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName(
-"KafkaController", "OfflinePartitionsCount");
-private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = 
getMetricName(
-"KafkaController", "PreferredReplicaImbalanceCount");
-
+private final static MetricName ACTIVE_CONTROLLER_COUNT = new MetricName(

Review comment:
   This was a mistake when I merged trunk into my branch. I reverted it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732092804



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+private CloseableVerificationKeyResolver verificationKeyResolver;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map&lt;String, ?&gt; configs, String saslMechanism, 
List&lt;AppConfigurationEntry&gt; jaasConfigEntries) {
+CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, saslMechanism);
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs, saslMechanism, 
verificationKeyResolver);
+configure(verificationKeyResolver, accessTokenValidator);
+}
+
+public void configure(CloseableVerificationKeyResolver 
verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+this.verificationKeyResolver = verificationKeyResolver;
+this.accessTokenValidator = accessTokenValidator;
+
+try {
+verificationKeyResolver.init();
+} catch (Exception e) {
+throw new KafkaException("The OAuth validator configuration 
encountered an error when initializing the VerificationKeyResolver", e);
+}
+
+isConfigured = true;
+}
+
+@Override
+public void close() {
+if (verificationKeyResolver != null) {
+try {
+verificationKeyResolver.close();
+} catch (Exception e) {
+log.error(e.getMessage(), e);
+}
+}
+}
+
+@Override
+public void handle(Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+checkConfigured();
+
+for (Callback callback : callbacks) {
+if (callback instanceof OAuthBearerValidatorCallback) {
+handle((OAuthBearerValidatorCallback) callback);
+} else if (callback instanceof 
OAuthBearerExtensionsValidatorCallback) {
+OAuthBearerExtensionsValidatorCallback extensionsCallback = 
(OAuthBearerExtensionsValidatorCallback) callback;
+
extensionsCallback.inputExtensions().map().forEach((extensionName, v) -> 
extensionsCallback.valid(extensionName));

Review comment:
   For now, the server-side extension validation is a no-op, so we just mark 
everything as validated? The real validation will happen in our ce-kafka 
server-side authentication?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#discussion_r732092038



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
   config.setProperty(KafkaConfig.LogMessageFormatVersionProp, 
version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+  brokers: Seq[B],
+  adminConfig: Properties): Admin = {
+val adminClientProperties = if (adminConfig.isEmpty) {
+  val newConfig = new Properties()
+  newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
getBrokerListStrFromServers(brokers))
+  newConfig
+} else {
+  adminConfig
+}
+Admin.create(adminClientProperties)
+  }
+
+  def createTopicWithAdmin[B <: KafkaBroker](

Review comment:
   There are a lot of differences, though: IntegrationTestUtils.createTopic 
doesn't take a list of brokers, requires you to create an admin client 
externally, and won't work if the topic already exists. So it can't really be 
used as a drop-in replacement for `TestUtils#createTopic`, which is the 
intention here.
   
   For now we should probably just accept the duplication, I think, although 
it's not ideal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#discussion_r732090271



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
   config.setProperty(KafkaConfig.LogMessageFormatVersionProp, 
version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+  brokers: Seq[B],
+  adminConfig: Properties): Admin = {
+val adminClientProperties = if (adminConfig.isEmpty) {
+  val newConfig = new Properties()
+  newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
getBrokerListStrFromServers(brokers))
+  newConfig
+} else {
+  adminConfig
+}
+Admin.create(adminClientProperties)
+  }
+
+  def createTopicWithAdmin[B <: KafkaBroker](
+  topic: String,
+  numPartitions: Int = 1,
+  replicationFactor: Int = 1,
+  brokers: Seq[B],
+  topicConfig: Properties = new Properties,
+  adminConfig: Properties = new Properties): 
scala.collection.immutable.Map[Int, Int] = {
+val adminClient = createAdminClient(brokers, adminConfig)
+try {
+  val configsMap = new java.util.HashMap[String, String]()
+  topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
+  try {
+adminClient.createTopics(Collections.singletonList(new NewTopic(
+  topic, numPartitions, 
replicationFactor.toShort).configs(configsMap))).all().get()
+  } catch {
+case e: ExecutionException => if (e.getCause != null &&
+  e.getCause.isInstanceOf[TopicExistsException] &&
+  topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic, 
numPartitions, replicationFactor)) {
+} else {

Review comment:
   ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#discussion_r732089124



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1640,6 +1800,37 @@ object TestUtils extends Logging {
 }, s"Timed out waiting for brokerId $brokerId to come online")
   }
 
+  def getReplicaAssignmentForTopics[B <: KafkaBroker](
+  topicNames: Seq[String],
+  brokers: Seq[B],
+  adminConfig: Properties = new Properties): Map[TopicPartition, Seq[Int]] 
= {
+val adminClient = createAdminClient(brokers, adminConfig)
+val results = new mutable.HashMap[TopicPartition, Seq[Int]]
+try {
+  
adminClient.describeTopics(topicNames.toList.asJava).topicNameValues().forEach {
+case (topicName, future) =>
+  try {
+val description = future.get()
+description.partitions().forEach {
+  case partition =>
+val topicPartition = new TopicPartition(topicName, 
partition.partition())
+results.put(topicPartition, 
partition.replicas().asScala.map(_.id))
+}
+  } catch {
+case e: ExecutionException => if (e.getCause != null &&
+  e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
+  // ignore

Review comment:
   I believe this is consistent with the behavior we have in the ZK case 
(which is not in TestUtils, but in KafkaZkClient or somewhere, as I recall)...
   
   I will add a JavaDoc comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732088812



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+private CloseableVerificationKeyResolver verificationKeyResolver;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map&lt;String, ?&gt; configs, String saslMechanism, 
List&lt;AppConfigurationEntry&gt; jaasConfigEntries) {
+CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, saslMechanism);
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs, saslMechanism, 
verificationKeyResolver);
+configure(verificationKeyResolver, accessTokenValidator);
+}
+
+public void configure(CloseableVerificationKeyResolver 
verificationKeyResolver, AccessTokenValidator accessTokenValidator) {
+this.verificationKeyResolver = verificationKeyResolver;
+this.accessTokenValidator = accessTokenValidator;
+
+try {
+verificationKeyResolver.init();
+} catch (Exception e) {
+throw new KafkaException("The OAuth validator configuration 
encountered an error when initializing the VerificationKeyResolver", e);
+}
+
+isConfigured = true;

Review comment:
   nit: should we move the flag setting to the end of main configure() 
function?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732088424



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerValidatorCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerValidatorCallbackHandler.class);
+
+private CloseableVerificationKeyResolver verificationKeyResolver;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map&lt;String, ?&gt; configs, String saslMechanism, 
List&lt;AppConfigurationEntry&gt; jaasConfigEntries) {
+CloseableVerificationKeyResolver verificationKeyResolver = 
VerificationKeyResolverFactory.create(configs, saslMechanism);
+AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs, saslMechanism, 
verificationKeyResolver);
+configure(verificationKeyResolver, accessTokenValidator);
+}
+
+public void configure(CloseableVerificationKeyResolver 
verificationKeyResolver, AccessTokenValidator accessTokenValidator) {

Review comment:
   ditto: should we use init() function name?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#discussion_r732087662



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1220,37 +1348,42 @@ object TestUtils extends Logging {
 }
   }
 
-
-  def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, 
numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
+  def verifyTopicDeletion[B <: KafkaBroker](
+  zkClient: KafkaZkClient,
+  topic: String,
+  numPartitions: Int,
+  brokers: Seq[B]): Unit = {
 val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _))
-// wait until admin path for delete topic is deleted, signaling completion 
of topic deletion
-waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
-  "Admin path /admin/delete_topics/%s path not deleted even after a 
replica is restarted".format(topic))
-waitUntilTrue(() => !zkClient.topicExists(topic),
-  "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s 
path is deleted".format(topic, topic))
+if (zkClient != null) {

Review comment:
   It provides some additional information in the ZK case. For example, it 
might help diagnose a situation where the ZK controller never creates the znode 
under delete_topics.
   
   We can get rid of it eventually but I was trying to make a minimal change...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13373) ValueTransformerWithKeySupplier doesn't work with store()

2021-10-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430650#comment-17430650
 ] 

Matthias J. Sax commented on KAFKA-13373:
-

I did not look into it too deeply, but I guess it must be related to our 
internal wrappers. To unify runtime code, we wrap different user interfaces 
such that the runtime only works with a reduced surface area and the wrappers 
take care to translate between the interfaces. My suspicion is that some 
wrapper is not forwarding the `stores()` call correctly. If that's true, the fix 
itself should be simple – the tricky part is only to find the right place in 
the code...
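
To make the suspicion concrete, a toy sketch of a wrapper that does forward 
`stores()` to the wrapped supplier; the class and interface names are 
simplified stand-ins, not the actual Streams internals:

{code:java}
import java.util.Set;
import org.apache.kafka.streams.state.StoreBuilder;

interface SupplierWithStores<T> {
    T get();
    default Set<StoreBuilder<?>> stores() {
        return null;
    }
}

final class ForwardingWrapper<T> implements SupplierWithStores<T> {
    private final SupplierWithStores<T> delegate;

    ForwardingWrapper(final SupplierWithStores<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        return delegate.get();
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        // The suspected bug: if a wrapper leaves this override out, the default
        // (null) hides the supplier's store builders from the topology.
        return delegate.stores();
    }
}
{code}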

> ValueTransformerWithKeySupplier doesn't work with store()
> -
>
> Key: KAFKA-13373
> URL: https://issues.apache.org/jira/browse/KAFKA-13373
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Anatoly Tsyganenko
>Priority: Minor
>  Labels: newbie
>
> I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like 
> this:
>  
> {code:java}
> public final class CustomSupplier implements 
> ValueTransformerWithKeySupplier<JsonNode, JsonNode, JsonNode> {
> private final String storeName = "my-store";
> public Set<StoreBuilder<?>> stores() {
> final Deserializer<JsonNode> jsonDeserializer = new 
> JsonDeserializer();
> final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
> final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, 
> jsonDeserializer);
> final Serde<String> stringSerde = Serdes.String();
> final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store 
> = 
> Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
> stringSerde, jsonSerde).withLoggingDisabled();
> return Collections.singleton(store);
> }
> @Override
> public ValueTransformerWithKey<JsonNode, JsonNode, JsonNode> 
> get() {
> return new ValueTransformerWithKey<JsonNode, JsonNode, 
> JsonNode>() {
> private ProcessorContext context;
> private TimestampedKeyValueStore<String, JsonNode> store;
> @Override
> public void init(final ProcessorContext context) {
> this.store = context.getStateStore(storeName);
> this.context = context;
> }
> //
> }{code}
>  
> But got next error for line "this.store = context.getStateStore(storeName);" 
> in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> KTABLE-TRANSFORMVALUES-08 has no access to StateStore my-store as the 
> store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.{code}
>  
> The same code works perfect with Transform or when I adding store to builder. 
> Looks like something wrong when ConnectedStoreProvider and 
> ValueTransformerWithKeySupplier used together.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


hachikuji commented on a change in pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#discussion_r732060868



##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
   config.setProperty(KafkaConfig.LogMessageFormatVersionProp, 
version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+  brokers: Seq[B],
+  adminConfig: Properties): Admin = {
+val adminClientProperties = if (adminConfig.isEmpty) {
+  val newConfig = new Properties()
+  newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
getBrokerListStrFromServers(brokers))
+  newConfig
+} else {
+  adminConfig
+}
+Admin.create(adminClientProperties)
+  }
+
+  def createTopicWithAdmin[B <: KafkaBroker](

Review comment:
   There seems to be some duplication with 
`IntegrationTestUtils.createTopic`. 

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
   config.setProperty(KafkaConfig.LogMessageFormatVersionProp, 
version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+  brokers: Seq[B],
+  adminConfig: Properties): Admin = {
+val adminClientProperties = if (adminConfig.isEmpty) {

Review comment:
   Perhaps instead of checking if the config is empty, we can check if the 
bootstrap servers property is defined?

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1640,6 +1800,37 @@ object TestUtils extends Logging {
 }, s"Timed out waiting for brokerId $brokerId to come online")
   }
 
+  def getReplicaAssignmentForTopics[B <: KafkaBroker](
+  topicNames: Seq[String],
+  brokers: Seq[B],
+  adminConfig: Properties = new Properties): Map[TopicPartition, Seq[Int]] 
= {
+val adminClient = createAdminClient(brokers, adminConfig)
+val results = new mutable.HashMap[TopicPartition, Seq[Int]]
+try {
+  
adminClient.describeTopics(topicNames.toList.asJava).topicNameValues().forEach {
+case (topicName, future) =>
+  try {
+val description = future.get()
+description.partitions().forEach {
+  case partition =>
+val topicPartition = new TopicPartition(topicName, 
partition.partition())
+results.put(topicPartition, 
partition.replicas().asScala.map(_.id))
+}
+  } catch {
+case e: ExecutionException => if (e.getCause != null &&
+  e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
+  // ignore

Review comment:
   Wondering if it would be better to let this propagate instead of 
returning an empty result. At least maybe we can mention the behavior in a doc 
comment.

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1220,37 +1348,42 @@ object TestUtils extends Logging {
 }
   }
 
-
-  def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, 
numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
+  def verifyTopicDeletion[B <: KafkaBroker](
+  zkClient: KafkaZkClient,
+  topic: String,
+  numPartitions: Int,
+  brokers: Seq[B]): Unit = {
 val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _))
-// wait until admin path for delete topic is deleted, signaling completion 
of topic deletion
-waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
-  "Admin path /admin/delete_topics/%s path not deleted even after a 
replica is restarted".format(topic))
-waitUntilTrue(() => !zkClient.topicExists(topic),
-  "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s 
path is deleted".format(topic, topic))
+if (zkClient != null) {

Review comment:
   I wonder if we need this logic. It seems not necessary for kraft 
clusters, so why do we need it for zk clusters?

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
   config.setProperty(KafkaConfig.LogMessageFormatVersionProp, 
version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+  brokers: Seq[B],
+  adminConfig: Properties): Admin = {
+val adminClientProperties = if (adminConfig.isEmpty) {
+  val newConfig = new Properties()
+  newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
getBrokerListStrFromServers(brokers))
+  newConfig
+} else {
+  adminConfig
+}
+Admin.create(adminClientProperties)
+  }
+
+  def createTopicWithAdmin[B <: KafkaBroker](
+  topic: String,
+  numPartitions: Int = 1,
+  replicationFactor: Int = 1,
+  brokers: Seq[B],
+  topicConfig: Properties = new Properties,
+  adminConfig: Properties = new Properties): 
scala.collection.immutable.Map[Int, Int] = {
+val adminClient = 

[GitHub] [kafka] vamossagar12 edited a comment on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-10-19 Thread GitBox


vamossagar12 edited a comment on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-946918369


   Thanks Sophie for that.
   @showuon , I think your concern about persistent stores and custom stores is 
still valid. The reason why I don't want to add the getObservedStreamTime 
method in SessionStore or WindowStore is that I feel it's not something which 
should be added to those interfaces. It's an internal level detail about how to 
track observed stream time.
   
   Having said that, if we want this behaviour to also be available for custom 
stores (which is why we chose to add it to MeteredStores), then those custom 
stores would need to implement the PersistentStore classes, which is not the way 
it works today, right?
   
   One approach could be to think about custom state stores separately and have 
this merged if this looks fine. That's because I think custom state stores will 
need more thinking because of the way the state stores are structured or 
wrapped. Or else I am open to suggestions!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-10-19 Thread GitBox


vamossagar12 commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-946918369


   Thanks Sophie for that.
   @showuon , I think your concern about persistent stores and custom stores is 
still valid. The reason why I don't want to add the getObservedStreamTime 
method in SessionStore or WindowStore is that I feel it's not something which 
should be added to those interfaces. It's an internal level detail about how to 
track observed stream time.
   
   Having said that, if we want this behaviour to also be available for custom 
stores (which is why we chose to add it to MeteredStores), then those custom 
stores would need to implement the PersistentStore classes, which is not the way 
it works today, right?
   
   One approach could be to think about custom state stores separately and have 
this merged if this looks fine. Or else I am open to suggestions!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732068749



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+public static final String CLIENT_ID_CONFIG = "clientId";
+public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+public static final String SCOPE_CONFIG = "scope";
+
+public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client ID to uniquely identify the service account to use for 
authentication for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_SECRET_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client secret serves a similar function as a password to the " + 
CLIENT_ID_CONFIG + " " +
+"account and identifies the service account to use for authentication 
for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_ID_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login 
request to the " +
+"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need 
to specify an " +
+"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide 
the value to " +
+"include with the login request.";
+
+private static final String EXTENSION_PREFIX = "extension_";
+
+private Map moduleOptions;
+
+private AccessTokenRetriever accessTokenRetriever;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map configs, String saslMechanism, 
List jaasConfigEntries) {
+if 
(!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+throw new IllegalArgumentException(String.format("Unexpected SASL 
mechanism: %s", saslMechanism));
+
+if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || 
jaasConfigEntries.get(0) == null)
+throw new IllegalArgumentException(String.format("Must supply 
exactly 1 non-null JAAS mechanism configuration (size was %d)", 
jaasConfigEntries.size()));
+
+moduleOptions = 
Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+AccessTokenRetriever accessTokenRetriever = 

[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r732068187



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandler.java
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerLoginCallbackHandler implements 
AuthenticateCallbackHandler {
+
+private static final Logger log = 
LoggerFactory.getLogger(OAuthBearerLoginCallbackHandler.class);
+
+public static final String CLIENT_ID_CONFIG = "clientId";
+public static final String CLIENT_SECRET_CONFIG = "clientSecret";
+public static final String SCOPE_CONFIG = "scope";
+
+public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client ID to uniquely identify the service account to use for 
authentication for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_SECRET_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String CLIENT_SECRET_DOC = "The OAuth/OIDC identity 
provider-issued " +
+"client secret serves a similar function as a password to the " + 
CLIENT_ID_CONFIG + " " +
+"account and identifies the service account to use for authentication 
for " +
+"this client. The value must be paired with a corresponding " + 
CLIENT_ID_CONFIG + " " +
+"value and is provided to the OAuth provider using the OAuth " +
+"clientcredentials grant type.";
+
+public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login 
request to the " +
+"token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URI + ") may need 
to specify an " +
+"OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide 
the value to " +
+"include with the login request.";
+
+private static final String EXTENSION_PREFIX = "extension_";
+
+private Map moduleOptions;
+
+private AccessTokenRetriever accessTokenRetriever;
+
+private AccessTokenValidator accessTokenValidator;
+
+private boolean isConfigured = false;
+
+@Override
+public void configure(Map configs, String saslMechanism, 
List jaasConfigEntries) {
+if 
(!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+throw new IllegalArgumentException(String.format("Unexpected SASL 
mechanism: %s", saslMechanism));
+
+if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || 
jaasConfigEntries.get(0) == null)
+throw new IllegalArgumentException(String.format("Must supply 
exactly 1 non-null JAAS mechanism configuration (size was %d)", 
jaasConfigEntries.size()));
+
+moduleOptions = 
Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+AccessTokenRetriever accessTokenRetriever = 

[jira] [Created] (KAFKA-13385) In the KRPC request header, translate null clientID to empty

2021-10-19 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13385:


 Summary: In the KRPC request header, translate null clientID to 
empty
 Key: KAFKA-13385
 URL: https://issues.apache.org/jira/browse/KAFKA-13385
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13373) ValueTransformerWithKeySupplier doesn't work with store()

2021-10-19 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia updated KAFKA-13373:
-
Labels: newbie  (was: )

> ValueTransformerWithKeySupplier doesn't work with store()
> -
>
> Key: KAFKA-13373
> URL: https://issues.apache.org/jira/browse/KAFKA-13373
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Anatoly Tsyganenko
>Priority: Minor
>  Labels: newbie
>
> I'm trying to utilize stores() method in ValueTransformerWithKeySupplier like 
> this:
>  
> {code:java}
> public final class CustomSupplier implements 
> ValueTransformerWithKeySupplier<JsonNode, JsonNode, JsonNode> {
> 
>     private final String storeName = "my-store";
> 
>     public Set<StoreBuilder<?>> stores() {
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         final Serde<String> stringSerde = Serdes.String();
>         final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store =
>             Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
>                 stringSerde, jsonSerde).withLoggingDisabled();
>         return Collections.singleton(store);
>     }
> 
>     @Override
>     public ValueTransformerWithKey<JsonNode, JsonNode, JsonNode> get() {
>         return new ValueTransformerWithKey<JsonNode, JsonNode, JsonNode>() {
>             private ProcessorContext context;
>             private TimestampedKeyValueStore<String, JsonNode> store;
> 
>             @Override
>             public void init(final ProcessorContext context) {
>                 this.store = context.getStateStore(storeName);
>                 this.context = context;
>             }
>             //
>         }{code}
>  
> But got the following error for the line "this.store = context.getStateStore(storeName);" 
> in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> KTABLE-TRANSFORMVALUES-08 has no access to StateStore my-store as the 
> store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.{code}
>  
> The same code works perfectly with Transform, or when I add the store to the builder. 
> It looks like something is wrong when ConnectedStoreProvider and 
> ValueTransformerWithKeySupplier are used together.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController

2021-10-19 Thread GitBox


cmccabe commented on pull request #10772:
URL: https://github.com/apache/kafka/pull/10772#issuecomment-946887188


   thanks for this, @dielhennr. I left some comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #10772:
URL: https://github.com/apache/kafka/pull/10772#discussion_r732038559



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
##
@@ -26,28 +26,36 @@
 import java.util.Objects;
 
 public final class QuorumControllerMetrics implements ControllerMetrics {
-private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
-"KafkaController", "ActiveControllerCount");
-private final static MetricName EVENT_QUEUE_TIME_MS = getMetricName(
-"ControllerEventManager", "EventQueueTimeMs");
-private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = 
getMetricName(
-"ControllerEventManager", "EventQueueProcessingTimeMs");
-private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
-"KafkaController", "GlobalTopicCount");
-private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
-"KafkaController", "GlobalPartitionCount");
-private final static MetricName OFFLINE_PARTITION_COUNT = getMetricName(
-"KafkaController", "OfflinePartitionsCount");
-private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = 
getMetricName(
-"KafkaController", "PreferredReplicaImbalanceCount");
-
+private final static MetricName ACTIVE_CONTROLLER_COUNT = new MetricName(

Review comment:
   why are we changing from `getMetricName` to `new MetricName`? Should 
this be a separate PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #10772:
URL: https://github.com/apache/kafka/pull/10772#discussion_r732036343



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -249,6 +256,10 @@ public void replay(RegisterBrokerRecord record) {
 features.put(feature.name(), new VersionRange(
 feature.minSupportedVersion(), feature.maxSupportedVersion()));
 }
+   
+if (record.fenced()) {
+
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 
1);

Review comment:
   We need to update this based on what the previous registration was, if 
any. If the previous registration was also fenced, we do not want to increase 
the fenced broker count.
   
   It is also possible for the new registration to start as unfenced. Please 
look at the record definitions.
   
   It would be useful to have a helper function that took as an argument the 
previous registration (or null) and the new registration (or null), and updated 
the metrics accordingly.
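
   For illustration, a rough sketch of such a helper. The fenced-count accessors follow 
the diff above; the method name, the BrokerRegistration.fenced() accessor, and the 
active-broker accessors are assumptions:

{code:java}
// Hypothetical helper: derive the fenced/active broker counts from the previous and the
// new registration instead of incrementing unconditionally; either argument may be null.
private void updateBrokerRegistrationMetrics(BrokerRegistration prev, BrokerRegistration next) {
    int fencedDelta = 0;
    int activeDelta = 0;
    if (prev != null) {
        if (prev.fenced()) fencedDelta--; else activeDelta--;
    }
    if (next != null) {
        if (next.fenced()) fencedDelta++; else activeDelta++;
    }
    controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + fencedDelta);
    controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + activeDelta);
}
{code}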




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10772: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController

2021-10-19 Thread GitBox


cmccabe commented on a change in pull request #10772:
URL: https://github.com/apache/kafka/pull/10772#discussion_r732036343



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -249,6 +256,10 @@ public void replay(RegisterBrokerRecord record) {
 features.put(feature.name(), new VersionRange(
 feature.minSupportedVersion(), feature.maxSupportedVersion()));
 }
+   
+if (record.fenced()) {
+
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 
1);

Review comment:
   We need to update this based on what the previous registration was, if 
any.
   
   It would be useful to have a helper function that took as an argument the 
previous registration (or null) and the new registration (or null), and updated 
the metrics accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11410: MINOR: Make TestUtils usable for KRaft mode

2021-10-19 Thread GitBox


cmccabe commented on pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#issuecomment-946860744


   Test failures seem like flakes (they pass locally)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #11416: MINOR: Improve createTopics and incrementalAlterConfigs in KRaft

2021-10-19 Thread GitBox


cmccabe opened a new pull request #11416:
URL: https://github.com/apache/kafka/pull/11416


   For CreateTopics, fix a bug where if one topic in a batch failed, they would all 
fail with the same error code. Make the error message for TOPIC_ALREADY_EXISTS 
consistent with the ZK-based code by including the topic name.
   
   For IncrementalAlterConfigs, before we allow topic configurations to be set, 
we should check that
   they are valid. (This also applies to newly created topics.) 
IncrementalAlterConfigs should ignore
   non-null payloads for DELETE operations. Previously we would return an error 
in these cases.
   However, this is not compatible with the old ZK-based code, which ignores 
the payload in these
   cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13374) [Docs] - All reads from the leader of the partition even after KIP-392?

2021-10-19 Thread Robin Moffatt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430590#comment-17430590
 ] 

Robin Moffatt commented on KAFKA-13374:
---

Looks good to me as a layperson - thanks for picking this up [~showuon].

> [Docs] - All reads from the leader of the partition even after KIP-392?
> ---
>
> Key: KAFKA-13374
> URL: https://issues.apache.org/jira/browse/KAFKA-13374
> Project: Kafka
>  Issue Type: Bug
>Reporter: Robin Moffatt
>Assignee: Luke Chen
>Priority: Trivial
>
> On `https://kafka.apache.org/documentation/#design_replicatedlog` it says
> > All reads and writes go to the leader of the partition.
> However with KIP-392 I didn't think this was the case any more. If so, the 
> doc should be updated to clarify. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo closed pull request #10507: KAFKA-8410: Migrating stateful operators to new Processor API

2021-10-19 Thread GitBox


jeqo closed pull request #10507:
URL: https://github.com/apache/kafka/pull/10507


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted

2021-10-19 Thread Alexander (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander updated KAFKA-13384:
--
Description: 
We found a misbehavior on our Kafka cluster (version: 2.6.2, 
Commit:da65af02e5856e34): the _*FailedPartitionsCount*_ metric is not updated if a 
partition log file was corrupted.

Steps to reproduce the problem:
 # corrupt a partition log file
 # restart Kafka process

After that, you will see a log entry indicating that Kafka marked the corrupted 
partitions as failed:
{code:java}
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread){code}
But the value of _*FailedPartitionsCount*_ metric will be 0 (see attached 
screenshot)

  was:
We found a misbehavior on our Kafka cluster (version: 2.6.2 
(Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a 
partition log file was corrupted

Steps to reproduce the problem:
 # corrupt a partition log file
 # restart Kafka process

After that, you will get a correct log which tells that Kafka marked corrupted 
partitions as failed

 
{code:java}
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread){code}
 

But the value of `FailedPartitionsCount` metric will be 0 (see attached 
screenshot)


> FailedPartitionsCount metric is not updated if a partition log file was 
> corrupted
> -
>
> Key: KAFKA-13384
> URL: https://issues.apache.org/jira/browse/KAFKA-13384
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.2
> Environment: OS:
> NAME="Amazon Linux AMI"
> VERSION="2018.03"
> ID="amzn"
> ID_LIKE="rhel fedora"
> VERSION_ID="2018.03"
> PRETTY_NAME="Amazon Linux AMI 2018.03"
> CPE_NAME="cpe:/o:amazon:linux:2018.03:ga"
> HOME_URL="http://aws.amazon.com/amazon-linux-ami/"
> Kafka version:
> 2.6.2 (Commit:da65af02e5856e34)
>Reporter: Alexander
>Priority: Major
> Attachments: Screenshot 2021-10-19 at 15.28.33.png
>
>
> We found a misbehavior on our Kafka cluster (version: 2.6.2, 
> Commit:da65af02e5856e34): the _*FailedPartitionsCount*_ metric is not updated if 
> a partition log file was corrupted.
> Steps to reproduce the problem:
>  # corrupt a partition log file
>  # restart Kafka process
> After that, you will see a log entry indicating that Kafka marked the 
> corrupted partitions as failed:
> {code:java}
> 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
> replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as 
> failed
>  (kafka.server.ReplicaFetcherThread){code}
> But the value of _*FailedPartitionsCount*_ metric will be 0 (see attached 
> screenshot)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted

2021-10-19 Thread Alexander (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander updated KAFKA-13384:
--
Description: 
We found a misbehavior on our Kafka cluster (version: 2.6.2 
(Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a 
partition log file was corrupted

Steps to reproduce the problem:
 # corrupt a partition log file
 # restart Kafka process

After that, you will get a correct log which tells that Kafka marked corrupted 
partitions as failed

 
{code:java}
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread){code}
 

But the value of `FailedPartitionsCount` metric will be 0 (see attached 
screenshot)

  was:
We found a misbehavior on our Kafka cluster (version: 2.6.2 
(Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a 
partition log file was corrupted

Steps to reproduce the problem:
 # corrupt a partition log file
 # restart Kafka process

After that, you will get a correct log which tells that Kafka marked corrupted 
partitions as failed

 
{code:java}
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread){code}
 

 

But the value of `FailedPartitionsCount` metric will be 0 (see attached 
screenshot)


> FailedPartitionsCount metric is not updated if a partition log file was 
> corrupted
> -
>
> Key: KAFKA-13384
> URL: https://issues.apache.org/jira/browse/KAFKA-13384
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.2
> Environment: OS:
> NAME="Amazon Linux AMI"
> VERSION="2018.03"
> ID="amzn"
> ID_LIKE="rhel fedora"
> VERSION_ID="2018.03"
> PRETTY_NAME="Amazon Linux AMI 2018.03"
> CPE_NAME="cpe:/o:amazon:linux:2018.03:ga"
> HOME_URL="http://aws.amazon.com/amazon-linux-ami/"
> Kafka version:
> 2.6.2 (Commit:da65af02e5856e34)
>Reporter: Alexander
>Priority: Major
> Attachments: Screenshot 2021-10-19 at 15.28.33.png
>
>
> We found a misbehavior on our Kafka cluster (version: 2.6.2 
> (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if 
> a partition log file was corrupted
> Steps to reproduce the problem:
>  # corrupt a partition log file
>  # restart Kafka process
> After that, you will get a correct log which tells that Kafka marked 
> corrupted partitions as failed
>  
> {code:java}
> 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
> replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as 
> failed
>  (kafka.server.ReplicaFetcherThread){code}
>  
> But the value of `FailedPartitionsCount` metric will be 0 (see attached 
> screenshot)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted

2021-10-19 Thread Alexander (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander updated KAFKA-13384:
--
Description: 
We found a misbehavior on our Kafka cluster (version: 2.6.2 
(Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a 
partition log file was corrupted

Steps to reproduce the problem:
 # corrupt a partition log file
 # restart Kafka process

After that, you will get a correct log which tells that Kafka marked corrupted 
partitions as failed

 
{code:java}
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread){code}
 

 

But the value of `FailedPartitionsCount` metric will be 0 (see attached 
screenshot)

  was:
We found a misbehavior on our Kafka cluster (version: 2.6.2 
(Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a 
partition log file was corrupted

Steps to reproduce the problem:
1. corrupt a partition log file
2. restart Kafka process

After that, you will get a correct log which tells that Kafka marked corrupted 
partitions as failed

```
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread)
```

But the value of `FailedPartitionsCount` metric will be 0 (see attached 
screenshot)


> FailedPartitionsCount metric is not updated if a partition log file was 
> corrupted
> -
>
> Key: KAFKA-13384
> URL: https://issues.apache.org/jira/browse/KAFKA-13384
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.2
> Environment: OS:
> NAME="Amazon Linux AMI"
> VERSION="2018.03"
> ID="amzn"
> ID_LIKE="rhel fedora"
> VERSION_ID="2018.03"
> PRETTY_NAME="Amazon Linux AMI 2018.03"
> CPE_NAME="cpe:/o:amazon:linux:2018.03:ga"
> HOME_URL="http://aws.amazon.com/amazon-linux-ami/"
> Kafka version:
> 2.6.2 (Commit:da65af02e5856e34)
>Reporter: Alexander
>Priority: Major
> Attachments: Screenshot 2021-10-19 at 15.28.33.png
>
>
> We found a misbehavior on our Kafka cluster (version: 2.6.2 
> (Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if 
> a partition log file was corrupted
> Steps to reproduce the problem:
>  # corrupt a partition log file
>  # restart Kafka process
> After that, you will get a correct log which tells that Kafka marked 
> corrupted partitions as failed
>  
> {code:java}
> 2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
> replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as 
> failed
>  (kafka.server.ReplicaFetcherThread){code}
>  
>  
> But the value of `FailedPartitionsCount` metric will be 0 (see attached 
> screenshot)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13384) FailedPartitionsCount metric is not updated if a partition log file was corrupted

2021-10-19 Thread Alexander (Jira)
Alexander created KAFKA-13384:
-

 Summary: FailedPartitionsCount metric is not updated if a 
partition log file was corrupted
 Key: KAFKA-13384
 URL: https://issues.apache.org/jira/browse/KAFKA-13384
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.2
 Environment: OS:
NAME="Amazon Linux AMI"
VERSION="2018.03"
ID="amzn"
ID_LIKE="rhel fedora"
VERSION_ID="2018.03"
PRETTY_NAME="Amazon Linux AMI 2018.03"
CPE_NAME="cpe:/o:amazon:linux:2018.03:ga"
HOME_URL="http://aws.amazon.com/amazon-linux-ami/"

Kafka version:
2.6.2 (Commit:da65af02e5856e34)
Reporter: Alexander
 Attachments: Screenshot 2021-10-19 at 15.28.33.png

We found a misbehavior on our Kafka cluster (version: 2.6.2 
(Commit:da65af02e5856e34)), `FailedPartitionsCount` metric is not updated if a 
partition log file was corrupted

Steps to reproduce the problem:
1. corrupt a partition log file
2. restart Kafka process

After that, you will get a correct log which tells that Kafka marked corrupted 
partitions as failed

```
2021-10-19T14:49:31+02:00 [2021-10-19 12:49:30,924] WARN [ReplicaFetcher 
replicaId=11, leaderId=10, fetcherId=0] Partition test_topic-1 marked as failed
 (kafka.server.ReplicaFetcherThread)
```

But the value of `FailedPartitionsCount` metric will be 0 (see attached 
screenshot)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo opened a new pull request #11415: chore: ignore bin on new modules

2021-10-19 Thread GitBox


jeqo opened a new pull request #11415:
URL: https://github.com/apache/kafka/pull/11415


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10543) Convert KTable joins to new PAPI

2021-10-19 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430507#comment-17430507
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-10543:
--

WIP: [https://github.com/apache/kafka/pull/11412]

> Convert KTable joins to new PAPI
> 
>
> Key: KAFKA-10543
> URL: https://issues.apache.org/jira/browse/KAFKA-10543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10540) Convert KStream aggregations to new PAPI

2021-10-19 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-10540.
--
Resolution: Fixed

https://github.com/apache/kafka/pull/11315

> Convert KStream aggregations to new PAPI
> 
>
> Key: KAFKA-10540
> URL: https://issues.apache.org/jira/browse/KAFKA-10540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2021-10-19 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-10539.
--
Resolution: Fixed

https://github.com/apache/kafka/pull/11356

> Convert KStreamImpl joins to new PAPI
> -
>
> Key: KAFKA-10539
> URL: https://issues.apache.org/jira/browse/KAFKA-10539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on pull request #11414: MINOR: Renamed a few record definition files with the existing convention.

2021-10-19 Thread GitBox


satishd commented on pull request #11414:
URL: https://github.com/apache/kafka/pull/11414#issuecomment-946647417


   @junrao This is a minor PR that renames a few record definition files to follow 
the existing file name conventions and throws a proper error message when reading 
the LeaderEpochCache with an unsupported version.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd opened a new pull request #11414: MINOR: Renamed a few record definition files with the existing convention.

2021-10-19 Thread GitBox


satishd opened a new pull request #11414:
URL: https://github.com/apache/kafka/pull/11414


   MINOR: Renamed a few record definition files with the existing convention.
- Throwing an error message while reading LeaderEpochCheckpoint file with 
an unsupported version.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13374) [Docs] - All reads from the leader of the partition even after KIP-392?

2021-10-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430448#comment-17430448
 ] 

Luke Chen commented on KAFKA-13374:
---

Thank you for reporting and confirming, [~rmoff] [~jlaskowski]. I've submitted a 
PR and it is ready for review: [https://github.com/apache/kafka/pull/11408]

Comments are welcome. Thank you.

> [Docs] - All reads from the leader of the partition even after KIP-392?
> ---
>
> Key: KAFKA-13374
> URL: https://issues.apache.org/jira/browse/KAFKA-13374
> Project: Kafka
>  Issue Type: Bug
>Reporter: Robin Moffatt
>Assignee: Luke Chen
>Priority: Trivial
>
> On `https://kafka.apache.org/documentation/#design_replicatedlog` it says
> > All reads and writes go to the leader of the partition.
> However with KIP-392 I didn't think this was the case any more. If so, the 
> doc should be updated to clarify. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13043) Add Admin API for batched offset fetch requests (KIP-709)

2021-10-19 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13043:

Fix Version/s: (was: 3.1.0)
   3.2.0

> Add Admin API for batched offset fetch requests (KIP-709)
> -
>
> Key: KAFKA-13043
> URL: https://issues.apache.org/jira/browse/KAFKA-13043
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Rajini Sivaram
>Assignee: Sanjana Kaundinya
>Priority: Major
> Fix For: 3.2.0
>
>
> Protocol changes and broker-side changes to process batched 
> OffsetFetchRequests were added under KAFKA-12234. This ticket is to add Admin 
> API changes to use this feature.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13043) Add Admin API for batched offset fetch requests (KIP-709)

2021-10-19 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430444#comment-17430444
 ] 

David Jacot commented on KAFKA-13043:
-

Moving this feature to 3.2.0 as [~skaundinya] does not have cycles to finish it 
for 3.1.0.

> Add Admin API for batched offset fetch requests (KIP-709)
> -
>
> Key: KAFKA-13043
> URL: https://issues.apache.org/jira/browse/KAFKA-13043
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Rajini Sivaram
>Assignee: Sanjana Kaundinya
>Priority: Major
> Fix For: 3.1.0
>
>
> Protocol changes and broker-side changes to process batched 
> OffsetFetchRequests were added under KAFKA-12234. This ticket is to add Admin 
> API changes to use this feature.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13370) Offset commit failure percentage metric is not computed correctly (regression)

2021-10-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17430429#comment-17430429
 ] 

Luke Chen commented on KAFKA-13370:
---

[~20100g], thanks for reporting the issue. The PR is ready now: 
[https://github.com/apache/kafka/pull/11413]

Comments are welcome. Thanks.

> Offset commit failure percentage metric is not computed correctly (regression)
> --
>
> Key: KAFKA-13370
> URL: https://issues.apache.org/jira/browse/KAFKA-13370
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, metrics
>Affects Versions: 2.8.0
> Environment: Confluent Platform Helm Chart (v6.2.0)
>Reporter: Vincent Giroux
>Assignee: Luke Chen
>Priority: Minor
> Fix For: 2.8.0
>
>
> There seems to have been a regression in the way the offset-commit-* metrics 
> are calculated for *source* Kafka Connect connectors since version 2.8.0.
> Before this version, any timeout or interruption while trying to commit 
> offsets for source connectors (e.g. MM2 MirrorSourceConnector) would get 
> correctly flagged as an offset commit failure (i.e. the 
> *offset-commit-failure-percentage* metric would be non-zero). Since 
> version 2.8.0, these errors are considered successes.
> After digging through the code, the commit where this bug was introduced 
> appears to be this one : 
> [https://github.com/apache/kafka/commit/047ad654da7903f3903760b0e6a6a58648ca7715]
> I believe removing the boolean *success* argument in the *recordCommit* 
> method of the *WorkerTask* class (argument deemed redundant because of the 
> presence of the Throwable *error* argument) and only considering the presence 
> of a non-null error to determine if a commit is a success or failure might be 
> a mistake. This is because in the *commitOffsets* method of the 
> *WorkerSourceTask* class, there are multiple cases where an exception object 
> is either not available or is not passed to the *recordCommitFailure* method, 
> e.g. :
>  * *Timeout #1* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L519]
>  
>  * *Timeout #2* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L584]
>  
>  * *Interruption* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L529]
>  
>  * *Unserializable offset* : 
> [https://github.com/apache/kafka/blob/2.8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L562]
>  
>  
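
For illustration, a condensed sketch of the behavior described above. recordCommit and 
recordCommitFailure appear in the report; the exact signature, recordCommitSuccess, and 
the overall structure are assumptions rather than the actual WorkerTask code:

{code:java}
// Illustrative only: with the boolean argument removed, success is derived solely from
// whether an error object was passed in.
protected void recordCommit(long duration, Throwable error) {
    if (error == null) {
        // The timeout, interruption, and unserializable-offset paths listed above never
        // pass an error object, so they end up here and are counted as successful commits.
        recordCommitSuccess(duration);
    } else {
        recordCommitFailure(duration, error);
    }
}
{code}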



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-10-19 Thread GitBox


showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-946513668


   @chia7712 , could you please take a look? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-10-19 Thread GitBox


showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731649801



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -1580,9 +1652,22 @@ private void assertPollMetrics(int 
minimumPollCountExpected) {
 assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d);
 double activeCount = 
metrics.currentMetricValueAsDouble(sourceTaskGroup, 
"source-record-active-count");
 double activeCountMax = 
metrics.currentMetricValueAsDouble(sourceTaskGroup, 
"source-record-active-count-max");
-assertEquals(0, activeCount, 0.01d);
-if (minimumPollCountExpected > 0) {
-assertEquals(RECORDS.size(), activeCountMax, 0.01d);
+
+if (isWriteCompleted) {
+assertEquals(0, activeCount, 0.01d);
+if (minimumPollCountExpected > 0) {
+assertEquals(RECORDS.size(), activeCountMax, 0.01d);
+}
+}
+
+double failurePercentage = 
metrics.currentMetricValueAsDouble(taskGroup, 
"offset-commit-failure-percentage");
+double successPercentage = 
metrics.currentMetricValueAsDouble(taskGroup, 
"offset-commit-success-percentage");
+
+if (!isCommitSucceeded) {
+assertTrue(failurePercentage > 0);
+} else {
+assertTrue(failurePercentage == 0);
+assertTrue(successPercentage > 0);

Review comment:
   Add `offset-commit-failure-percentage` and 
`offset-commit-success-percentage` metrics verification.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-10-19 Thread GitBox


showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731648129



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##
@@ -129,6 +129,7 @@
 // is used in the right place.
 private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
 private static final byte[] SERIALIZED_RECORD = 
"converted-record".getBytes();
+private static final long STOP_TIME_OUT = 1;

Review comment:
   Sometimes stopping the worker task needs more than 1 sec, so the timeout is 
increased to 10 secs to make the test reliable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-10-19 Thread GitBox


showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731647369



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -550,16 +552,16 @@ public boolean commitOffsets() {
 // Now we can actually flush the offsets to user storage.
 Future flushFuture = offsetWriter.doFlush((error, result) -> {
 if (error != null) {
+// Very rare case: offsets were unserializable and we finished 
immediately, unable to store
+// any data
 log.error("{} Failed to flush offsets to storage: ", 
WorkerSourceTask.this, error);
+finishFailedFlush();
+recordCommitFailure(time.milliseconds() - started, error);
 } else {

Review comment:
   `doFlush` will return `null` after calling the callback with `error` 
attached. Handle the failed flush here, since we can know which error is thrown.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-10-19 Thread GitBox


showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731645529



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -514,9 +514,11 @@ public boolean commitOffsets() {
 // If the task has been cancelled, no more records will be 
sent from the producer; in that case, if any outstanding messages remain,
 // we can stop flushing immediately
 if (isCancelled() || timeoutMs <= 0) {
-log.error("{} Failed to flush, timed out while waiting 
for producer to flush outstanding {} messages", this, 
outstandingMessages.size());
+log.error("{} Failed to flush, task is cancelled, or 
timed out while waiting for producer " +
+"to flush outstanding {} messages", this, 
outstandingMessages.size());

Review comment:
   Side fix: the error could be a timeout or a cancellation, so mention both in the log.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

2021-10-19 Thread GitBox


showuon opened a new pull request #11413:
URL: https://github.com/apache/kafka/pull/11413


   In https://github.com/apache/kafka/pull/9642, we removed the unnecessary 
`success` parameter and use the `error` value to identify whether the commit 
succeeded or failed. However, there are some cases where we passed `success` as 
`false` but without an `error` value. I think we should always pass the `error` 
value when the commit fails. Fix it and add tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory

2021-10-19 Thread Aditya Upadhyaya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430335#comment-17430335
 ] 

Aditya Upadhyaya commented on KAFKA-12559:
--

[~ableegoldman]  I'd like to pick up this task if no one is currently working 
on it. Let me know your thoughts.

> Add a top-level Streams config for bounding off-heap memory
> ---
>
> Key: KAFKA-12559
> URL: https://issues.apache.org/jira/browse/KAFKA-12559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Martin Sundeqvist
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> At the moment we provide an example of how to bound the memory usage of 
> rocksdb in the [Memory 
> Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb]
>  section of the docs. This requires implementing a custom RocksDBConfigSetter 
> class and setting a number of rocksdb options for relatively advanced 
> concepts and configurations. It seems a fair number of users either fail to 
> find this or consider it to be for more advanced use cases/users. But RocksDB 
> can eat up a lot of off-heap memory and it's not uncommon for users to come 
> across a {{RocksDBException: Cannot allocate memory}}
> It would probably be a much better user experience if we implemented this 
> memory bound out-of-the-box and just gave users a top-level StreamsConfig to 
> tune the off-heap memory given to rocksdb, like we have for on-heap cache 
> memory with cache.max.bytes.buffering. More advanced users can continue to 
> fine-tune their memory bounding and apply other configs with a custom config 
> setter, while new or more casual users can cap the off-heap memory without 
> getting their hands dirty with rocksdb.
> I would propose to add the following top-level config:
> rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid 
> values are [0, inf]
> I'd also want to consider adding a second, lower priority top-level config to 
> give users a knob for adjusting how much of that total off-heap memory goes 
> to the block cache + index/filter blocks, and how much of it is afforded to 
> the write buffers. I'm struggling to come up with a good name for this 
> config, but it would be something like
> rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default 
> to 0.5, valid values are [0, 1]
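
For anyone picking this up, the existing approach referenced in the description 
(a custom RocksDBConfigSetter along the lines of the memory-management docs) 
looks roughly like the sketch below. It is only an illustration: the class name 
and the constants are made up, and the exact knobs are in the docs linked above.

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    // Illustrative bounds, not Kafka configs
    private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L;
    private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L;
    private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;

    // Shared across all store instances so the bound applies application-wide
    private static final Cache CACHE =
        new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // Count memtables against the shared cache and keep index/filter blocks in it,
        // so all off-heap usage falls under the single bound defined above
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        tableConfig.setBlockCache(CACHE);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Do not close the shared Cache/WriteBufferManager here; other stores still use them
    }
}
{code}

The proposed top-level config would let users get the same effect by setting a 
single property such as rocksdb.max.bytes.off.heap, instead of registering a 
class like this via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.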



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2021-10-19 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430275#comment-17430275
 ] 

Eugen Dück edited comment on KAFKA-13289 at 10/19/21, 6:33 AM:
---

We are running into similar issues (6.0.1-ccs for broker and kafka-streams 
library, i.e. kafka 2.6.1)
 * lots of "Skipping record for expired segment." warnings in 
AbstractRocksDBSegmentedBytesStore
 * at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried to remove the 
re-keying part from Matthew's code, and as far as I can tell, the problem still 
persists, so it would look like it is not related to re-partitioning. Btw. the 
problem shows even when doing just 10 instead of 1000 messages per topic. Find 
my fork of Matthew's code here: 
 [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{{[INFO] ---}}
 {{[INFO] T E S T S}}
 {{[INFO] ---}}
 {{[INFO] Running ins14809.Ins14809Test}}

{{leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 
2:left, 8:left]}}
 {{rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 
9:right, 8:right, 6:right]}}

{{# Actual results}}

{{We want to see every number X below end with an entry that says 
[X,left/X,right]}}
 {{but in practice we often see only [X,left/null] meaning the data was not 
joined.}}
 {{This seems to coincide with kafka streams writing...}}
 {{`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment`}}
 {{...to its logs, in spite of the fact that the source message timestamps were 
in order when}}
 {{kafka streams got them.}}

{{0 [0:left/null, 0:left/0:right]}}
{{ 1 [1:left/1:right]}}
{{2 [2:left/2:right]}}
{{3 [3:left/null, 3:left/3:right]}}
{{4 [4:left/null, 4:left/4:right]}}
{{5 [5:left/5:right]}}
{{6 [6:left/null, 6:left/6:right]}}
{{7 [7:left/7:right]}}
{{8 [8:left/8:right]}}
{{9 [9:left/9:right] }}

{{[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 
43.267 s - in ins14809.Ins14809Test}}


was (Author: eugendueck):
We are running into similar issues (6.0.1-ccs for broker and kafka-streams 
library, i.e. kafka 2.6.1)
 * lots of "Skipping record for expired segment." warnings in 
AbstractRocksDBSegmentedBytesStore
 * at some point, our topology stops outputting data

As we don't have any re-partitioning in our pipeline, I tried to remove the 
re-keying part from Matthew's code, and as far as I can tell, the problem still 
persists, so it would look like it is not related to re-partitioning. Btw. the 
problem shows even when doing just 10 instead of 1000 messages per topic. Find 
my fork of Matthew's code here: 
 [https://github.com/EugenDueck/ins14809]

This is the output of one such test run:

{{[INFO] ---}}
 {{[INFO] T E S T S}}
 {{[INFO] ---}}
 {{[INFO] Running ins14809.Ins14809Test}}

{{leftStream: [0:left, 3:left, 4:left, 5:left, 1:left, 6:left, 7:left, 9:left, 
2:left, 8:left]}}
 {{rightStream: [5:right, 1:right, 7:right, 2:right, 0:right, 3:right, 4:right, 
9:right, 8:right, 6:right]}}

{{# Actual results}}

{{We want to see every number X below end with an entry that says 
[X,left/X,right]}}
 {{but in practice we often see only [X,left/null] meaning the data was not 
joined.}}
 {{This seems to coincide with kafka streams writing...}}
 {{`WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment`}}
 {{...to its logs, in spite of the fact that the source message timestamps were 
in order when}}
 {{kafka streams got them.}}

{{0 [0:left/null, 0:left/0:right]}}
 {{ 1 [1:left/1:right]}}
 {{ 2 [2:left/2:right]}}
 {{ 3 [3:left/null, 3:left/3:right]}}
 {{ 4 [4:left/null, 4:left/4:right]}}
 {{ 5 [5:left/5:right]}}
 {{ 6 [6:left/null, 6:left/6:right]}}
 {{ 7 [7:left/7:right]}}
 {{ 8 [8:left/8:right]}}
 {{ 9 [9:left/9:right]}}

{{[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 43.267 
s - in ins14809.Ins14809Test}}

 

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following 

[GitHub] [kafka] YiDing-Duke commented on a change in pull request #11284: KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC

2021-10-19 Thread GitBox


YiDing-Duke commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r731524967



##
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/Retry.java
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retry encapsulates the mechanism to perform a retry and then exponential
+ * backoff using provided wait times between attempts.
+ *
+ * @param <R> Result type
+ */
+
+public class Retry<R> {
+
+private static final Logger log = LoggerFactory.getLogger(Retry.class);
+
+private final Time time;
+
+private final long retryBackoffMs;
+
+private final long retryBackoffMaxMs;
+
+public Retry(Time time, long retryBackoffMs, long retryBackoffMaxMs) {
+this.time = time;
+this.retryBackoffMs = retryBackoffMs;
+this.retryBackoffMaxMs = retryBackoffMaxMs;
+
+if (this.retryBackoffMs < 0)
+throw new IllegalArgumentException(String.format("retryBackoffMs 
value %s must be non-negative", retryBackoffMs));
+
+if (this.retryBackoffMaxMs < 0)
+throw new 
IllegalArgumentException(String.format("retryBackoffMaxMs %s value must be 
non-negative", retryBackoffMaxMs));
+
+if (this.retryBackoffMaxMs < this.retryBackoffMs)
+throw new 
IllegalArgumentException(String.format("retryBackoffMaxMs %s is less than 
retryBackoffMs %s", retryBackoffMaxMs, retryBackoffMs));
+}
+
+public R execute(Retryable<R> retryable) throws IOException {
+int currAttempt = 0;
+long end = time.milliseconds() + retryBackoffMaxMs;
+IOException error = null;
+
+while (time.milliseconds() <= end) {
+currAttempt++;
+
+try {
+return retryable.call();
+} catch (IOException e) {
+if (error == null)
+error = e;
+
+long waitMs = retryBackoffMs * (long) Math.pow(2, currAttempt 
- 1);
+long diff = end - time.milliseconds();
+waitMs = Math.min(waitMs, diff);
+
+if (waitMs <= 0)

Review comment:
   When we hit the retry timeout, should we log this error so that we can 
identify non-retryable error patterns to add to the list in the future?
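   Something along these lines, perhaps (just a sketch against the code above; 
whether to log at all, and at what level, is up to the author):

{code:java}
if (waitMs <= 0) {
    // Hypothetical addition: surface the last IOException before giving up, so
    // recurring, effectively non-retryable errors can be spotted in the logs
    // and added to the non-retryable list later
    log.warn("Exhausted retry time after {} attempt(s); last error:", currAttempt, error);
    break;
}
{code}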




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org