[GitHub] [kafka] showuon commented on pull request #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…

2022-05-19 Thread GitBox


showuon commented on PR #12167:
URL: https://github.com/apache/kafka/pull/12167#issuecomment-1132408449

   > So I'd like to keep the scope of this PR to unit tests. If you think that 
makes sense, I'll go ahead and open up an issue for it to make sure we track 
that outstanding item.
   
   Yes, makes sense. Please open up another JIRA issue for it. Thank you.





[GitHub] [kafka] jsancio merged pull request #12160: KAFKA-13889: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL

2022-05-19 Thread GitBox


jsancio merged PR #12160:
URL: https://github.com/apache/kafka/pull/12160





[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-19 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r877600481


##
core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala:
##
@@ -40,9 +42,15 @@ import scala.collection.Map
   * setOffsetsForNextResponse
   */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset],
- sourceBroker: BrokerEndPoint,
+ override private[server] val sourceBroker: BrokerEndPoint,
  time: Time)
-  extends BlockingSend {
+  extends BrokerBlockingSender(sourceBroker = sourceBroker,

Review Comment:
   The AbstractFetcherThread is tied to an endPoint. If there is no endPoint, 
the logic probably won't go through AbstractFetcherThread.






[jira] [Created] (KAFKA-13918) Schedule or cancel nooprecord write on metadata version change

2022-05-19 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13918:
--

 Summary: Schedule or cancel nooprecord write on metadata version 
change
 Key: KAFKA-13918
 URL: https://issues.apache.org/jira/browse/KAFKA-13918
 Project: Kafka
  Issue Type: Sub-task
  Components: controller
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio








[jira] [Assigned] (KAFKA-1930) Move server over to new metrics library

2022-05-19 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-1930:
-

Assignee: (was: Aditya Auradkar)

> Move server over to new metrics library
> ---
>
> Key: KAFKA-1930
> URL: https://issues.apache.org/jira/browse/KAFKA-1930
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Priority: Major
>
> We are using org.apache.kafka.common.metrics on the clients, but using Coda 
> Hale metrics on the server. We should move the server over to the new metrics 
> package as well. This will help to make all our metrics self-documenting.





[GitHub] [kafka] vvcephei commented on a diff in pull request #12186: MINOR: Deflake OptimizedKTableIntegrationTest

2022-05-19 Thread GitBox


vvcephei commented on code in PR #12186:
URL: https://github.com/apache/kafka/pull/12186#discussion_r877582344


##
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws 
Exception {
 // Assert that all messages in the first batch were processed in a 
timely manner
 assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
 
-final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-final boolean kafkaStreams1WasFirstActive;
-final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-// Assert that the current value in store reflects all messages being 
processed
-if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-kafkaStreams1WasFirstActive = true;
-} else {
-assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-kafkaStreams1WasFirstActive = false;
-}
-
-if (kafkaStreams1WasFirstActive) {
-kafkaStreams1.close();
-} else {
-kafkaStreams2.close();
-}
+final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+TestUtils.retryOnExceptionWithTimeout(() -> {
+final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
+
+try {
+// Assert that the current value in store reflects all 
messages being processed
+if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+assertThat(store1.get(key), is(equalTo(batch1NumMessages - 
1)));
+kafkaStreams1.close();
+newActiveStore.set(store2);
+} else {
+assertThat(store2.get(key), is(equalTo(batch1NumMessages - 
1)));
+kafkaStreams2.close();
+newActiveStore.set(store1);
+}
+} catch (final InvalidStateStoreException e) {
+LOG.warn("Detected an unexpected rebalance during test. 
Retrying if possible.", e);
+throw e;

Review Comment:
   This triggers the retryOnException logic



##
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws 
Exception {
 // Assert that all messages in the first batch were processed in a 
timely manner
 assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
 
-final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-final boolean kafkaStreams1WasFirstActive;
-final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-// Assert that the current value in store reflects all messages being 
processed
-if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-kafkaStreams1WasFirstActive = true;
-} else {
-assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-kafkaStreams1WasFirstActive = false;
-}
-
-if (kafkaStreams1WasFirstActive) {
-kafkaStreams1.close();
-} else {
-kafkaStreams2.close();
-}
+final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>> newActiveStore = new AtomicReference<>(null);
+TestUtils.retryOnExceptionWithTimeout(() -> {
+final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

Review Comment:
   Depending on where and when the rebalance 

[GitHub] [kafka] vvcephei opened a new pull request, #12186: MINOR: Deflake OptimizedKTableIntegrationTest

2022-05-19 Thread GitBox


vvcephei opened a new pull request, #12186:
URL: https://github.com/apache/kafka/pull/12186

   This test has been flaky due to unexpected rebalances during the test.
   This change fixes it by detecting an unexpected rebalance and retrying
   the test logic (within a timeout).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-19 Thread GitBox


rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r877577941


##
core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala:
##
@@ -40,9 +42,15 @@ import scala.collection.Map
   * setOffsetsForNextResponse
   */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset],
- sourceBroker: BrokerEndPoint,
+ override private[server] val sourceBroker: BrokerEndPoint,
  time: Time)
-  extends BlockingSend {
+  extends BrokerBlockingSender(sourceBroker = sourceBroker,

Review Comment:
   Would there ever be a scenario in which we wouldn't want `brokerEndPoint` to 
be a part of the `BlockingSend` trait? I.e., is there ever an implementation of 
`BlockingSend` in which a `brokerEndPoint` would be extraneous?






[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access

2022-05-19 Thread GitBox


junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r877559859


##
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api.Request
+import kafka.cluster.BrokerEndPoint
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.message.FetchResponseData
+import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
RequestUtils}
+
+import java.util
+import java.util.Optional
+import scala.collection.{Map, Seq, Set, mutable}
+import scala.compat.java8.OptionConverters.RichOptionForJava8
+import scala.jdk.CollectionConverters._
+
+/**
+ * Facilitates fetches from a local replica leader.
+ *
+ * @param brokerEndPoint The broker (host:port) that we want to connect to

Review Comment:
   brokerEndPoint => sourceBroker



##
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.BrokerEndPoint
+
+import java.util.{Collections, Optional}
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Logging
+import org.apache.kafka.clients.FetchSessionHandler
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic,
 OffsetForLeaderTopicCollection}
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, 
OffsetsForLeaderEpochResponse}
+import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
+
+import scala.jdk.CollectionConverters._
+import scala.collection.{Map, mutable}
+import scala.compat.java8.OptionConverters.RichOptionForJava8
+
+/**
+ * Facilitates fetches from a remote replica leader.
+ *
+ * @param logPrefix The log prefix
+ * @param endpoint The raw leader endpoint used to communicate with the leader
+ * @param fetchSessionHandler A FetchSessionHandler to track the partitions in 
the session
+ * @param brokerConfig Broker configuration
+ * @param replicaMgr A ReplicaManager
+ * @param quota The quota, used when building a fetch request
+ */
+class RemoteLeaderEndPoint(logPrefix: String,
+   endpoint: BrokerBlockingSender,

Review Comment:
   Should endpoint be named blockingSender?



##
core/src/main/scala/kafka/server/LeaderEndPoint.scala:
##
@@ -0,0 +1,109 @@
+/*
+ * 

[jira] [Commented] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539801#comment-17539801
 ] 

François Rosière commented on KAFKA-13913:
--

[~mjsax], thanks for your comment. The two referenced Jira issues were about adding builders for configs (I didn't find anything clear about the push back), while here the idea is to have builders for the producers, consumers and Kafka Streams.

The previous proposal made in KIP-832 was about adding more constructors to directly expose the config object, but builders have been proposed and look more user friendly.

These builders expose only the methods that are meaningful to users, taking care of type safety, auto-completion, overrides, etc. They are also a straightforward way to support injection of already configured dependencies such as interceptors and serializers/deserializers. Simple and complex configurations could still be provided using the current approach.

So, I don't see any reason not to progress in this direction.

> Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
> -
>
> Key: KAFKA-13913
> URL: https://issues.apache.org/jira/browse/KAFKA-13913
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 3.2.0
>Reporter: François Rosière
>Assignee: François Rosière
>Priority: Major
>  Labels: kip
> Fix For: 3.3.0
>
>
> To have more flexibility, builders should be provided for the following 
> objects
>  * KafkaProducer
>  * KafkaConsumer
>  * KafkaStreams 
> These builders will give an easy way to construct these objects using 
> different arguments/combinations without having to add a new constructor 
> every time a new parameter is required.
> They will also allow using already configured dependencies coming from an 
> injection framework such as Spring (see 
> [https://github.com/spring-projects/spring-kafka/issues/2244]).
> From a user point of view, builders would be used as follows:
> {noformat}
> KafkaProducer<String, MyPojo> kafkaProducer = new KafkaProducerBuilder<String, MyPojo>()
>   .withKeySerializer()
>   .withValueSerializer()
>   .withInterceptors()
>   .withPartitioner()
>   .withMetricsReporter()
>   .build();
> KafkaConsumer<String, MyPojo> consumer = new KafkaConsumerBuilder<String, MyPojo>()
>   .withKeyDeserializer()
>   .withValueDeserializer()
>   .withInterceptors()
>   .withMetricsReporter()
>   .build();
> KafkaStreams kafkaStreams = new KafkaStreamsBuilder(<topology>, <configuration>)
>   .withProducerInterceptors()
>   .withConsumerInterceptors()
>   .withTime()
>   .withKafkaClientSupplier()
>   .withMetricsReporter()
>   .build();{noformat}
> This KIP can be seen as the continuity of the KIP-832.
> More details can be found in the related KIP 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640]





[GitHub] [kafka] hachikuji opened a new pull request, #12185: MINOR: Fix buildResponseSend test cases for envelope responses

2022-05-19 Thread GitBox


hachikuji opened a new pull request, #12185:
URL: https://github.com/apache/kafka/pull/12185

   The test cases we have in `RequestChannelTest` for `buildResponseSend` 
construct the envelope request incorrectly. Basically they confuse the envelope 
context and the reference to the wrapped envelope request object. This patch 
fixes `TestUtils.buildEnvelopeRequest` so that the wrapped request is built 
properly. It also consolidates the tests in `RequestChannelTest` to avoid 
duplication.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] cmccabe commented on pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704

2022-05-19 Thread GitBox


cmccabe commented on PR #12182:
URL: https://github.com/apache/kafka/pull/12182#issuecomment-1132189652

   Thanks for the PR, @mumrah .
   
   1. I agree that tagged fields definitely count as new metadata.  
`MetadataVersion.IBP_3_2_IV0` is currently marked with `didMetadataChange = 
false`. If this metadata version did add new metadata (sounds like it did?), 
this needs to be marked as `true`. Also, we should probably add a very short 
description of the new fields added in each version, in the comment for the 
appropriate MetadataVersion. (i.e. "added X field to Y record"). It's good to 
set this pattern early so others can follow...
   
   2. I would prefer to embed the `MetadataVersion` in each `*Image` 
(`AclsImage`, `ClusterImage`, etc.) rather than passing it into `write` and 
other functions. I think it's fundamental to how we interpret the data -- there 
may be fields that we have to treat slightly differently based on the version 
(although we want to minimize this as much as we possibly can, of course). This 
also lets us have invariants in the constructors like AclsImage objects at MV X 
never have field Y, and so on, which may be nice for unit tests.
   
   3. I think rather than having `MetadataVersionSupplier`, we should just pass 
`FeatureControlManager` to the constructors of the other Control Managers. This 
will be useful in the future when we have other feature flags that we want to 
check. In general I don't see a big benefit to extracting an interface here -- 
`FeatureControlManager` is very lightweight to create and if you want one for 
testing, you can just create one. We do this for a lot of other things -- it 
works very well when you have an object A and B and A makes a lot of calls into 
B, but B makes none (or very few) into A. This also makes it easier to 
understand the control flow.
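
   A minimal, self-contained sketch of the wiring suggested in (3), with stand-in types (the real FeatureControlManager and MetadataVersion carry much more state than this):

      // Stand-ins for org.apache.kafka.server.common.MetadataVersion and
      // org.apache.kafka.controller.FeatureControlManager, for illustration only.
      enum MetadataVersion {
          IBP_3_0_IV1, IBP_3_2_IV0;
          boolean isAtLeast(MetadataVersion other) { return compareTo(other) >= 0; }
      }

      class FeatureControlManager {
          private volatile MetadataVersion metadataVersion = MetadataVersion.IBP_3_0_IV1;
          MetadataVersion metadataVersion() { return metadataVersion; }
          void setMetadataVersion(MetadataVersion v) { metadataVersion = v; }
      }

      // Another control manager takes FeatureControlManager directly instead of
      // a MetadataVersionSupplier interface: this class (A) makes calls into
      // FeatureControlManager (B), and B never calls back into A.
      class ReplicationControlManager {
          private final FeatureControlManager featureControl;

          ReplicationControlManager(FeatureControlManager featureControl) {
              this.featureControl = featureControl;
          }

          boolean leaderRecoverySupported() {
              return featureControl.metadataVersion().isAtLeast(MetadataVersion.IBP_3_2_IV0);
          }
      }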





[jira] [Commented] (KAFKA-13595) Allow producing records with null values in Kafka Console Producer

2022-05-19 Thread Ryan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539764#comment-17539764
 ] 

Ryan commented on KAFKA-13595:
--

Duplicates https://issues.apache.org/jira/browse/KAFKA-10238

> Allow producing records with null values in Kafka Console Producer
> --
>
> Key: KAFKA-13595
> URL: https://issues.apache.org/jira/browse/KAFKA-13595
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.2.0
>
>
> KIP-810: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-810%3A+Allow+producing+records+with+null+values+in+Kafka+Console+Producer





[GitHub] [kafka] vamossagar12 commented on pull request #12121: KAFKA-13846: Adding overloaded metricOrElseCreate method

2022-05-19 Thread GitBox


vamossagar12 commented on PR #12121:
URL: https://github.com/apache/kafka/pull/12121#issuecomment-1132048815

   > > From a public API point of view, Metrics is in a gray area. It is not 
officially part of our public API however we have a few interfaces leaking it. 
That being said, we should be careful with the changes that we do here.
   > 
   > I agree with @dajac as well, what I was thinking is that we can modify the 
private `registerMetric`, and I'm neutral about whether adding a public 
`metricOrElseCreate` should require a KIP. If people think this is necessary, 
we could create one.
   
   Sure thing... I'm neutral about the KIP as well.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12121: KAFKA-13846: Adding overloaded metricOrElseCreate method

2022-05-19 Thread GitBox


vamossagar12 commented on code in PR #12121:
URL: https://github.com/apache/kafka/pull/12121#discussion_r877394625


##
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java:
##
@@ -563,10 +615,15 @@ public synchronized void removeReporter(MetricsReporter 
reporter) {
 }
 }
 
-synchronized void registerMetric(KafkaMetric metric) {
+synchronized void registerMetric(KafkaMetric metric, boolean 
raiseIfMetricExists) {

Review Comment:
   @guozhangwang I think that could be done, yes. I have made the changes.
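
   For reference, a hedged sketch of the metricOrElseCreate semantics under discussion (the method is still being reviewed in this PR, so its name, its placement inside org.apache.kafka.common.metrics.Metrics, and the two-argument registerMetric from the diff above are assumptions, not a released API):

      public synchronized KafkaMetric metricOrElseCreate(KafkaMetric metric) {
          KafkaMetric existing = metrics.get(metric.metricName());  // Metrics' registry map
          if (existing != null)
              return existing;               // reuse the already-registered metric
          registerMetric(metric, false);     // false: don't raise if the metric exists
          return metric;
      }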






[jira] [Commented] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539718#comment-17539718
 ] 

Matthias J. Sax commented on KAFKA-13913:
-

There was some discussion about this in the past (cf KAFKA-3943 and KAFKA-4436).

There was also some push back on this idea. Might be good to revisit those 
arguments. (Not sure if it's contained in the Jira, or PR, or maybe mailing 
list.)

> Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
> -
>
> Key: KAFKA-13913
> URL: https://issues.apache.org/jira/browse/KAFKA-13913
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 3.2.0
>Reporter: François Rosière
>Assignee: François Rosière
>Priority: Major
>  Labels: kip
> Fix For: 3.3.0
>
>
> To have more flexibility, builders should be provided for the following 
> objects
>  * KafkaProducer
>  * KafkaConsumer
>  * KafkaStreams 
> These builders will give an easy way to construct these objects using 
> different arguments/combinations without having to add a new constructor 
> every time a new parameter is required.
> They will also allow using already configured dependencies coming from an 
> injection framework such as Spring (see 
> [https://github.com/spring-projects/spring-kafka/issues/2244]).
> From a user point of view, builders would be used as follows:
> {noformat}
> KafkaProducer<String, MyPojo> kafkaProducer = new KafkaProducerBuilder<String, MyPojo>()
>   .withKeySerializer()
>   .withValueSerializer()
>   .withInterceptors()
>   .withPartitioner()
>   .withMetricsReporter()
>   .build();
> KafkaConsumer<String, MyPojo> consumer = new KafkaConsumerBuilder<String, MyPojo>()
>   .withKeyDeserializer()
>   .withValueDeserializer()
>   .withInterceptors()
>   .withMetricsReporter()
>   .build();
> KafkaStreams kafkaStreams = new KafkaStreamsBuilder(<topology>, <configuration>)
>   .withProducerInterceptors()
>   .withConsumerInterceptors()
>   .withTime()
>   .withKafkaClientSupplier()
>   .withMetricsReporter()
>   .build();{noformat}
> This KIP can be seen as the continuity of the KIP-832.
> More details can be found in the related KIP 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640]





[jira] [Updated] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13913:

Labels: kip  (was: )

> Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
> -
>
> Key: KAFKA-13913
> URL: https://issues.apache.org/jira/browse/KAFKA-13913
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 3.2.0
>Reporter: François Rosière
>Assignee: François Rosière
>Priority: Major
>  Labels: kip
> Fix For: 3.3.0
>
>
> To have more flexibility, builders should be provided for the following 
> objects
>  * KafkaProducer
>  * KafkaConsumer
>  * KafkaStreams 
> These builders will give an easy way to construct these objects using 
> different arguments/combinations without having to add a new constructor 
> every time a new parameter is required.
> They will also allow using already configured dependencies coming from an 
> injection framework such as Spring (see 
> [https://github.com/spring-projects/spring-kafka/issues/2244]).
> From a user point of view, builders would be used as follows:
> {noformat}
> KafkaProducer<String, MyPojo> kafkaProducer = new KafkaProducerBuilder<String, MyPojo>()
>   .withKeySerializer()
>   .withValueSerializer()
>   .withInterceptors()
>   .withPartitioner()
>   .withMetricsReporter()
>   .build();
> KafkaConsumer<String, MyPojo> consumer = new KafkaConsumerBuilder<String, MyPojo>()
>   .withKeyDeserializer()
>   .withValueDeserializer()
>   .withInterceptors()
>   .withMetricsReporter()
>   .build();
> KafkaStreams kafkaStreams = new KafkaStreamsBuilder(<topology>, <configuration>)
>   .withProducerInterceptors()
>   .withConsumerInterceptors()
>   .withTime()
>   .withKafkaClientSupplier()
>   .withMetricsReporter()
>   .build();{noformat}
> This KIP can be seen as the continuity of the KIP-832.
> More details can be found in the related KIP 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640]





[jira] [Updated] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13913:

Component/s: streams

> Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
> -
>
> Key: KAFKA-13913
> URL: https://issues.apache.org/jira/browse/KAFKA-13913
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 3.2.0
>Reporter: François Rosière
>Assignee: François Rosière
>Priority: Major
> Fix For: 3.3.0
>
>
> To have more flexibility, builders should be provided for the following 
> objects
>  * KafkaProducer
>  * KafkaConsumer
>  * KafkaStreams 
> These builders will give an easy way to construct these objects using 
> different arguments/combinations without having to add a new constructor 
> every time a new parameter is required.
> They will also allow using already configured dependencies coming from an 
> injection framework such as Spring (see 
> [https://github.com/spring-projects/spring-kafka/issues/2244]).
> From a user point of view, builders would be used as follows:
> {noformat}
> KafkaProducer<String, MyPojo> kafkaProducer = new KafkaProducerBuilder<String, MyPojo>()
>   .withKeySerializer()
>   .withValueSerializer()
>   .withInterceptors()
>   .withPartitioner()
>   .withMetricsReporter()
>   .build();
> KafkaConsumer<String, MyPojo> consumer = new KafkaConsumerBuilder<String, MyPojo>()
>   .withKeyDeserializer()
>   .withValueDeserializer()
>   .withInterceptors()
>   .withMetricsReporter()
>   .build();
> KafkaStreams kafkaStreams = new KafkaStreamsBuilder(<topology>, <configuration>)
>   .withProducerInterceptors()
>   .withConsumerInterceptors()
>   .withTime()
>   .withKafkaClientSupplier()
>   .withMetricsReporter()
>   .build();{noformat}
> This KIP can be seen as the continuity of the KIP-832.
> More details can be found in the related KIP 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640]





[GitHub] [kafka] divijvaidya opened a new pull request, #12184: KAFKA-13911: Fix the rate window size calculation for edge cases

2022-05-19 Thread GitBox


divijvaidya opened a new pull request, #12184:
URL: https://github.com/apache/kafka/pull/12184

   ## Problem
   Implementation of connection creation rate quotas in Kafka is dependent on 
two configurations:
   
[quota.window.num](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.num)
 AND 
[quota.window.size.seconds](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.size.seconds)
   
   The minimum possible value of these configurations is 1, as per the documentation. However, when we set 1 as the configuration value, we can hit a situation where the rate is calculated as NaN (and hence leads to exceptions). This specific scenario occurs when an event is recorded at the start of a sample window.
   
   ## Solution
   This patch fixes this edge case by ensuring that the windowSize over which 
Rate is calculated is at least 1ms (even if it is calculated at the start of 
the sample window).
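
   A simplified, self-contained illustration of the edge case and the clamp (the real computation lives in org.apache.kafka.common.metrics.stats.Rate and spans multiple samples):

      static double connectionRatePerSec(long eventCount, long windowStartMs, long nowMs) {
          long elapsedMs = nowMs - windowStartMs;
          // With quota.window.num = 1, an event recorded exactly at the start of
          // the sample window gives elapsedMs == 0, so dividing by it yields
          // NaN (0/0) or Infinity. The fix clamps the window to at least 1 ms.
          long windowSizeMs = Math.max(elapsedMs, 1);
          return eventCount * 1000.0 / windowSizeMs;
      }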
   
   ## Test
   Added a unit test which fails before the patch and passes after the patch
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jsancio opened a new pull request, #12183: KAFKA-13883: Implement NoOpRecord and metadata metrics

2022-05-19 Thread GitBox


jsancio opened a new pull request, #12183:
URL: https://github.com/apache/kafka/pull/12183

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-13863) Prevent null config value when create topic in KRaft mode

2022-05-19 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13863.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> Prevent null config value when create topic in KRaft mode
> -
>
> Key: KAFKA-13863
> URL: https://issues.apache.org/jira/browse/KAFKA-13863
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Priority: Major
> Fix For: 3.3.0
>
>






[GitHub] [kafka] hachikuji merged pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode

2022-05-19 Thread GitBox


hachikuji merged PR #12109:
URL: https://github.com/apache/kafka/pull/12109





[GitHub] [kafka] fvaleri commented on pull request #12159: [WIP] Fix stuck SSL tests in case of authentication failure

2022-05-19 Thread GitBox


fvaleri commented on PR #12159:
URL: https://github.com/apache/kafka/pull/12159#issuecomment-1131936314

   @divijvaidya I fixed the test you were referring to.
   
   I also fixed the helper method `sendNoReceive`, which was directly using `channel.mute()` instead of `selector.mute(channel.id())`. The first call does not ensure proper state handling, and I believe it's the reason why `testCloseOldestConnectionWithMultiplePendingReceives` was randomly failing.
   
   Let's see if the test job passes this time.
   
   





[GitHub] [kafka] mumrah commented on pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704

2022-05-19 Thread GitBox


mumrah commented on PR #12182:
URL: https://github.com/apache/kafka/pull/12182#issuecomment-1131934558

   FeaturesImage has `metadataVersion()` method which we can make accessible 
through a supplier for the Image/Delta classes. For broker components like 
ReplicaManager, we can add an argument to applyDelta that indicates a 
MetadataVersion change. I had some code like this in the [original 
PR](https://github.com/apache/kafka/pull/11677/commits/83bbf3b21bb0d9f313aa251cf011ec35748876e2#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2110),
 but we removed it since we didn't have any use cases yet.
   
   





[GitHub] [kafka] mumrah commented on a diff in pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704

2022-05-19 Thread GitBox


mumrah commented on code in PR #12182:
URL: https://github.com/apache/kafka/pull/12182#discussion_r877220561


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1010,6 +1004,11 @@ private void maybeCompleteAuthorizerInitialLoad() {
 }
 }
 
+// Visible for testing
+MetadataVersion currentMetadataVersion() {
+return featureControl.metadataVersion();
+}

Review Comment:
   Oops, that comment was copied over from another PR #12177  :) 






[GitHub] [kafka] jsancio commented on a diff in pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704

2022-05-19 Thread GitBox


jsancio commented on code in PR #12182:
URL: https://github.com/apache/kafka/pull/12182#discussion_r877200466


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1010,6 +1004,11 @@ private void maybeCompleteAuthorizerInitialLoad() {
 }
 }
 
+// Visible for testing
+MetadataVersion currentMetadataVersion() {
+return featureControl.metadataVersion();
+}

Review Comment:
   Which test needs this? It doesn't look like this is used in this PR.






[GitHub] [kafka] mumrah opened a new pull request, #12182: MINOR: Use dynamic metadata.version check for KIP-704

2022-05-19 Thread GitBox


mumrah opened a new pull request, #12182:
URL: https://github.com/apache/kafka/pull/12182

   Now that `metadata.version` has been integrated with the controller #12050, 
we need to make use of the dynamic nature of the feature flag. This patch adds 
a supplier that is passed down to ReplicationControlManager so that the leader 
recovery feature can be toggled on/off.
   
   IBP/MetadataVersion 3.2 added the `LeaderRecoveryState` tagged field to 
PartitionRecord. If we are downgrading from 3.2 to 3.0, the downgrade of the 
feature flag will cause the active controller to stop populating this field, 
but the value will still be present in existing records. However, since this is 
a tagged field, it won't cause any compatibility problems when we actually 
downgrade the server binaries (the tagged field will just be ignored).
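
   A self-contained sketch of why ignoring the field is safe, assuming the flexible-version encoding of tagged fields as (tag, size, bytes) triples per KIP-482 (a simplification of the generated protocol readers, not the actual code):

      import java.nio.ByteBuffer;

      public final class TaggedFieldSkipDemo {
          // Unsigned varint, as used by Kafka's flexible-version wire format.
          static int readUnsignedVarint(ByteBuffer buf) {
              int value = 0, shift = 0;
              byte b;
              do {
                  b = buf.get();
                  value |= (b & 0x7f) << shift;
                  shift += 7;
              } while ((b & 0x80) != 0);
              return value;
          }

          // A reader walks (tag, size, bytes) triples; any tag it does not
          // recognize is skipped by advancing 'size' bytes, which is why a
          // downgraded broker can ignore LeaderRecoveryState without failing.
          static void skipUnknownTaggedFields(ByteBuffer buf) {
              int numTaggedFields = readUnsignedVarint(buf);
              for (int i = 0; i < numTaggedFields; i++) {
                  readUnsignedVarint(buf);              // tag (ignored here)
                  int size = readUnsignedVarint(buf);
                  buf.position(buf.position() + size);  // skip the field's bytes
              }
          }
      }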
   
   





[jira] [Resolved] (KAFKA-13807) Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs

2022-05-19 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13807.
--
Resolution: Fixed

> Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs
> -
>
> Key: KAFKA-13807
> URL: https://issues.apache.org/jira/browse/KAFKA-13807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13807) Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs

2022-05-19 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-13807:


Assignee: Colin McCabe

> Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs
> -
>
> Key: KAFKA-13807
> URL: https://issues.apache.org/jira/browse/KAFKA-13807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] Moovlin commented on pull request #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…

2022-05-19 Thread GitBox


Moovlin commented on PR #12167:
URL: https://github.com/apache/kafka/pull/12167#issuecomment-1131755255

   Thanks for the quick responses. To your first answer, I'm happy to do that and will take a look at the TopicCommandTest for guidance.
   
   To your second answer. Integration tests for this should probably be in a 
different Jira since the methods in the MockAdminClient are effectively not 
implemented (if you try to do anything other than delete from the first offset, 
it throws an UnsupportedOperationException). So I'd like to keep the scope of 
this PR to unit tests. If you think that makes sense, I'll go ahead and open up 
an issue for it to make sure we track that outstanding item. 





[GitHub] [kafka] dajac opened a new pull request, #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)

2022-05-19 Thread GitBox


dajac opened a new pull request, #12181:
URL: https://github.com/apache/kafka/pull/12181

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] viktorsomogyi opened a new pull request, #12180: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop

2022-05-19 Thread GitBox


viktorsomogyi opened a new pull request, #12180:
URL: https://github.com/apache/kafka/pull/12180

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-13917) Avoid calling lookupCoordinator() in tight loop

2022-05-19 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-13917:

Description: 
Currently the heartbeat thread's lookupCoordinator() is called in a tight loop if brokers crash and the consumer is left running. Besides flooding the logs at debug level, it also increases CPU usage.

The fix is easy: just put a backoff call after the coordinator lookup.

Reproduction:
# Start a few brokers
# Create a topic and produce to it
# Start consuming
# Stop all brokers
At this point lookupCoordinator() will be called in a tight loop.


  was:
Currently the heartbeat thread's lookupCoordinator() is called in a tight loop 
if brokers crash and the consumer is left running. Besides that it floods the 
logs on debug level, it increases CPU usage as well.

The fix is easy, just need to put a backoff call after coordinator lookup.


> Avoid calling lookupCoordinator() in tight loop
> ---
>
> Key: KAFKA-13917
> URL: https://issues.apache.org/jira/browse/KAFKA-13917
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.1.0, 3.1.1, 3.1.2
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Currently the heartbeat thread's lookupCoordinator() is called in a tight loop if brokers crash and the consumer is left running. Besides flooding the logs at debug level, it also increases CPU usage.
> The fix is easy: just put a backoff call after the coordinator lookup.
> Reproduction:
> # Start a few brokers
> # Create a topic and produce to it
> # Start consuming
> # Stop all brokers
> At this point lookupCoordinator() will be called in a tight loop.





[jira] [Created] (KAFKA-13917) Avoid calling lookupCoordinator() in tight loop

2022-05-19 Thread Viktor Somogyi-Vass (Jira)
Viktor Somogyi-Vass created KAFKA-13917:
---

 Summary: Avoid calling lookupCoordinator() in tight loop
 Key: KAFKA-13917
 URL: https://issues.apache.org/jira/browse/KAFKA-13917
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 3.1.1, 3.1.0, 3.1.2
Reporter: Viktor Somogyi-Vass
Assignee: Viktor Somogyi-Vass


Currently the heartbeat thread's lookupCoordinator() is called in a tight loop if brokers crash and the consumer is left running. Besides flooding the logs at debug level, it also increases CPU usage.

The fix is easy: just put a backoff call after the coordinator lookup.
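
A minimal, self-contained illustration of the fix (the real change belongs in the consumer's heartbeat thread; these stubs only stand in for the actual AbstractCoordinator logic):
{noformat}
public class CoordinatorBackoffDemo {
    private int attempts = 0;

    boolean coordinatorUnknown() { return attempts < 3; }  // stub
    void lookupCoordinator()     { attempts++; }           // stub: async lookup

    void run(long retryBackoffMs) throws InterruptedException {
        while (coordinatorUnknown()) {
            lookupCoordinator();
            Thread.sleep(retryBackoffMs);  // the fix: back off instead of spinning
        }
    }
}
{noformat}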





[jira] [Updated] (KAFKA-13916) Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)

2022-05-19 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13916:

Description: KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft

> Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)
> 
>
> Key: KAFKA-13916
> URL: https://issues.apache.org/jira/browse/KAFKA-13916
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft





[jira] [Created] (KAFKA-13916) Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)

2022-05-19 Thread David Jacot (Jira)
David Jacot created KAFKA-13916:
---

 Summary: Fenced replicas should not be allowed to join the ISR in 
KRaft (KIP-841)
 Key: KAFKA-13916
 URL: https://issues.apache.org/jira/browse/KAFKA-13916
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot








[GitHub] [kafka] dajac commented on a diff in pull request #12065: KAFKA-13788: Use AdminClient.incrementalAlterConfigs in ConfigCommand

2022-05-19 Thread GitBox


dajac commented on code in PR #12065:
URL: https://github.com/apache/kafka/pull/12065#discussion_r876706874


##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -367,15 +366,12 @@ object ConfigCommand extends Logging {
 if (invalidConfigs.nonEmpty)
   throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
 
-val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-val sensitiveEntries = newEntries.filter(_._2.value == null)
-if (sensitiveEntries.nonEmpty)
-  throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-val newConfig = new JConfig(newEntries.asJava.values)
-
 val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
 val alterOptions = new 
AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))

Review Comment:
   All this code looks pretty similar to the other branches now. Do you think 
that we could refactor and share more?
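
   For reference, a hedged example of the incremental API the patch moves to (standard AdminClient usage; the broker id and config names are illustrative, not taken from this PR):

      import java.util.Arrays;
      import java.util.Collection;
      import java.util.Collections;
      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.AlterConfigOp;
      import org.apache.kafka.clients.admin.ConfigEntry;
      import org.apache.kafka.common.config.ConfigResource;

      public class IncrementalAlterExample {
          static void alter(Admin admin) throws Exception {
              ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
              Collection<AlterConfigOp> ops = Arrays.asList(
                  new AlterConfigOp(new ConfigEntry("log.cleaner.threads", "2"),
                                    AlterConfigOp.OpType.SET),
                  new AlterConfigOp(new ConfigEntry("log.retention.ms", ""),
                                    AlterConfigOp.OpType.DELETE));
              // The broker applies SET/DELETE ops incrementally, so unrelated
              // (e.g. sensitive) configs no longer need to be resent in full.
              admin.incrementalAlterConfigs(Collections.singletonMap(broker, ops))
                   .all().get();
          }
      }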






[jira] [Commented] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-05-19 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539465#comment-17539465
 ] 

lqjacklee commented on KAFKA-13888:
---

[~Niket Goel] for the field ‘LastCaughtUpTimestamp’, how can I compute it?

The field ‘LastFetchTimestamp’ I can fetch from leaderState.getReplicatedLastFetchTimestamp(leaderState.localId());

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --
>
> Key: KAFKA-13888
> URL: https://issues.apache.org/jira/browse/KAFKA-13888
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Niket Goel
>Assignee: Niket Goel
>Priority: Major
>
> Tracking issue for the implementation of KIP:836





[GitHub] [kafka] mimaison commented on pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

2022-05-19 Thread GitBox


mimaison commented on PR #11780:
URL: https://github.com/apache/kafka/pull/11780#issuecomment-1131508676

   Thanks for the updates @C0urante! I'm on PTO next week; I'll take a look at the tests when I'm back.





[GitHub] [kafka] mimaison commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

2022-05-19 Thread GitBox


mimaison commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r876878064


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() {
 return workerMetricsGroup;
 }
 
+abstract class TaskBuilder {
+
+private final ConnectorTaskId id;
+private final ClusterConfigState configState;
+private final TaskStatus.Listener statusListener;
+private final TargetState initialState;
+
+private Task task = null;
+private ConnectorConfig connectorConfig = null;
+private Converter keyConverter = null;
+private Converter valueConverter = null;
+private HeaderConverter headerConverter = null;
+private ClassLoader classLoader = null;
+
+public TaskBuilder(ConnectorTaskId id,
+   ClusterConfigState configState,
+   TaskStatus.Listener statusListener,
+   TargetState initialState) {
+this.id = id;
+this.configState = configState;
+this.statusListener = statusListener;
+this.initialState = initialState;
+}
+
+public TaskBuilder withTask(Task task) {
+this.task = task;
+return this;
+}
+
+public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+this.connectorConfig = connectorConfig;
+return this;
+}
+
+public TaskBuilder withKeyConverter(Converter keyConverter) {
+this.keyConverter = keyConverter;
+return this;
+}
+
+public TaskBuilder withValueConverter(Converter valueConverter) {
+this.valueConverter = valueConverter;
+return this;
+}
+
+public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+this.headerConverter = headerConverter;
+return this;
+}
+
+public TaskBuilder withClassloader(ClassLoader classLoader) {
+this.classLoader = classLoader;
+return this;
+}
+
+public WorkerTask build() {
+Objects.requireNonNull(task, "Task cannot be null");
+Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+final Class<? extends Connector> connectorClass = plugins.connectorClass(
+connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+return doBuild(task, id, configState, statusListener, initialState,
+connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+}
+
+abstract WorkerTask doBuild(Task task,
+ConnectorTaskId id,
+ClusterConfigState configState,
+TaskStatus.Listener statusListener,
+TargetState initialState,
+ConnectorConfig connectorConfig,
+Converter keyConverter,
+Converter valueConverter,
+HeaderConverter headerConverter,
+ClassLoader classLoader,
+ErrorHandlingMetrics errorHandlingMetrics,
+Class<? extends Connector> connectorClass,
+RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+}
+
+class SinkTaskBuilder extends TaskBuilder {
+public SinkTaskBuilder(ConnectorTaskId id,
+   ClusterConfigState configState,
+   TaskStatus.Listener statusListener,
+   TargetState initialState) {
+super(id, configState, statusListener, initialState);
+}
+
+@Override
+public WorkerTask doBuild(Task task,
+  

[GitHub] [kafka] mimaison commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)

2022-05-19 Thread GitBox


mimaison commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r876877095


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -576,88 +672,42 @@ public boolean startTask(
 
 executor.submit(workerTask);
 if (workerTask instanceof WorkerSourceTask) {
-sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
+sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));
 }
 return true;
 }
 }
 
-private WorkerTask buildWorkerTask(ClusterConfigState configState,
-   ConnectorConfig connConfig,
-   ConnectorTaskId id,
-   Task task,
-   TaskStatus.Listener statusListener,
-   TargetState initialState,
-   Converter keyConverter,
-   Converter valueConverter,
-   HeaderConverter headerConverter,
-   ClassLoader loader) {
-ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
-final Class<? extends Connector> connectorClass = plugins.connectorClass(
-connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
-connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
-retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-// Decide which type of worker task we need based on the type of task.
-if (task instanceof SourceTask) {
-SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
-connConfig.originalsStrings(), config.topicCreationEnable());
-retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.transformations(), retryWithToleranceOperator);
-log.info("Initializing: {}", transformationChain);
-CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-internalKeyConverter, internalValueConverter);
-OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-internalKeyConverter, internalValueConverter);
-Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
-connectorClientConfigOverridePolicy, kafkaClusterId);
-KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-TopicAdmin admin;
-Map<String, TopicCreationGroup> topicCreationGroups;
-if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
-Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
-sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
-admin = new TopicAdmin(adminProps);
-topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
-} else {
-admin = null;
-topicCreationGroups = null;
-}
-
-// Note we pass the configState as it performs dynamic transformations under the covers
-return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-headerConverter, transformationChain, producer, admin, topicCreationGroups,
-offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
-} else if (task instanceof SinkTask) {
-TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.transformations(), retryWithToleranceOperator);
-log.info("Initializing: {}", transformationChain);
-SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
-retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
-WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
-keyConverter, valueConverter, headerConverter);
-
-Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, 
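
The hunk above removes the monolithic buildWorkerTask in favor of the per-type
builders shown earlier. A related detail in the startTask hunk is that
sourceTaskOffsetCommitter is now accessed through an Optional, presumably
because exactly-once mode commits source offsets transactionally and so has no
committer to schedule. A self-contained sketch of that Optional-based
scheduling, with made-up Committer and task-id names:

    import java.util.Optional;

    public class OptionalCommitterSketch {
        interface Committer {
            void schedule(String taskId);
        }

        public static void main(String[] args) {
            Committer logging = taskId -> System.out.println("scheduled " + taskId);
            Optional<Committer> committer = Optional.of(logging);
            // Equivalent to a null check, but expressed in the type:
            committer.ifPresent(c -> c.schedule("connector-0"));

            Optional<Committer> absent = Optional.empty();
            absent.ifPresent(c -> c.schedule("connector-0")); // no-op, no NPE
        }
    }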

[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2022-05-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539447#comment-17539447
 ] 

ASF GitHub Bot commented on KAFKA-13882:


mimaison commented on code in PR #410:
URL: https://github.com/apache/kafka-site/pull/410#discussion_r876875276


##
.htaccess:
##
@@ -9,3 +9,5 @@ RewriteRule ^/?(\d+)/javadoc - [S=2]
 RewriteRule ^/?(\d+)/images/ - [S=1]
 RewriteCond $2 !=protocol
 RewriteRule ^/?(\d+)/([a-z]+)(\.html)? /$1/documentation#$2 [R=302,L,NE]
+RewriteCond %{REQUEST_FILENAME}.html -f
+RewriteRule ^(.*)$ %{REQUEST_FILENAME}.html

Review Comment:
   I find it strange that we have to edit this file, and I wonder how it works 
on the website. I'll try to find out.





> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13915) Kafka streams should validate that the repartition topics are not created with cleanup.policy compact

2022-05-19 Thread Peter James Pringle (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter James Pringle updated KAFKA-13915:

Description: 
Add sanity validation on streams startup that *repartition* topics are not 
set up with a *cleanup.policy* of {*}compact{*}.

In enterprise environments, automated creation of Kafka Streams intermediate 
topics is not always possible due to policy restrictions, so the topics are 
created manually, which is prone to user misconfiguration.

In several cases we have found repartition topics that were incorrectly set 
up with compaction enabled, following the changelog topic setup. A compacted 
repartition topic will lose data whenever more than one value is mapped to 
the new key. This has been observed where an aggregation follows a 
repartition topic and the aggregated value is incorrect.

 

Example:

 

{{Original data: (coffee, drink), (tea, drink), (beer, drink)}}

 

Repartition by type i.e. drink:

 

Expected:

{{(drink, coffee), (drink, tea), (drink, beer)}}

 

With compaction the following is possible:

 

Actual

{{(drink, beer);}}

coffee and tea are lost.

  was:
Add sanity validation on streams startup that *repartition* topics are not 
set up with a *cleanup.policy* of {*}compact{*}.

In enterprise environments, automated creation of Kafka Streams intermediate 
topics is not always possible due to policy restrictions, so the topics are 
created manually, which is prone to user misconfiguration.

In several cases we have found repartition topics that were incorrectly set 
up with compaction enabled, following the changelog topic setup. A compacted 
repartition topic will lose data whenever more than one value is mapped to 
the new key. This has been observed where an aggregation follows a 
repartition topic and the aggregated value is incorrect.

 

Example:

 

{{Original data: (coffee, drink), (tea, drink), (beer, drink)}}

 

Repartition by type i.e. drink:

 

Expected:

{{(drink, coffee), (drink, tea), (drink, beer)}}

 

With compaction the following is possible:

 

Actual

{\{(drink, beer); }}

coffee and tea are lost.


> Kafka streams should validate that the repartition topics are not created 
> with cleanup.policy compact
> -
>
> Key: KAFKA-13915
> URL: https://issues.apache.org/jira/browse/KAFKA-13915
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.1
>Reporter: Peter James Pringle
>Priority: Major
>
> Add sanity validation on streams startup that *repartition* topics are not 
> set up with a *cleanup.policy* of {*}compact{*}.
> In enterprise environments, automated creation of Kafka Streams intermediate 
> topics is not always possible due to policy restrictions, so the topics are 
> created manually, which is prone to user misconfiguration.
> In several cases we have found repartition topics that were incorrectly set 
> up with compaction enabled, following the changelog topic setup. A compacted 
> repartition topic will lose data whenever more than one value is mapped to 
> the new key. This has been observed where an aggregation follows a 
> repartition topic and the aggregated value is incorrect.
>  
> Example:
>  
> {{Original data: (coffee, drink), (tea, drink), (beer, drink)}}
>  
> Repartition by type i.e. drink:
>  
> Expected:
> {{(drink, coffee), (drink, tea), (drink, beer)}}
>  
> With compaction the following is possible:
>  
> Actual
> {{(drink, beer);}}
> coffee and tea are lost.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13915) Kafka streams should validate that the repartition topics are not created with cleanup.policy compact

2022-05-19 Thread Peter James Pringle (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter James Pringle updated KAFKA-13915:

Description: 
Add sanity validation on streams startup that *repartition* topics are not 
set up with a *cleanup.policy* of {*}compact{*}.

In enterprise environments, automated creation of Kafka Streams intermediate 
topics is not always possible due to policy restrictions, so the topics are 
created manually, which is prone to user misconfiguration.

In several cases we have found repartition topics that were incorrectly set 
up with compaction enabled, following the changelog topic setup. A compacted 
repartition topic will lose data whenever more than one value is mapped to 
the new key. This has been observed where an aggregation follows a 
repartition topic and the aggregated value is incorrect.

 

Example:

 

{{Original data: (coffee, drink), (tea, drink), (beer, drink)}}

 

Repartition by type i.e. drink:

 

Expected:

{{(drink, coffee), (drink, tea), (drink, beer)}}

 

With compaction the following is possible:

 

Actual

{\{(drink, beer); }}

coffee and tea are lost.

  was:
Add sanity validation on streams startup that *repartition* topics are not 
set up with a *cleanup.policy* of {*}compact{*}.

In enterprise environments, automated creation of Kafka Streams intermediate 
topics is not always possible due to policy restrictions, so the topics are 
created manually, which is prone to user misconfiguration.

In several cases we have found repartition topics that were incorrectly set 
up with compaction enabled, following the changelog topic setup. A compacted 
repartition topic will lose data whenever more than one value is mapped to 
the new key.

 

Example:

 

{{Original data: (coffee, drink), (tea, drink), (beer, drink)}}

 

Repartition by type i.e. drink:

 

Expected:

{{(drink, coffee), (drink, tea), (drink, beer)}}

 

With compaction the following is possible:

 

Actual

{{(drink, beer); }}

coffee and tea are lost.


> Kafka streams should validate that the repartition topics are not created 
> with cleanup.policy compact
> -
>
> Key: KAFKA-13915
> URL: https://issues.apache.org/jira/browse/KAFKA-13915
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.1
>Reporter: Peter James Pringle
>Priority: Major
>
> Add sanity validation on streams startup that *repartition* topics are not 
> set up with a *cleanup.policy* of {*}compact{*}.
> In enterprise environments, automated creation of Kafka Streams intermediate 
> topics is not always possible due to policy restrictions, so the topics are 
> created manually, which is prone to user misconfiguration.
> In several cases we have found repartition topics that were incorrectly set 
> up with compaction enabled, following the changelog topic setup. A compacted 
> repartition topic will lose data whenever more than one value is mapped to 
> the new key. This has been observed where an aggregation follows a 
> repartition topic and the aggregated value is incorrect.
>  
> Example:
>  
> {{Original data: (coffee, drink), (tea, drink), (beer, drink)}}
>  
> Repartition by type i.e. drink:
>  
> Expected:
> {{(drink, coffee), (drink, tea), (drink, beer)}}
>  
> With compaction the following is possible:
>  
> Actual
> {\{(drink, beer); }}
> coffee and tea are lost.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13915) Kafka streams should validate that the repartition topics are not created with cleanup.policy compact

2022-05-19 Thread Peter James Pringle (Jira)
Peter James Pringle created KAFKA-13915:
---

 Summary: Kafka streams should validate that the repartition topics 
are not created with cleanup.policy compact
 Key: KAFKA-13915
 URL: https://issues.apache.org/jira/browse/KAFKA-13915
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.8.1
Reporter: Peter James Pringle


Add sanity validation on streams startup that *repartition* topics are not 
set up with a *cleanup.policy* of {*}compact{*}.

In enterprise environments, automated creation of Kafka Streams intermediate 
topics is not always possible due to policy restrictions, so the topics are 
created manually, which is prone to user misconfiguration.

In several cases we have found repartition topics that were incorrectly set 
up with compaction enabled, following the changelog topic setup. A compacted 
repartition topic will lose data whenever more than one value is mapped to 
the new key.

 

Example:

 

{{Original data: (coffee, drink), (tea, drink), (beer, drink)}}

 

Repartition by type i.e. drink:

 

Expected:

{{(drink, coffee), (drink, tea), (drink, beer)}}

 

With compaction the following is possible:

 

Actual

{{(drink, beer); }}

coffee and tea are lost.
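
To make the failure mode concrete, here is a minimal Streams topology of the
kind described, sketched under stated assumptions: the topic names, serdes and
the "by-type" repartition name are made up, and the data is the example above.
If the internal repartition topic is (mis)created with cleanup.policy=compact,
compaction may retain only the last record per key, so the count for "drink"
can silently drop from 3 to 1:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Repartitioned;

    public class RepartitionCompactionSketch {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder
                // input records: (coffee, drink), (tea, drink), (beer, drink)
                .stream("items", Consumed.with(Serdes.String(), Serdes.String()))
                // re-key by type: (drink, coffee), (drink, tea), (drink, beer)
                .map((name, type) -> KeyValue.pair(type, name))
                // backs the stream with an internal ...-by-type-repartition topic;
                // if that topic is compacted, all but the latest record per key
                // may be discarded before the aggregation reads them
                .repartition(Repartitioned.with(Serdes.String(), Serdes.String()).withName("by-type"))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count() // expected (drink -> 3); after compaction possibly (drink -> 1)
                .toStream()
                .to("counts-by-type", Produced.with(Serdes.String(), Serdes.Long()));
            return builder.build();
        }
    }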



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] showuon commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-19 Thread GitBox


showuon commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r876740554


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -830,7 +830,13 @@ class KafkaServer(
   private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
 for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
   val checkpoint = brokerMetadataCheckpoints(logDir)
-  checkpoint.write(brokerMetadata.toProperties)
+  try {
+checkpoint.write(brokerMetadata.toProperties)
+  } catch {
+  case e: IOException =>
+val dirPath = checkpoint.file.getAbsolutePath
+logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)

Review Comment:
   Handle IOException when writing meta.properties during broker startup, so 
that it does not cause the broker to shut down.
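
The shape of the fix, rendered as a self-contained Java sketch (the real code
is Scala and reports through LogDirFailureChannel; the names below are
illustrative): the IOException is recorded against the offending log dir
instead of propagating, so startup continues on the remaining dirs.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class CheckpointSketch {
        private final Set<Path> offlineDirs = ConcurrentHashMap.newKeySet();

        void checkpointAll(List<Path> logDirs, byte[] metaProperties) {
            for (Path dir : logDirs) {
                if (offlineDirs.contains(dir)) continue; // skip dirs already offline
                try {
                    Files.write(dir.resolve("meta.properties"), metaProperties);
                } catch (IOException e) {
                    // Record the failure and move on: the broker keeps running on
                    // its remaining log dirs instead of shutting down.
                    offlineDirs.add(dir);
                    System.err.println("Error while writing meta.properties to " + dir + ": " + e);
                }
            }
        }
    }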



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-19 Thread GitBox


showuon commented on PR #12136:
URL: https://github.com/apache/kafka/pull/12136#issuecomment-1131379581

   @junrao, thanks for your review. I've addressed your comments. I also found 
we should handle `IOException` when writing `meta.properties` in 
server.startup. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-19 Thread GitBox


showuon commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r876740835


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File],
 s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
 } catch {
   case e: IOException =>
-offlineDirs.add((logDirAbsolutePath, e))
-error(s"Error while loading log dir $logDirAbsolutePath", e)
+handleIOException(logDirAbsolutePath, e)
+  case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
+// KafkaStorageException might be thrown, e.g. while writing the LeaderEpochFileCache
+handleIOException(logDirAbsolutePath, e.getCause.asInstanceOf[IOException])

Review Comment:
   Good catch! Updated.



##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -158,22 +197,54 @@ class LogLoaderTest {
 }
 
 locally {
-  simulateError.hasError = true
-  val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-  log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+  val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+  val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, logDirFailureChannel)
 
-  // Simulate error
-  assertThrows(classOf[RuntimeException], () => {
-val defaultConfig = logManager.currentDefaultConfig
-logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-  })
+  // Simulate Runtime error
+  assertThrows(classOf[RuntimeException], runLoadLogs)
   assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+  assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
+
   // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time.
   simulateError.hasError = false
   cleanShutdownInterceptedValue = true
   val defaultConfig = logManager.currentDefaultConfig
   logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
   assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
+  logManager.shutdown()
+}
+
+locally {
+  val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+  val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, logDirFailureChannel)
+
+  // Simulate IO error
+  assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled")
+
+  assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown")
+  logManager.shutdown()
+}
+
+locally {

Review Comment:
   Updated. Thanks.
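
The interesting part of the LogManager hunk is matching on a wrapped cause. A
self-contained Java rendering of that unwrap-and-handle pattern
(StorageException below stands in for Kafka's KafkaStorageException):

    import java.io.IOException;

    public class UnwrapCauseSketch {
        static class StorageException extends RuntimeException {
            StorageException(Throwable cause) { super(cause); }
        }

        static void handleIOException(String dir, IOException e) {
            System.err.println("Marking " + dir + " offline: " + e.getMessage());
        }

        static void loadDir(String dir) throws IOException {
            // Simulate an IO failure surfaced through a wrapping runtime
            // exception, e.g. while writing a leader-epoch checkpoint.
            throw new StorageException(new IOException("disk error in " + dir));
        }

        public static void main(String[] args) {
            String dir = "/tmp/kafka-logs";
            try {
                loadDir(dir);
            } catch (IOException e) {
                handleIOException(dir, e);
            } catch (StorageException e) {
                if (e.getCause() instanceof IOException) {
                    handleIOException(dir, (IOException) e.getCause());
                } else {
                    throw e; // not IO-induced: let it propagate and fail loudly
                }
            }
        }
    }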



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

2022-05-19 Thread GitBox


showuon commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r876740554


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -830,7 +830,13 @@ class KafkaServer(
   private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
 for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
   val checkpoint = brokerMetadataCheckpoints(logDir)
-  checkpoint.write(brokerMetadata.toProperties)
+  try {
+checkpoint.write(brokerMetadata.toProperties)
+  } catch {
+  case e: IOException =>
+val dirPath = checkpoint.file.getAbsolutePath
+logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)

Review Comment:
   Handle IOException when writing meta.properties, so that it does not cause 
the broker to shut down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12065: KAFKA-13788: Use AdminClient.incrementalAlterConfigs in ConfigCommand

2022-05-19 Thread GitBox


dajac commented on code in PR #12065:
URL: https://github.com/apache/kafka/pull/12065#discussion_r876706874


##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -367,15 +366,12 @@ object ConfigCommand extends Logging {
 if (invalidConfigs.nonEmpty)
   throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
 
-val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-val sensitiveEntries = newEntries.filter(_._2.value == null)
-if (sensitiveEntries.nonEmpty)
-  throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-val newConfig = new JConfig(newEntries.asJava.values)
-
 val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
 val alterOptions = new 
AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))

Review Comment:
   All this code looks pretty similar to the others now. Do you think that we 
could refactor and share more?



##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -367,15 +366,12 @@ object ConfigCommand extends Logging {
 if (invalidConfigs.nonEmpty)
   throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
 
-val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-val sensitiveEntries = newEntries.filter(_._2.value == null)
-if (sensitiveEntries.nonEmpty)
-  throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-val newConfig = new JConfig(newEntries.asJava.values)
-
 val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
 val alterOptions = new 
AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+  ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+  ).asJavaCollection

Review Comment:
   nit: We usually don't use curly braces for one-liners. The closing 
parenthesis (before `asJavaCollection`) does not seem to be indented correctly.
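
For reference, a minimal, hedged example of the Admin API the command is
moving to (the broker id and config keys/values are made up): one SET and one
DELETE op are sent for broker 0, mirroring what the new alterEntries code
builds, and untouched configs are simply not sent.

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class IncrementalAlterSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
                List<AlterConfigOp> ops = List.of(
                        // SET adds or updates a single key...
                        new AlterConfigOp(new ConfigEntry("log.cleaner.threads", "2"),
                                AlterConfigOp.OpType.SET),
                        // ...and DELETE reverts one to its default.
                        new AlterConfigOp(new ConfigEntry("log.retention.ms", ""),
                                AlterConfigOp.OpType.DELETE));
                admin.incrementalAlterConfigs(Map.of(broker0, ops))
                        .all().get(60, TimeUnit.SECONDS);
            }
        }
    }

This is what removes the need for the deleted sensitive-entry check: the
legacy alterConfigs replaced the whole config, so every sensitive value had to
be re-supplied, whereas incremental ops touch only the listed keys.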



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] acsaki opened a new pull request, #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…

2022-05-19 Thread GitBox


acsaki opened a new pull request, #12179:
URL: https://github.com/apache/kafka/pull/12179

   Clients remain connected and able to produce or consume despite an expired 
OAUTHBEARER token.
   
   The problem can be reproduced using the 
https://github.com/acsaki/kafka-sasl-reauth project: start the embedded 
OAuth2 server and Kafka, run the long-running consumer in OAuthBearerTest, 
then kill the OAuth2 server so the client can no longer re-authenticate.
   
   The root cause seems to be 
SaslServerAuthenticator#calcCompletionTimesAndReturnSessionLifetimeMs failing 
to set ReauthInfo#sessionExpirationTimeNanos when the token has already 
expired (i.e. when the session lifetime goes negative). As a consequence, 
KafkaChannel#serverAuthenticationSessionExpired returns false and the 
SocketServer never closes the channel.
   
   The issue is observed with OAUTHBEARER but seems to have a wider impact on 
SASL re-authentication.
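
   A self-contained sketch of the suspected gap and the obvious guard (names 
and units are simplified; the real logic lives in SaslServerAuthenticator and 
KafkaChannel): when the token is already expired the session lifetime is 
negative, and an expiration time must still be recorded so the channel gets 
closed.

       public class ReauthExpirySketch {
           private Long sessionExpirationTimeNanos; // null = "never expires"

           void completeReauthentication(long tokenExpiryMs, long nowMs) {
               long sessionLifetimeMs = tokenExpiryMs - nowMs;
               // Suspected bug shape: only positive lifetimes recorded an expiry.
               // Guard: clamp at zero so an expired token expires the session now.
               long effectiveLifetimeMs = Math.max(0L, sessionLifetimeMs);
               sessionExpirationTimeNanos = (nowMs + effectiveLifetimeMs) * 1_000_000L;
           }

           boolean serverAuthenticationSessionExpired(long nowNanos) {
               return sessionExpirationTimeNanos != null
                       && nowNanos > sessionExpirationTimeNanos;
           }

           public static void main(String[] args) {
               ReauthExpirySketch channel = new ReauthExpirySketch();
               long now = System.currentTimeMillis();
               channel.completeReauthentication(now - 10_000, now); // expired 10s ago
               System.out.println(
                       channel.serverAuthenticationSessionExpired((now + 1) * 1_000_000L)); // true
           }
       }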
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org