[GitHub] [kafka] showuon commented on pull request #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…
showuon commented on PR #12167: URL: https://github.com/apache/kafka/pull/12167#issuecomment-1132408449 > So I'd like to keep the scope of this PR to unit tests. If you think that makes sense, I'll go ahead and open up an issue for it to make sure we track that outstanding item. Yes, that makes sense. Please open another JIRA issue for it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio merged pull request #12160: KAFKA-13889: Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
jsancio merged PR #12160: URL: https://github.com/apache/kafka/pull/12160
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r877600481

## core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala: ##
@@ -40,9 +42,15 @@ import scala.collection.Map
  * setOffsetsForNextResponse
  */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset],
-                                     sourceBroker: BrokerEndPoint,
+                                     override private[server] val sourceBroker: BrokerEndPoint,
                                      time: Time)
-  extends BlockingSend {
+  extends BrokerBlockingSender(sourceBroker = sourceBroker,

Review Comment:
The AbstractFetcherThread is tied to an endPoint. If there is no endPoint, the logic probably won't go through AbstractFetcherThread.
[jira] [Created] (KAFKA-13918) Schedule or cancel nooprecord write on metadata version change
Jose Armando Garcia Sancio created KAFKA-13918: -- Summary: Schedule or cancel nooprecord write on metadata version change Key: KAFKA-13918 URL: https://issues.apache.org/jira/browse/KAFKA-13918 Project: Kafka Issue Type: Sub-task Components: controller Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (KAFKA-1930) Move server over to new metrics library
[ https://issues.apache.org/jira/browse/KAFKA-1930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio reassigned KAFKA-1930:
    Assignee: (was: Aditya Auradkar)

> Move server over to new metrics library
> ---
>
> Key: KAFKA-1930
> URL: https://issues.apache.org/jira/browse/KAFKA-1930
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jay Kreps
> Priority: Major
>
> We are using org.apache.kafka.common.metrics on the clients, but using Coda
> Hale metrics on the server. We should move the server over to the new metrics
> package as well. This will help to make all our metrics self-documenting.
[GitHub] [kafka] vvcephei commented on a diff in pull request #12186: MINOR: Deflake OptimizedKTableIntegrationTest
vvcephei commented on code in PR #12186:
URL: https://github.com/apache/kafka/pull/12186#discussion_r877582344

## streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java: ##
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            try {
+                // Assert that the current value in store reflects all messages being processed
+                if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+                    assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams1.close();
+                    newActiveStore.set(store2);
+                } else {
+                    assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
+                    kafkaStreams2.close();
+                    newActiveStore.set(store1);
+                }
+            } catch (final InvalidStateStoreException e) {
+                LOG.warn("Detected an unexpected rebalance during test. Retrying if possible.", e);
+                throw e;

Review Comment:
This triggers the retryOnException logic

## streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java: ##
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
-        final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());
-
-        final boolean kafkaStreams1WasFirstActive;
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        // Assert that the current value in store reflects all messages being processed
-        if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
-            assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = true;
-        } else {
-            assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
-            kafkaStreams1WasFirstActive = false;
-        }
-
-        if (kafkaStreams1WasFirstActive) {
-            kafkaStreams1.close();
-        } else {
-            kafkaStreams2.close();
-        }
+        final AtomicReference> newActiveStore = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(() -> {
+            final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

Review Comment:
Depending on where and when the rebalance
[GitHub] [kafka] vvcephei opened a new pull request, #12186: MINOR: Deflake OptimizedKTableIntegrationTest
vvcephei opened a new pull request, #12186: URL: https://github.com/apache/kafka/pull/12186 This test has been flaky due to unexpected rebalances during the test. This change fixes it by detecting an unexpected rebalance and retrying the test logic (within a timeout). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
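The retry-within-a-timeout idea described in this PR can be sketched roughly as follows. This is a minimal illustration and not Kafka's `TestUtils.retryOnExceptionWithTimeout`; the class `RetrySketch`, the method `retryOnException`, and its parameters are hypothetical names chosen for the example.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka): re-runs an action until it stops
// throwing, or rethrows the last failure once the timeout has elapsed.
final class RetrySketch {
    static <T> T retryOnException(Duration timeout, Duration pause, Supplier<T> action) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                // Out of time: surface the most recent failure to the caller.
                if (System.nanoTime() - deadline >= 0) {
                    throw e;
                }
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

In a test, the action would perform the store lookups and assertions, and rethrow the exception that signals a rebalance so that the whole block runs again.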
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
rittikaadhikari commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r877577941

## core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala: ##
@@ -40,9 +42,15 @@ import scala.collection.Map
  * setOffsetsForNextResponse
  */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset],
-                                     sourceBroker: BrokerEndPoint,
+                                     override private[server] val sourceBroker: BrokerEndPoint,
                                      time: Time)
-  extends BlockingSend {
+  extends BrokerBlockingSender(sourceBroker = sourceBroker,

Review Comment:
Would there ever be a scenario in which we wouldn't want `brokerEndPoint` to be a part of the `BlockingSend` trait? I.e., is there ever an implementation of `BlockingSend` in which a `brokerEndPoint` would be extraneous?
[GitHub] [kafka] junrao commented on a diff in pull request #12005: KAFKA-13803: Refactor Leader API Access
junrao commented on code in PR #12005:
URL: https://github.com/apache/kafka/pull/12005#discussion_r877559859

## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api.Request
+import kafka.cluster.BrokerEndPoint
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
+
+import java.util
+import java.util.Optional
+import scala.collection.{Map, Seq, Set, mutable}
+import scala.compat.java8.OptionConverters.RichOptionForJava8
+import scala.jdk.CollectionConverters._
+
+/**
+ * Facilitates fetches from a local replica leader.
+ *
+ * @param brokerEndPoint The broker (host:port) that we want to connect to

Review Comment:
brokerEndPoint => sourceBroker

## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ##
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.BrokerEndPoint
+
+import java.util.{Collections, Optional}
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.utils.Implicits.MapExtensionMethods
+import kafka.utils.Logging
+import org.apache.kafka.clients.FetchSessionHandler
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
+import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
+
+import scala.jdk.CollectionConverters._
+import scala.collection.{Map, mutable}
+import scala.compat.java8.OptionConverters.RichOptionForJava8
+
+/**
+ * Facilitates fetches from a remote replica leader.
+ *
+ * @param logPrefix The log prefix
+ * @param endpoint The raw leader endpoint used to communicate with the leader
+ * @param fetchSessionHandler A FetchSessionHandler to track the partitions in the session
+ * @param brokerConfig Broker configuration
+ * @param replicaMgr A ReplicaManager
+ * @param quota The quota, used when building a fetch request
+ */
+class RemoteLeaderEndPoint(logPrefix: String,
+                           endpoint: BrokerBlockingSender,

Review Comment:
Should endpoint be named blockingSender?

## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ##
@@ -0,0 +1,109 @@
+/*
+ *
[jira] [Commented] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539801#comment-17539801 ]

François Rosière commented on KAFKA-13913:

[~mjsax], thanks for your comment. The two referenced Jira issues were about adding builders for configs (I didn't find anything clear about the push back), while here the idea would be to have builders on the producers, consumers and Kafka Streams. The previous proposal made in KIP-832 was about adding more constructors to directly expose the config object, but builders have been proposed and look more user friendly. These builders expose only methods that are meaningful to users, taking care of type safety, auto completion, overrides, etc. They are also a straightforward way to support injection of already configured dependencies such as the interceptors and serializers/deserializers. Simple and complex configurations could still be provided using the current approach. So I don't see any reason not to progress in that direction.

> Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
> -
>
> Key: KAFKA-13913
> URL: https://issues.apache.org/jira/browse/KAFKA-13913
> Project: Kafka
> Issue Type: Improvement
> Components: clients, streams
> Affects Versions: 3.2.0
> Reporter: François Rosière
> Assignee: François Rosière
> Priority: Major
> Labels: kip
> Fix For: 3.3.0
>
> To have more flexibility, builders should be provided for the following
> objects
> * KafkaProducer
> * KafkaConsumer
> * KafkaStreams
> These builders will give an easy way to construct these objects using
> different arguments/combinations without having to add a new constructor
> every time a new parameter is required.
> They will also allow using already configured dependencies coming from an
> injection framework such as Spring (see
> [https://github.com/spring-projects/spring-kafka/issues/2244]).
> From a user point of view, builders would be used as follows
> {noformat}
> KafkaProducer kafkaProducer = new KafkaProducerBuilder MyPojo()
>     .withKeySerializer()
>     .withValueSerializer()
>     .withInterceptors()
>     .withPartitioner()
>     .withMetricsReporter()
>     .build();
> KafkaConsumer consumer = new KafkaConsumerBuilder MyPojo()
>     .withKeyDeserializer()
>     .withValueDeserializer()
>     .withInterceptors()
>     .withMetricsReporter()
>     .build();
> KafkaStreams kafkaStreams = new KafkaStreamsBuilder(,
>     )
>     .withProducerInterceptors()
>     .withConsumerInterceptors()
>     .withTime()
>     .withKafkaClientSupplier()
>     .withMetricsReporter()
>     .build();{noformat}
> This KIP can be seen as the continuity of the KIP-832.
> More details can be found in the related KIP
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640]
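To make the builder idea concrete, here is a small self-contained sketch of the pattern. It is not the API proposed in the KIP: `SketchProducer`, `SketchProducerBuilder`, and the `with...` methods below are hypothetical stand-ins, with serializers modelled as plain functions and interceptors as strings so that the example runs without any Kafka dependency.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for a client object; not Kafka's KafkaProducer.
final class SketchProducer<K, V> {
    final Function<K, byte[]> keySerializer;
    final Function<V, byte[]> valueSerializer;
    final List<String> interceptors;

    SketchProducer(Function<K, byte[]> keySerializer,
                   Function<V, byte[]> valueSerializer,
                   List<String> interceptors) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.interceptors = interceptors;
    }
}

// Hypothetical builder: each optional dependency gets a fluent setter, so a new
// parameter means one new method rather than another constructor overload.
final class SketchProducerBuilder<K, V> {
    private Function<K, byte[]> keySerializer;
    private Function<V, byte[]> valueSerializer;
    private final List<String> interceptors = new ArrayList<>();

    SketchProducerBuilder<K, V> withKeySerializer(Function<K, byte[]> serializer) {
        this.keySerializer = serializer;
        return this;
    }

    SketchProducerBuilder<K, V> withValueSerializer(Function<V, byte[]> serializer) {
        this.valueSerializer = serializer;
        return this;
    }

    SketchProducerBuilder<K, V> withInterceptor(String interceptor) {
        this.interceptors.add(interceptor);
        return this;
    }

    SketchProducer<K, V> build() {
        // Required dependencies are validated once, at build time.
        Objects.requireNonNull(keySerializer, "keySerializer");
        Objects.requireNonNull(valueSerializer, "valueSerializer");
        return new SketchProducer<>(keySerializer, valueSerializer,
            Collections.unmodifiableList(new ArrayList<>(interceptors)));
    }
}
```

Already-constructed instances, for example ones supplied by an injection framework, are passed straight to the `with...` methods instead of being named by class in a config map.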
[GitHub] [kafka] hachikuji opened a new pull request, #12185: MINOR: Fix buildResponseSend test cases for envelope responses
hachikuji opened a new pull request, #12185: URL: https://github.com/apache/kafka/pull/12185 The test cases we have in `RequestChannelTest` for `buildResponseSend` construct the envelope request incorrectly. Basically they confuse the envelope context and the reference to the wrapped envelope request object. This patch fixes `TestUtils.buildEnvelopeRequest` so that the wrapped request is built properly. It also consolidates the tests in `RequestChannelTest` to avoid duplication.
[GitHub] [kafka] cmccabe commented on pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704
cmccabe commented on PR #12182:
URL: https://github.com/apache/kafka/pull/12182#issuecomment-1132189652

Thanks for the PR, @mumrah.

1. I agree that tagged fields definitely count as new metadata. `MetadataVersion.IBP_3_2_IV0` is currently marked with `didMetadataChange = false`. If this metadata version did add new metadata (sounds like it did?), this needs to be marked as `true`. Also, we should probably add a very short description of the new fields added in each version, in the comment for the appropriate MetadataVersion (i.e. "added X field to Y record"). It's good to set this pattern early so others can follow...

2. I would prefer to embed the `MetadataVersion` in each `*Image` (`AclsImage`, `ClusterImage`, etc.) rather than passing it into `write` and other functions. I think it's fundamental to how we interpret the data -- there may be fields that we have to treat slightly differently based on the version (although we want to minimize this as much as we possibly can, of course). This also lets us have invariants in the constructors like AclsImage objects at MV X never have field Y, and so on, which may be nice for unit tests.

3. I think rather than having `MetadataVersionSupplier`, we should just pass `FeatureControlManager` to the constructors of the other Control Managers. This will be useful in the future when we have other feature flags that we want to check. In general I don't see a big benefit to extracting an interface here -- `FeatureControlManager` is very lightweight to create and if you want one for testing, you can just create one. We do this for a lot of other things -- it works very well when you have an object A and B and A makes a lot of calls into B, but B makes none (or very few) into A. This also makes it easier to understand the control flow.
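Point 2 of this comment (an image that carries its own version and enforces version invariants in its constructor) might look roughly like the sketch below. Everything here is an assumption made for illustration: `SketchVersion`, `SketchImage`, and the `extraField` are hypothetical and do not correspond to Kafka's `MetadataVersion` or `*Image` classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical version enum; each constant records whether an extra field exists.
enum SketchVersion {
    V1(false),
    V2(true);

    final boolean supportsExtraField;

    SketchVersion(boolean supportsExtraField) {
        this.supportsExtraField = supportsExtraField;
    }
}

// Hypothetical image that embeds its version instead of receiving it in write().
final class SketchImage {
    final SketchVersion version;
    final String name;
    final Optional<String> extraField;

    SketchImage(SketchVersion version, String name, Optional<String> extraField) {
        // Invariant: an image at an old version never carries the newer field.
        if (!version.supportsExtraField && extraField.isPresent()) {
            throw new IllegalArgumentException(
                "extraField is not allowed at version " + version);
        }
        this.version = version;
        this.name = name;
        this.extraField = extraField;
    }

    // No version parameter is needed: the constructor already guarantees that
    // the contents are consistent with this.version.
    List<String> write() {
        List<String> records = new ArrayList<>();
        records.add("name=" + name);
        extraField.ifPresent(value -> records.add("extra=" + value));
        return records;
    }
}
```

With the invariant in the constructor, a unit test can assert that an invalid combination is rejected at construction time rather than checking every call site of `write`.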
[jira] [Commented] (KAFKA-13595) Allow producing records with null values in Kafka Console Producer
[ https://issues.apache.org/jira/browse/KAFKA-13595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539764#comment-17539764 ]

Ryan commented on KAFKA-13595:

Duplicates https://issues.apache.org/jira/browse/KAFKA-10238

> Allow producing records with null values in Kafka Console Producer
> --
>
> Key: KAFKA-13595
> URL: https://issues.apache.org/jira/browse/KAFKA-13595
> Project: Kafka
> Issue Type: Improvement
> Reporter: Mickael Maison
> Assignee: Mickael Maison
> Priority: Major
> Fix For: 3.2.0
>
> KIP-810:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-810%3A+Allow+producing+records+with+null+values+in+Kafka+Console+Producer
[GitHub] [kafka] vamossagar12 commented on pull request #12121: KAFKA-13846: Adding overloaded metricOrElseCreate method
vamossagar12 commented on PR #12121:
URL: https://github.com/apache/kafka/pull/12121#issuecomment-1132048815

> > From a public API point of view, Metrics is in a gray area. It is not officially part of our public API however we have a few interfaces leaking it. That being said, we should be careful with the changes that we do here.
>
> I agree with @dajac as well, what I was thinking is that we can modify the private `registerMetric`, and I'm neutral about whether adding a public `metricOrElseCreate` should require a KIP. If people think this is necessary, we could create one.

Sure thing. I am also neutral about whether a KIP is needed.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12121: KAFKA-13846: Adding overloaded metricOrElseCreate method
vamossagar12 commented on code in PR #12121:
URL: https://github.com/apache/kafka/pull/12121#discussion_r877394625

## clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java: ##
@@ -563,10 +615,15 @@ public synchronized void removeReporter(MetricsReporter reporter) {
         }
     }
 
-    synchronized void registerMetric(KafkaMetric metric) {
+    synchronized void registerMetric(KafkaMetric metric, boolean raiseIfMetricExists) {

Review Comment:
@guozhangwang I think that could be done, yes. I have made the changes.
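The `raiseIfMetricExists` flag in this hunk suggests one registration path serving both "fail on duplicate" and "return the existing metric" callers. The sketch below illustrates that idea only; it is not the code in `Metrics.java`, and the `MetricRegistrySketch` class, its string keys, and its return values are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry (not Kafka's Metrics class) keyed by metric name.
final class MetricRegistrySketch<M> {
    private final Map<String, M> metrics = new HashMap<>();

    // Registers the metric, or handles a duplicate according to the flag:
    // throw when raiseIfMetricExists is true, otherwise keep the existing one.
    synchronized M register(String name, M metric, boolean raiseIfMetricExists) {
        M existing = metrics.get(name);
        if (existing != null) {
            if (raiseIfMetricExists) {
                throw new IllegalArgumentException(
                    "A metric named '" + name + "' already exists");
            }
            return existing;
        }
        metrics.put(name, metric);
        return metric;
    }

    // Get-or-create built on top of the same path; the lookup and the insert
    // happen under one lock, so two callers cannot both create the metric.
    synchronized M metricOrElseCreate(String name, Supplier<M> factory) {
        M existing = metrics.get(name);
        return existing != null ? existing : register(name, factory.get(), true);
    }
}
```

Keeping the check and the insert inside one synchronized method is what avoids the check-then-act race that a separate "exists?" call followed by "add" would have.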
[jira] [Commented] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539718#comment-17539718 ] Matthias J. Sax commented on KAFKA-13913: - There was some discussion about this in the past (cf KAFKA-3943 and KAFKA-4436). There was also some push back on this idea. Might be good to revisit those arguments. (Not sure if it's contained in the Jira, or PR, or maybe mailing list.)
[jira] [Updated] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13913: Labels: kip (was: )
[jira] [Updated] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13913: Component/s: streams
[GitHub] [kafka] divijvaidya opened a new pull request, #12184: KAFKA-13911: Fix the rate window size calculation for edge cases
divijvaidya opened a new pull request, #12184:
URL: https://github.com/apache/kafka/pull/12184

## Problem
The implementation of connection creation rate quotas in Kafka depends on two configurations: [quota.window.num](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.num) and [quota.window.size.seconds](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.size.seconds).

The minimum possible value of each of these configurations is 1, as per the documentation. However, when we set 1 as the configuration value, we can hit a situation where the rate is calculated as NaN (and hence leads to exceptions). This specific scenario occurs when an event is recorded at the start of a sample window.

## Solution
This patch fixes this edge case by ensuring that the windowSize over which the Rate is calculated is at least 1ms (even if it is calculated at the start of the sample window).

## Test
Added a unit test which fails before the patch and passes after the patch.
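The edge case comes down to floating-point division by a zero-length window: in IEEE 754 arithmetic `0.0 / 0.0` is NaN and a positive value divided by zero is Infinity. The sketch below illustrates the 1 ms floor described in the Solution section; it is not the code of Kafka's `Rate` class, and `RateWindowSketch`, `naiveRate`, and `clampedRate` are hypothetical names.

```java
// Hypothetical illustration (not Kafka's Rate implementation) of a per-second
// rate computed over the time elapsed since the start of the sample window.
final class RateWindowSketch {
    // Without a floor, an event observed exactly at the window start divides
    // by zero: 0.0 / 0 yields NaN, and a positive total yields Infinity.
    static double naiveRate(double total, long windowStartMs, long nowMs) {
        return total * 1000.0 / (nowMs - windowStartMs);
    }

    // With the floor, the elapsed window is never shorter than 1 ms, so the
    // result is always a finite number.
    static double clampedRate(double total, long windowStartMs, long nowMs) {
        long elapsedMs = Math.max(nowMs - windowStartMs, 1L);
        return total * 1000.0 / elapsedMs;
    }
}
```

The clamp trades a small overestimate of the window length at the very start of a window for a result that downstream quota checks can always compare against a limit.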
[GitHub] [kafka] jsancio opened a new pull request, #12183: KAFKA-13883: Implement NoOpRecord and metadata metrics
jsancio opened a new pull request, #12183: URL: https://github.com/apache/kafka/pull/12183 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-13863) Prevent null config value when create topic in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13863. - Fix Version/s: 3.3.0 Resolution: Fixed > Prevent null config value when create topic in KRaft mode > - > > Key: KAFKA-13863 > URL: https://issues.apache.org/jira/browse/KAFKA-13863 > Project: Kafka > Issue Type: Bug >Reporter: dengziming >Priority: Major > Fix For: 3.3.0
[GitHub] [kafka] hachikuji merged pull request #12109: KAFKA-13863: Prevent null config value when create topic in KRaft mode
hachikuji merged PR #12109: URL: https://github.com/apache/kafka/pull/12109
[GitHub] [kafka] fvaleri commented on pull request #12159: [WIP] Fix stuck SSL tests in case of authentication failure
fvaleri commented on PR #12159: URL: https://github.com/apache/kafka/pull/12159#issuecomment-1131936314 @divijvaidya I fixed the test you were referring to. I also fixed the helper method `sendNoReceive`, which was directly using `channel.mute()` instead of `selector.mute(channel.id())`. The first call does not ensure proper state handling, and I believe it's the reason why `testCloseOldestConnectionWithMultiplePendingReceives` was randomly failing. Let's see if the test job passes this time.
[GitHub] [kafka] mumrah commented on pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704
mumrah commented on PR #12182: URL: https://github.com/apache/kafka/pull/12182#issuecomment-1131934558 FeaturesImage has a `metadataVersion()` method, which we can make accessible through a supplier for the Image/Delta classes. For broker components like ReplicaManager, we can add an argument to applyDelta that indicates a MetadataVersion change. I had some code like this in the [original PR](https://github.com/apache/kafka/pull/11677/commits/83bbf3b21bb0d9f313aa251cf011ec35748876e2#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0R2110), but we removed it since we didn't have any use cases yet.
[GitHub] [kafka] mumrah commented on a diff in pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704
mumrah commented on code in PR #12182: URL: https://github.com/apache/kafka/pull/12182#discussion_r877220561 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1010,6 +1004,11 @@ private void maybeCompleteAuthorizerInitialLoad() { } } +// Visible for testing +MetadataVersion currentMetadataVersion() { +return featureControl.metadataVersion(); +} Review Comment: Oops, that comment was copied over from another PR #12177 :)
[GitHub] [kafka] jsancio commented on a diff in pull request #12182: MINOR: Use dynamic metadata.version check for KIP-704
jsancio commented on code in PR #12182: URL: https://github.com/apache/kafka/pull/12182#discussion_r877200466 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1010,6 +1004,11 @@ private void maybeCompleteAuthorizerInitialLoad() { } } +// Visible for testing +MetadataVersion currentMetadataVersion() { +return featureControl.metadataVersion(); +} Review Comment: Which test needs this? It doesn't look like this is used in this PR.
[GitHub] [kafka] mumrah opened a new pull request, #12182: MINOR: Use dynamic metadata.version check for KIP-704
mumrah opened a new pull request, #12182: URL: https://github.com/apache/kafka/pull/12182 Now that `metadata.version` has been integrated with the controller #12050, we need to make use of the dynamic nature of the feature flag. This patch adds a supplier that is passed down to ReplicationControlManager so that the leader recovery feature can be toggled on/off. IBP/MetadataVersion 3.2 added the `LeaderRecoveryState` tagged field to PartitionRecord. If we are downgrading from 3.2 to 3.0, the downgrade of the feature flag will cause the active controller to stop populating this field, but the value will still be present in existing records. However, since this is a tagged field, it won't cause any compatibility problems when we actually downgrade the server binaries (the tagged field will just be ignored).
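The idea of passing a supplier so that a feature can be toggled at runtime can be sketched as follows. This is a minimal illustration, not the code in the PR: `LeaderRecoveryToggleSketch` and its nested `Version` enum are hypothetical stand-ins for ReplicationControlManager and MetadataVersion, and only `java.util.function.Supplier` is a real API.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical component that consults a supplier on every check. */
final class LeaderRecoveryToggleSketch {

    /** Hypothetical stand-in for a metadata version; not Kafka's MetadataVersion. */
    enum Version { V3_0, V3_2 }

    private final Supplier<Version> versionSupplier;

    LeaderRecoveryToggleSketch(Supplier<Version> versionSupplier) {
        this.versionSupplier = Objects.requireNonNull(versionSupplier, "versionSupplier");
    }

    /**
     * Reads the current version each time it is called, so an upgrade or
     * downgrade of the version takes effect without rebuilding this object.
     */
    boolean leaderRecoverySupported() {
        return versionSupplier.get().compareTo(Version.V3_2) >= 0;
    }
}
```

The design choice is that the component holds the supplier rather than a version value captured at construction time, which is what makes the check dynamic.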
[jira] [Resolved] (KAFKA-13807) Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs
[ https://issues.apache.org/jira/browse/KAFKA-13807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-13807. -- Resolution: Fixed > Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs > - > > Key: KAFKA-13807 > URL: https://issues.apache.org/jira/browse/KAFKA-13807 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Major
[jira] [Assigned] (KAFKA-13807) Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs
[ https://issues.apache.org/jira/browse/KAFKA-13807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-13807: Assignee: Colin McCabe > Ensure that we can set log.flush.interval.ms with IncrementalAlterConfigs > - > > Key: KAFKA-13807 > URL: https://issues.apache.org/jira/browse/KAFKA-13807 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major
[GitHub] [kafka] Moovlin commented on pull request #12167: KAFKA-13716 Added the DeleteRecordsCommandTest to test the CLI front end of the D…
Moovlin commented on PR #12167: URL: https://github.com/apache/kafka/pull/12167#issuecomment-1131755255 Thanks for the quick responses. To your first answer, I'm happy to do that and will take a look at the TopicCommandTest for guidance. To your second answer: integration tests for this should probably be in a different Jira, since the methods in the MockAdminClient are effectively not implemented (if you try to do anything other than delete from the first offset, it throws an UnsupportedOperationException). So I'd like to keep the scope of this PR to unit tests. If you think that makes sense, I'll go ahead and open up an issue for it to make sure we track that outstanding item.
[GitHub] [kafka] dajac opened a new pull request, #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)
dajac opened a new pull request, #12181: URL: https://github.com/apache/kafka/pull/12181 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] viktorsomogyi opened a new pull request, #12180: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop
viktorsomogyi opened a new pull request, #12180: URL: https://github.com/apache/kafka/pull/12180 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-13917) Avoid calling lookupCoordinator() in tight loop
[ https://issues.apache.org/jira/browse/KAFKA-13917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-13917: Description: Currently the heartbeat thread's lookupCoordinator() is called in a tight loop if brokers crash and the consumer is left running. Besides that it floods the logs on debug level, it increases CPU usage as well. The fix is easy, just need to put a backoff call after coordinator lookup. Reproduction: # Start a few brokers # Create a topic and produce to it # Start consuming # Stop all brokers At this point lookupCoordinator() will be called in a tight loop. was: Currently the heartbeat thread's lookupCoordinator() is called in a tight loop if brokers crash and the consumer is left running. Besides that it floods the logs on debug level, it increases CPU usage as well. The fix is easy, just need to put a backoff call after coordinator lookup. > Avoid calling lookupCoordinator() in tight loop > --- > > Key: KAFKA-13917 > URL: https://issues.apache.org/jira/browse/KAFKA-13917 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 3.1.0, 3.1.1, 3.1.2 >Reporter: Viktor Somogyi-Vass >Assignee: Viktor Somogyi-Vass >Priority: Major > > Currently the heartbeat thread's lookupCoordinator() is called in a tight > loop if brokers crash and the consumer is left running. Besides that it > floods the logs on debug level, it increases CPU usage as well. > The fix is easy, just need to put a backoff call after coordinator lookup. > Reproduction: > # Start a few brokers > # Create a topic and produce to it > # Start consuming > # Stop all brokers > At this point lookupCoordinator() will be called in a tight loop. -- This message was sent by Atlassian Jira (v8.20.7#820007)
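The fix proposed in KAFKA-13917 (back off after a failed coordinator lookup instead of retrying immediately) can be sketched generically. This is a minimal illustration, not the consumer's actual heartbeat thread: the class, the method, and its parameters are hypothetical, and the lookup and the sleep are passed in as functions so the loop can be exercised without a broker.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

/** Hypothetical retry loop with a fixed backoff between failed lookups. */
final class CoordinatorLookupBackoffSketch {
    private CoordinatorLookupBackoffSketch() { }

    /**
     * Calls lookup until it reports success or maxAttempts is reached.
     * After every failed attempt the sleeper is invoked with backoffMs,
     * which is what prevents a tight loop when no broker is reachable.
     *
     * @return the 1-based attempt that succeeded, or -1 if none did
     */
    static int lookupWithBackoff(BooleanSupplier lookup, LongConsumer sleeper,
                                 long backoffMs, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (lookup.getAsBoolean()) {
                return attempt;
            }
            sleeper.accept(backoffMs); // back off before trying again
        }
        return -1;
    }
}
```

In a real thread the sleeper could be something that waits for the given number of milliseconds; in a test it can simply count invocations.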
[jira] [Created] (KAFKA-13917) Avoid calling lookupCoordinator() in tight loop
Viktor Somogyi-Vass created KAFKA-13917: --- Summary: Avoid calling lookupCoordinator() in tight loop Key: KAFKA-13917 URL: https://issues.apache.org/jira/browse/KAFKA-13917 Project: Kafka Issue Type: Improvement Components: consumer Affects Versions: 3.1.1, 3.1.0, 3.1.2 Reporter: Viktor Somogyi-Vass Assignee: Viktor Somogyi-Vass Currently the heartbeat thread's lookupCoordinator() is called in a tight loop if brokers crash and the consumer is left running. Besides that it floods the logs on debug level, it increases CPU usage as well. The fix is easy, just need to put a backoff call after coordinator lookup.
[jira] [Updated] (KAFKA-13916) Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)
[ https://issues.apache.org/jira/browse/KAFKA-13916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13916: Description: KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft > Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841) > > > Key: KAFKA-13916 > URL: https://issues.apache.org/jira/browse/KAFKA-13916 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13916) Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)
David Jacot created KAFKA-13916: --- Summary: Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841) Key: KAFKA-13916 URL: https://issues.apache.org/jira/browse/KAFKA-13916 Project: Kafka Issue Type: Improvement Reporter: David Jacot Assignee: David Jacot
[GitHub] [kafka] dajac commented on a diff in pull request #12065: KAFKA-13788: Use AdminClient.incrementalAlterConfigs in ConfigCommand
dajac commented on code in PR #12065: URL: https://github.com/apache/kafka/pull/12065#discussion_r876706874 ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -367,15 +366,12 @@ object ConfigCommand extends Logging { if (invalidConfigs.nonEmpty) throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") -val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted -val sensitiveEntries = newEntries.filter(_._2.value == null) -if (sensitiveEntries.nonEmpty) - throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") -val newConfig = new JConfig(newEntries.asJava.values) - val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead) val alterOptions = new AlterConfigsOptions().timeoutMs(3).validateOnly(false) -adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) +val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) Review Comment: All this code looks pretty similar to the other branches now. Do you think that we could refactor and share more? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
[ https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539465#comment-17539465 ] lqjacklee commented on KAFKA-13888: --- [~Niket Goel] for the field ‘LastCaughtUpTimestamp’ , how can i compute it. the field ‘LastFetchTimestamp’ I can fetch from the leaderState.getReplicatedLastFetchTimestamp(leaderState.localId()); > KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag > -- > > Key: KAFKA-13888 > URL: https://issues.apache.org/jira/browse/KAFKA-13888 > Project: Kafka > Issue Type: Improvement > Components: kraft >Reporter: Niket Goel >Assignee: Niket Goel >Priority: Major > > Tracking issue for the implementation of KIP:836 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[GitHub] [kafka] mimaison commented on pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)
mimaison commented on PR #11780: URL: https://github.com/apache/kafka/pull/11780#issuecomment-1131508676 Thanks for the updates @C0urante! I'm on PTO next week; I'll take a look at the tests when I'm back.
[GitHub] [kafka] mimaison commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)
mimaison commented on code in PR #11780: URL: https://github.com/apache/kafka/pull/11780#discussion_r876878064 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1000,6 +1090,266 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } +abstract class TaskBuilder { + +private final ConnectorTaskId id; +private final ClusterConfigState configState; +private final TaskStatus.Listener statusListener; +private final TargetState initialState; + +private Task task = null; +private ConnectorConfig connectorConfig = null; +private Converter keyConverter = null; +private Converter valueConverter = null; +private HeaderConverter headerConverter = null; +private ClassLoader classLoader = null; + +public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { +this.id = id; +this.configState = configState; +this.statusListener = statusListener; +this.initialState = initialState; +} + +public TaskBuilder withTask(Task task) { +this.task = task; +return this; +} + +public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { +this.connectorConfig = connectorConfig; +return this; +} + +public TaskBuilder withKeyConverter(Converter keyConverter) { +this.keyConverter = keyConverter; +return this; +} + +public TaskBuilder withValueConverter(Converter valueConverter) { +this.valueConverter = valueConverter; +return this; +} + +public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { +this.headerConverter = headerConverter; +return this; +} + +public TaskBuilder withClassloader(ClassLoader classLoader) { +this.classLoader = classLoader; +return this; +} + +public WorkerTask build() { +Objects.requireNonNull(task, "Task cannot be null"); +Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); +Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); 
+Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); +Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); +Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + +ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); +final Class connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); +RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), +connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); +retryWithToleranceOperator.metrics(errorHandlingMetrics); + +return doBuild(task, id, configState, statusListener, initialState, +connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, +errorHandlingMetrics, connectorClass, retryWithToleranceOperator); +} + +abstract WorkerTask doBuild(Task task, +ConnectorTaskId id, +ClusterConfigState configState, +TaskStatus.Listener statusListener, +TargetState initialState, +ConnectorConfig connectorConfig, +Converter keyConverter, +Converter valueConverter, +HeaderConverter headerConverter, +ClassLoader classLoader, +ErrorHandlingMetrics errorHandlingMetrics, +Class connectorClass, +RetryWithToleranceOperator retryWithToleranceOperator); + +} + +class SinkTaskBuilder extends TaskBuilder { +public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { +super(id, configState, statusListener, initialState); +} + +@Override +public WorkerTask doBuild(Task task, +
[GitHub] [kafka] mimaison commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)
mimaison commented on code in PR #11780: URL: https://github.com/apache/kafka/pull/11780#discussion_r876877095 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -576,88 +672,42 @@ public boolean startTask( executor.submit(workerTask); if (workerTask instanceof WorkerSourceTask) { -sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask); +sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask)); } return true; } } -private WorkerTask buildWorkerTask(ClusterConfigState configState, - ConnectorConfig connConfig, - ConnectorTaskId id, - Task task, - TaskStatus.Listener statusListener, - TargetState initialState, - Converter keyConverter, - Converter valueConverter, - HeaderConverter headerConverter, - ClassLoader loader) { -ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); -final Class connectorClass = plugins.connectorClass( -connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); -RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(), -connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM); -retryWithToleranceOperator.metrics(errorHandlingMetrics); - -// Decide which type of worker task we need based on the type of task. 
-if (task instanceof SourceTask) { -SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, -connConfig.originalsStrings(), config.topicCreationEnable()); -retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); -TransformationChain transformationChain = new TransformationChain<>(sourceConfig.transformations(), retryWithToleranceOperator); -log.info("Initializing: {}", transformationChain); -CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), -internalKeyConverter, internalValueConverter); -OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), -internalKeyConverter, internalValueConverter); -Map producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass, - connectorClientConfigOverridePolicy, kafkaClusterId); -KafkaProducer producer = new KafkaProducer<>(producerProps); -TopicAdmin admin; -Map topicCreationGroups; -if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { -Map adminProps = adminConfigs(id, "connector-adminclient-" + id, config, -sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); -admin = new TopicAdmin(adminProps); -topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); -} else { -admin = null; -topicCreationGroups = null; -} - -// Note we pass the configState as it performs dynamic transformations under the covers -return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, -headerConverter, transformationChain, producer, admin, topicCreationGroups, -offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor); -} else if (task instanceof SinkTask) { -TransformationChain transformationChain = new TransformationChain<>(connConfig.transformations(), 
retryWithToleranceOperator); -log.info("Initializing: {}", transformationChain); -SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings()); -retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); -WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, -keyConverter, valueConverter, headerConverter); - -Map consumerProps = consumerConfigs(id, config, connConfig,
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539447#comment-17539447 ] ASF GitHub Bot commented on KAFKA-13882: mimaison commented on code in PR #410: URL: https://github.com/apache/kafka-site/pull/410#discussion_r876875276 ## .htaccess: ## @@ -9,3 +9,5 @@ RewriteRule ^/?(\d+)/javadoc - [S=2] RewriteRule ^/?(\d+)/images/ - [S=1] RewriteCond $2 !=protocol RewriteRule ^/?(\d+)/([a-z]+)(\.html)? /$1/documentation#$2 [R=302,L,NE] +RewriteCond %{REQUEST_FILENAME}.html -f +RewriteRule ^(.*)$ %{REQUEST_FILENAME}.html Review Comment: I find it strange we have to edit this file and I wonder how it works on the website. I'll try to find out > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13915) Kafka streams should validate that the repartition topics are not created with cleanup.policy compact
[ https://issues.apache.org/jira/browse/KAFKA-13915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter James Pringle updated KAFKA-13915: Description: Add sanity validation on streams start up that *repartition* topics are not setup with *cleanup.policy* of {*}compact{*}. In enterprise envs automated creation of kafka streams intermediate topics is not always possible due to policy restrictions and as a result it is done manually which is prone to user misconfiguration. In several cases we have found the repartition topics have been incorrectly setup following the changelog topic setup with compact enabled. The result being that a compacted repartition topic will result in data loss if more that one value is mapped to the new key. This has been noticed where aggregate follows a repartition topic and the aggregated value is incorrect. Example: {{Original data: (coffee, drink), (tea, drink), (beer, drink)}} Repartition by type i.e. drink: Expected: {{(drink, coffee), (drink, tea), (drink, beer)}} With compaction the following is possible: Actual {{(drink, beer);}} coffee and tea are lost. was: Add sanity validation on streams start up that *repartition* topics are not setup with *cleanup.policy* of {*}compact{*}. In enterprise envs automated creation of kafka streams intermediate topics is not always possible due to policy restrictions and as a result it is done manually which is prone to user misconfiguration. In several cases we have found the repartition topics have been incorrectly setup following the changelog topic setup with compact enabled. The result being that a compacted repartition topic will result in data loss if more that one value is mapped to the new key. This has been noticed where aggregate follows a repartition topic and the aggregated value is incorrect. Example: {{Original data: (coffee, drink), (tea, drink), (beer, drink)}} Repartition by type i.e. 
drink: Expected: {{(drink, coffee), (drink, tea), (drink, beer)}} With compaction the following is possible: Actual {\{(drink, beer); }} coffee and tea are lost. > Kafka streams should validate that the repartition topics are not created > with cleanup.policy compact > - > > Key: KAFKA-13915 > URL: https://issues.apache.org/jira/browse/KAFKA-13915 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.1 >Reporter: Peter James Pringle >Priority: Major > > Add sanity validation on streams start up that *repartition* topics are not > setup with *cleanup.policy* of {*}compact{*}. > In enterprise envs automated creation of kafka streams intermediate topics is > not always possible due to policy restrictions and as a result it is done > manually which is prone to user misconfiguration. > In several cases we have found the repartition topics have been incorrectly > setup following the changelog topic setup with compact enabled. The result > being that a compacted repartition topic will result in data loss if more > that one value is mapped to the new key. This has been noticed where > aggregate follows a repartition topic and the aggregated value is incorrect. > > Example: > > {{Original data: (coffee, drink), (tea, drink), (beer, drink)}} > > Repartition by type i.e. drink: > > Expected: > {{(drink, coffee), (drink, tea), (drink, beer)}} > > With compaction the following is possible: > > Actual > {{(drink, beer);}} > coffee and tea are lost. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Updated] (KAFKA-13915) Kafka streams should validate that the repartition topics are not created with cleanup.policy compact
[ https://issues.apache.org/jira/browse/KAFKA-13915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter James Pringle updated KAFKA-13915: Description: Add sanity validation on streams start up that *repartition* topics are not setup with *cleanup.policy* of {*}compact{*}. In enterprise envs automated creation of kafka streams intermediate topics is not always possible due to policy restrictions and as a result it is done manually which is prone to user misconfiguration. In several cases we have found the repartition topics have been incorrectly setup following the changelog topic setup with compact enabled. The result being that a compacted repartition topic will result in data loss if more that one value is mapped to the new key. This has been noticed where aggregate follows a repartition topic and the aggregated value is incorrect. Example: {{Original data: (coffee, drink), (tea, drink), (beer, drink)}} Repartition by type i.e. drink: Expected: {{(drink, coffee), (drink, tea), (drink, beer)}} With compaction the following is possible: Actual {\{(drink, beer); }} coffee and tea are lost. was: Add sanity validation on streams start up that *repartition* topics are not setup with *cleanup.policy* of {*}compact{*}. In enterprise envs automated creation of kafka streams intermediate topics is not always possible due to policy restrictions and as a result it is done manually which is prone to user misconfiguration. In several cases we have found the repartition topics have been incorrectly setup following the changelog topic setup with compact enabled. The result being that a compacted repartition topic will result in data loss if more that one value is mapped to the new key. Example: {{Original data: (coffee, drink), (tea, drink), (beer, drink)}} Repartition by type i.e. drink: Expected: {{(drink, coffee), (drink, tea), (drink, beer)}} With compaction the following is possible: Actual {{(drink, beer); }} coffee and tea are lost. 
> Kafka streams should validate that the repartition topics are not created with cleanup.policy compact
>
>                 Key: KAFKA-13915
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13915
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.8.1
>            Reporter: Peter James Pringle
>            Priority: Major
>
> Add sanity validation on Streams startup that *repartition* topics are not set up with a *cleanup.policy* of *compact*.
> In enterprise environments, automated creation of Kafka Streams intermediate topics is not always possible due to policy restrictions, so the topics are created manually, which is prone to user misconfiguration.
> In several cases we have found repartition topics incorrectly set up with compaction enabled, following the changelog topic setup. A compacted repartition topic results in data loss if more than one value is mapped to the new key. This has been noticed where an aggregate follows a repartition topic and the aggregated value is incorrect.
>
> Example:
> {{Original data: (coffee, drink), (tea, drink), (beer, drink)}}
> Repartition by type, i.e. drink:
> Expected: {{(drink, coffee), (drink, tea), (drink, beer)}}
> With compaction the following is possible:
> Actual: {{(drink, beer)}}
> coffee and tea are lost.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13915) Kafka streams should validate that the repartition topics are not created with cleanup.policy compact
Peter James Pringle created KAFKA-13915:

             Summary: Kafka streams should validate that the repartition topics are not created with cleanup.policy compact
                 Key: KAFKA-13915
                 URL: https://issues.apache.org/jira/browse/KAFKA-13915
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.8.1
            Reporter: Peter James Pringle

Add sanity validation on Streams startup that *repartition* topics are not set up with a *cleanup.policy* of *compact*.
In enterprise environments, automated creation of Kafka Streams intermediate topics is not always possible due to policy restrictions, so the topics are created manually, which is prone to user misconfiguration.
In several cases we have found repartition topics incorrectly set up with compaction enabled, following the changelog topic setup. A compacted repartition topic results in data loss if more than one value is mapped to the new key.

Example:
{{Original data: (coffee, drink), (tea, drink), (beer, drink)}}
Repartition by type, i.e. drink:
Expected: {{(drink, coffee), (drink, tea), (drink, beer)}}
With compaction the following is possible:
Actual: {{(drink, beer)}}
coffee and tea are lost.
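The data loss described in KAFKA-13915 can be reproduced on paper with a small, self-contained Java sketch. It does not use the Kafka client libraries. `compact` is a simplified model of log compaction (only the latest value per key survives), and `isSafeRepartitionPolicy` is a hypothetical startup check, not an existing Kafka Streams method. The sketch assumes `cleanup.policy` is a comma-separated list such as `delete` or `compact,delete`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CompactionLossSketch {

    /** Re-keys each (key, value) record by its value, as in the ticket's example. */
    static List<String[]> repartitionByValue(List<String[]> records) {
        List<String[]> out = new ArrayList<>();
        for (String[] r : records) {
            out.add(new String[] {r[1], r[0]}); // (coffee, drink) becomes (drink, coffee)
        }
        return out;
    }

    /** Simplified model of log compaction: only the latest value per key survives. */
    static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] r : records) {
            latest.put(r[0], r[1]);
        }
        return latest;
    }

    /** Hypothetical check: false if the policy list contains "compact". */
    static boolean isSafeRepartitionPolicy(String cleanupPolicy) {
        for (String policy : cleanupPolicy.split(",")) {
            if (policy.trim().equals("compact")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String[]> original = List.of(
            new String[] {"coffee", "drink"},
            new String[] {"tea", "drink"},
            new String[] {"beer", "drink"});

        List<String[]> repartitioned = repartitionByValue(original);
        System.out.println("records before compaction: " + repartitioned.size());
        System.out.println("records after compaction:  " + compact(repartitioned));
        System.out.println("policy 'compact' safe?     " + isSafeRepartitionPolicy("compact"));
    }
}
```

All three repartitioned records share the key `drink`, so the compaction model keeps only the last one. A validation along the lines of `isSafeRepartitionPolicy` would flag the misconfigured topic before any records are lost.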
[GitHub] [kafka] showuon commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
showuon commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r876740554

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -830,7 +830,13 @@ class KafkaServer(
   private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
     for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
-      checkpoint.write(brokerMetadata.toProperties)
+      try {
+        checkpoint.write(brokerMetadata.toProperties)
+      } catch {
+        case e: IOException =>
+          val dirPath = checkpoint.file.getAbsolutePath
+          logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)

Review Comment: Handle IOException while writing meta.properties during broker startup, so that the failure does not shut down the broker.
[GitHub] [kafka] showuon commented on pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
showuon commented on PR #12136: URL: https://github.com/apache/kafka/pull/12136#issuecomment-1131379581 @junrao, thanks for your review. I've addressed your comments. I also found that we should handle `IOException` while writing `meta.properties` in server.startup. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
showuon commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r876740835

## core/src/main/scala/kafka/log/LogManager.scala: ##
@@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File],
             s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
         } catch {
           case e: IOException =>
-            offlineDirs.add((logDirAbsolutePath, e))
-            error(s"Error while loading log dir $logDirAbsolutePath", e)
+            handleIOException(logDirAbsolutePath, e)
+          case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
+            // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
+            handleIOException(logDirAbsolutePath, e.getCause.asInstanceOf[IOException])

Review Comment: Good catch! Updated.

## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ##
@@ -158,22 +197,54 @@ class LogLoaderTest {
     }

     locally {
-      simulateError.hasError = true
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+      val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+      val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, logDirFailureChannel)

-      // Simulate error
-      assertThrows(classOf[RuntimeException], () => {
-        val defaultConfig = logManager.currentDefaultConfig
-        logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-      })
+      // Simulate Runtime error
+      assertThrows(classOf[RuntimeException], runLoadLogs)
       assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+      assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
+
       // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time.
       simulateError.hasError = false
       cleanShutdownInterceptedValue = true
       val defaultConfig = logManager.currentDefaultConfig
       logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
+      logManager.shutdown()
+    }
+
+    locally {
+      val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+      val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, logDirFailureChannel)
+
+      // Simulate IO error
+      assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled")
+
+      assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown")
+      logManager.shutdown()
+    }
+
+    locally {

Review Comment: Updated. Thanks.
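The behaviour exercised by the test above (an I/O failure takes the log dir offline, while any other runtime error still propagates) can be sketched in plain Java. This is a stand-alone model, not Kafka code: `StorageException`, `Loader`, and `loadDir` are hypothetical names introduced only for illustration, and the real change lives in the Scala `LogManager`.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class LogDirLoadSketch {

    /** Hypothetical stand-in for a storage exception that may wrap an IOException. */
    static class StorageException extends RuntimeException {
        StorageException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical loading step that may fail with an I/O error. */
    interface Loader {
        void load() throws IOException;
    }

    final Set<String> offlineDirs = new HashSet<>();

    /** Marks the dir offline on direct or wrapped I/O failures; rethrows anything else. */
    void loadDir(String dir, Loader loader) {
        try {
            loader.load();
        } catch (IOException e) {
            offlineDirs.add(dir);
        } catch (StorageException e) {
            if (e.getCause() instanceof IOException) {
                offlineDirs.add(dir); // wrapped I/O failure: treat like a plain IOException
            } else {
                throw e; // not I/O related: let the caller see it
            }
        }
    }

    public static void main(String[] args) {
        LogDirLoadSketch sketch = new LogDirLoadSketch();
        sketch.loadDir("/data/a", () -> { throw new IOException("disk failure"); });
        sketch.loadDir("/data/b", () -> {
            throw new StorageException("wrapped", new IOException("disk failure"));
        });
        System.out.println("offline dirs: " + sketch.offlineDirs);
    }
}
```

Unwrapping the cause keeps the two failure paths on a single handler, which mirrors the intent of the `handleIOException` helper in the diff.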
[GitHub] [kafka] showuon commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly
showuon commented on code in PR #12136: URL: https://github.com/apache/kafka/pull/12136#discussion_r876740554

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -830,7 +830,13 @@ class KafkaServer(
   private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
     for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
-      checkpoint.write(brokerMetadata.toProperties)
+      try {
+        checkpoint.write(brokerMetadata.toProperties)
+      } catch {
+        case e: IOException =>
+          val dirPath = checkpoint.file.getAbsolutePath
+          logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)

Review Comment: Handle IOException while writing meta.properties, so that the failure does not shut down the broker.
[GitHub] [kafka] dajac commented on a diff in pull request #12065: KAFKA-13788: Use AdminClient.incrementalAlterConfigs in ConfigCommand
dajac commented on code in PR #12065: URL: https://github.com/apache/kafka/pull/12065#discussion_r876706874

## core/src/main/scala/kafka/admin/ConfigCommand.scala: ##
@@ -367,15 +366,12 @@ object ConfigCommand extends Logging {
       if (invalidConfigs.nonEmpty)
         throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

-      val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-      val sensitiveEntries = newEntries.filter(_._2.value == null)
-      if (sensitiveEntries.nonEmpty)
-        throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
-      val newConfig = new JConfig(newEntries.asJava.values)
-
       val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
       val alterOptions = new AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-      adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))

Review Comment: All this code looks pretty similar to the others now. Do you think that we could refactor and share more?

## core/src/main/scala/kafka/admin/ConfigCommand.scala: ##
@@ -367,15 +366,12 @@ object ConfigCommand extends Logging {
       if (invalidConfigs.nonEmpty)
         throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

-      val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-      val sensitiveEntries = newEntries.filter(_._2.value == null)
-      if (sensitiveEntries.nonEmpty)
-        throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
-      val newConfig = new JConfig(newEntries.asJava.values)
-
       val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
       val alterOptions = new AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-      adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+      val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
+        ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+        ).asJavaCollection

Review Comment: nit: We usually don't use curly braces for one-liners. The closing parenthesis (before `asJavaCollection`) does not seem to be indented correctly.
[GitHub] [kafka] acsaki opened a new pull request, #12179: [KAFKA-13848] Clients remain connected after SASL re-authentication f…
acsaki opened a new pull request, #12179: URL: https://github.com/apache/kafka/pull/12179 Clients remain connected and able to produce or consume despite an expired OAUTHBEARER token. The problem can be reproduced using the https://github.com/acsaki/kafka-sasl-reauth project: start the embedded OAuth2 server and Kafka, run the long-running consumer in OAuthBearerTest, then kill the OAuth2 server so that the client can no longer re-authenticate. The root cause seems to be that SaslServerAuthenticator#calcCompletionTimesAndReturnSessionLifetimeMs fails to set ReauthInfo#sessionExpirationTimeNanos when the token has already expired (when the session lifetime goes negative). This in turn causes KafkaChannel#serverAuthenticationSessionExpired to return false, so SocketServer never closes the channel. The issue was observed with OAUTHBEARER but seems to have a wider impact on SASL re-authentication.
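The failure mode described in this pull request can be illustrated with a simplified Java model. It is neither the Kafka source nor the fix proposed in the PR. The method names and both variants below are assumptions made for illustration, and the model reduces the session to an optional expiration timestamp in nanoseconds.

```java
class SessionExpirySketch {

    /** Variant matching the reported behaviour: no expiration is recorded unless lifetime > 0. */
    static Long expirationSkippingNonPositive(long authEndNanos, long lifetimeMs) {
        if (lifetimeMs > 0L) {
            return authEndNanos + lifetimeMs * 1_000_000L;
        }
        return null; // nothing recorded, so the session can never be seen as expired
    }

    /** One possible remedy (an assumption): clamp at zero so the session expires immediately. */
    static Long expirationClamped(long authEndNanos, long lifetimeMs) {
        return authEndNanos + Math.max(0L, lifetimeMs) * 1_000_000L;
    }

    /** A session counts as expired only if an expiration time exists and has passed. */
    static boolean sessionExpired(Long expirationNanos, long nowNanos) {
        return expirationNanos != null && nowNanos - expirationNanos > 0L;
    }

    public static void main(String[] args) {
        long authEndNanos = 5_000_000_000L;
        long lifetimeMs = -250L; // the token expired before authentication completed
        long nowNanos = authEndNanos + 1_000_000L;

        Long skipped = expirationSkippingNonPositive(authEndNanos, lifetimeMs);
        Long clamped = expirationClamped(authEndNanos, lifetimeMs);
        System.out.println("skipping variant expired: " + sessionExpired(skipped, nowNanos));
        System.out.println("clamped variant expired:  " + sessionExpired(clamped, nowNanos));
    }
}
```

In the first variant a negative lifetime leaves the expiration unset, so the expiry check stays false and the connection is never closed. That is the chain of effects the description attributes to the missing `sessionExpirationTimeNanos`.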