[GitHub] [kafka] showuon merged pull request #13362: KAFKA-14795: Provide message formatter for RemoteLogMetadata
showuon merged PR #13362: URL: https://github.com/apache/kafka/pull/13362
[GitHub] [kafka] showuon commented on pull request #13362: KAFKA-14795: Provide message formatter for RemoteLogMetadata
showuon commented on PR #13362: URL: https://github.com/apache/kafka/pull/13362#issuecomment-1477344434 Failed tests are unrelated:
```
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
```
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1477340172 > Why do we need this PR if [KAFKA-9087](https://issues.apache.org/jira/browse/KAFKA-9087) already fixed the bug (the one you mentioned in the Jira)? Is there another potential bug? Or is the bug fixed by [KAFKA-9087](https://issues.apache.org/jira/browse/KAFKA-9087) not the root cause? KAFKA-9087 fixes the root cause of the fetch stopping: the "Offset mismatch" error thrown during "processPartitionData". However, there may be other potential exceptions here (although I haven't found one yet) that would lead to the same end result: the fetch stops, the log is not cleaned up, and the disk usage grows without bound. To be more precise, this PR is a defensive measure, as I understand it.
[GitHub] [kafka] chia7712 commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…
chia7712 commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1477334683 Why do we need this PR if KAFKA-9087 already fixed the bug (the one you mentioned in the Jira)? Is there another potential bug? Or is the bug fixed by KAFKA-9087 not the root cause?
[GitHub] [kafka] rohits64 commented on pull request #13361: KAFKA-14401: Resume WorkThread if Connector/Tasks reading offsets get stuck when underneath WorkThread dies [WIP]
rohits64 commented on PR #13361: URL: https://github.com/apache/kafka/pull/13361#issuecomment-1477318855 Hi @mukkachaitanya, thanks for the input. I think we can go with exponential backoff with a fixed number of retries. I have made the changes; the retry count is currently set to 10, which should be a sufficient number of retries.
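For illustration only, a minimal sketch of "exponential backoff with a fixed number of retries" as described in the comment above; the class and method names are hypothetical and are not the actual Connect code changed in PR #13361.

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry an action up to maxRetries times, doubling the
// backoff after each failure up to a cap. Not the actual Connect implementation.
public final class BackoffRetrySketch {

    public static <T> T callWithRetry(final Supplier<T> action,
                                      final int maxRetries,
                                      final long initialBackoffMs,
                                      final long maxBackoffMs) throws InterruptedException {
        long backoffMs = initialBackoffMs;
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.get();              // succeed on any attempt and return immediately
            } catch (RuntimeException e) {
                lastError = e;                    // remember the failure
                if (attempt < maxRetries) {
                    Thread.sleep(backoffMs);      // back off before the next attempt
                    backoffMs = Math.min(backoffMs * 2, maxBackoffMs);  // double, capped
                }
            }
        }
        throw lastError;                          // e.g. maxRetries = 10 exhausted
    }
}
```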
[GitHub] [kafka] satishd commented on pull request #13362: KAFKA-14795: Provide message formatter for RemoteLogMetadata
satishd commented on PR #13362: URL: https://github.com/apache/kafka/pull/13362#issuecomment-1477226052 > LGTM, the only comment is: should we make the formatter a separate file? @satishd, thoughts? @showuon I am fine with the formatter inside the serde class.
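For context, a rough skeleton of what a `MessageFormatter` nested inside a serde class can look like, which is the layout agreed on above. The nested serde and its `deserialize()` logic here are placeholders for illustration; see the merged PR #13362 for the actual RemoteLogMetadata implementation.

```java
import java.io.PrintStream;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;

// Sketch only: a serde-style class that also hosts a MessageFormatter as a nested class.
public class RemoteLogMetadataSerdeSketch {

    public Object deserialize(final byte[] payload) {
        // placeholder: the real serde decodes the RemoteLogMetadata record here
        return payload == null ? null : new String(payload);
    }

    // Usable from the console consumer, e.g.:
    //   kafka-console-consumer.sh --formatter <fully.qualified.FormatterClass> ...
    public static class RemoteLogMetadataFormatterSketch implements MessageFormatter {

        private final RemoteLogMetadataSerdeSketch serde = new RemoteLogMetadataSerdeSketch();

        @Override
        public void configure(final Map<String, ?> configs) {
            // no configuration needed for this sketch
        }

        @Override
        public void writeTo(final ConsumerRecord<byte[], byte[]> consumerRecord, final PrintStream output) {
            // print partition/offset plus the deserialized metadata value
            output.printf("partition: %d, offset: %d, value: %s%n",
                consumerRecord.partition(), consumerRecord.offset(), serde.deserialize(consumerRecord.value()));
        }
    }
}
```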
[GitHub] [kafka] mjsax merged pull request #13365: KAFKA-14491: [17/N] Refactor segments cleanup logic
mjsax merged PR #13365: URL: https://github.com/apache/kafka/pull/13365
[GitHub] [kafka] mjsax commented on a diff in pull request #13365: KAFKA-14491: [17/N] Refactor segments cleanup logic
mjsax commented on code in PR #13365: URL: https://github.com/apache/kafka/pull/13365#discussion_r1142849436
## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java: ##
@@ -57,6 +57,11 @@ public void openExisting(final ProcessorContext context, final long streamTime)
 physicalStore.openDB(context.appConfigs(), context.stateDir());
 }
+@Override
+public void cleanupExpiredSegments(final long streamTime) {
+    super.cleanupExpiredSegments(streamTime);
Review Comment: Ah, I see. I guess it would become clear quickly in an IDE -- it's just hard to see on GitHub. Seems fine without a comment.
[GitHub] [kafka] mjsax commented on pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
mjsax commented on PR #13340: URL: https://github.com/apache/kafka/pull/13340#issuecomment-1477214689 Build failed with checkstyle errors:
```
[ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-13340/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:344:13: 'method call rparen' has incorrect indentation level 12, expected level should be 8. [Indentation]
[2023-03-07T22:44:54.193Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-13340/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:355:13: Variable 'numRecordsProduced' should be declared final. [FinalLocalVariable]
```
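For reference, a small self-contained sketch of what fixes for these two checkstyle violations typically look like; the code below is hypothetical and does not reproduce the actual test from PR #13340.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the two checkstyle rules flagged above.
public class CheckstyleFixSketch {

    static int produceRecords(final List<String> values) {
        return values.size();
    }

    public static void main(final String[] args) {
        // [FinalLocalVariable]: a local variable that is never reassigned
        // should be declared final.
        final int numRecordsProduced = produceRecords(
            Arrays.asList("a", "b", "c")
        ); // [Indentation]: the closing rparen of a multi-line call sits at the
           // indentation level checkstyle expects, not aligned with the arguments.
        System.out.println(numRecordsProduced);
    }
}
```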
[GitHub] [kafka] mjsax commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
mjsax commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848310 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp, } /** - * Test-only processor for inserting records into a versioned store while also tracking - * them separately in-memory, and performing checks to validate expected store contents. - * Forwards the number of failed checks downstream for consumption. + * @param topic topic to produce to + * @param dataTracker map of key -> timestamp -> value for tracking data which is produced to + *the topic. This method will add the produced data into this in-memory + *tracker in addition to producing to the topic, in order to keep the two + *in sync. + * @param timestamp timestamp to produce with + * @param keyValues key-value pairs to produce + * + * @return number of records produced + */ +@SuppressWarnings("varargs") +@SafeVarargs +private final int produceDataToTopic(final String topic, + final DataTracker dataTracker, + final long timestamp, + final KeyValue... keyValues) { +produceDataToTopic(topic, timestamp, keyValues); + +for (final KeyValue keyValue : keyValues) { +dataTracker.add(keyValue.key, timestamp, keyValue.value); +} + +return keyValues.length; +} + +/** + * Test-only processor for validating expected contents of a versioned store, and forwards + * the number of failed checks downstream for consumption. Callers specify whether the + * processor should also be responsible for inserting records into the store (while also + * tracking them separately in-memory for use in validation). */ private static class VersionedStoreContentCheckerProcessor implements Processor { private ProcessorContext context; private VersionedKeyValueStore store; +// whether or not the processor should write records to the store as they arrive. +// must be false for global stores. Review Comment: Know that I understand how the test actually works, it makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores
mjsax commented on code in PR #13340: URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848103 ## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() { .withPartitions(Collections.singleton(0)); final StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); -assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); +assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success")); +} + +@Test +public void shouldCreateGlobalTable() throws Exception { +// produce data to global store topic and track in-memory for processor to verify +final DataTracker data = new DataTracker(); +produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); +produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5")); +produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // out-of-order data + +// build topology and start app +final StreamsBuilder streamsBuilder = new StreamsBuilder(); + +streamsBuilder +.globalTable( +globalTableTopic, +Consumed.with(Serdes.Integer(), Serdes.String()), +Materialized +.as(new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION)) +.withKeySerde(Serdes.Integer()) +.withValueSerde(Serdes.String()) +); +streamsBuilder +.stream(inputStream, Consumed.with(Serdes.Integer(), Serdes.String())) +.process(() -> new VersionedStoreContentCheckerProcessor(false, data)) +.to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); + +final Properties props = props(); +kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); +kafkaStreams.start(); + +// produce source data to trigger store verifications in processor +int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp + 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8")); + +// wait for output and verify +final List> receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( +TestUtils.consumerConfig( +CLUSTER.bootstrapServers(), +IntegerDeserializer.class, +IntegerDeserializer.class), +outputStream, +numRecordsProduced); + +for (final KeyValue receivedRecord : receivedRecords) { +// verify zero failed checks for each record +assertThat(receivedRecord.value, equalTo(0)); Review Comment: I was referring to this comment: https://github.com/apache/kafka/pull/13340#discussion_r1128550162 > and we will not be able to write to a global store from the processor If you specify a global-store, we pass in the "global processor" that is able to write into the store (well, has to do this, to maintain the global store), and thus, we can easily track what goes into the store "on the side" is an in-memory data structure similar to what we do for a regular processor that maintains the store. > This test already has a processor which inspects/validates the contents of the global store. Have I misunderstood? I think I did not understand how the test works -- not I see that you use a regular processor to read the global state store to verify the content. So I guess my comment is void (I did basically propose to add this via a "global processor"). 
## streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java: ## @@ -302,7 +319,54 @@ public void shouldAllowCustomIQv2ForCustomStoreImplementations() { .withPartitions(Collections.singleton(0)); final StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); -assertThat("success", equalTo(result.getOnlyPartitionResult().getResult())); +assertThat(result.getOnlyPartitionResult().getResult(), equalTo("success")); +} + +@Test +public void shouldCreateGlobalTable() throws Exception { +// produce data to global store topic and track in-memory for processor to verify +final DataTracker data = new DataTracker(); +produceDataToTopic(globalTableTopic, data, baseTimestamp, KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null)); +produceDataToTopic(globalTableTopi
[GitHub] [kafka] mjsax commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores
mjsax commented on code in PR #13292: URL: https://github.com/apache/kafka/pull/13292#discussion_r1142842326
## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() { private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory, final String name) {
-if (factory.isWindowStore()) {
+if (factory.isVersionedStore()) {
+    final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+    config.setMinCompactionLagMs(factory.historyRetention());
Review Comment: > It's not strictly necessary though.
If not necessary, no need to do anything. Just wanted to probe if we need to do anything.
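For illustration, the effect of the line under discussion is that a versioned store's changelog topic gets `min.compaction.lag.ms` set to the store's history retention, so log compaction cannot drop old record versions before history retention has elapsed. A minimal sketch of such a topic config, with hypothetical values:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.TopicConfig;

// Sketch only: the kind of topic configs a versioned-store changelog ends up with.
// The helper below is illustrative, not Streams' internal topic-config code.
public class VersionedChangelogConfigSketch {

    public static Map<String, String> changelogConfigs(final long historyRetentionMs) {
        final Map<String, String> configs = new HashMap<>();
        configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        // keep old versions at least as long as the store's history retention
        configs.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, Long.toString(historyRetentionMs));
        return configs;
    }

    public static void main(final String[] args) {
        // e.g. a store configured with 24 hours of history retention
        System.out.println(changelogConfigs(24 * 60 * 60 * 1000L));
    }
}
```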
[jira] [Commented] (KAFKA-7224) KIP-328: Add spill-to-disk for Suppression
[ https://issues.apache.org/jira/browse/KAFKA-7224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702980#comment-17702980 ] Matthias J. Sax commented on KAFKA-7224: With [https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced] added to 3.3, do we still want/need this one? > KIP-328: Add spill-to-disk for Suppression > -- > > Key: KAFKA-7224 > URL: https://issues.apache.org/jira/browse/KAFKA-7224 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Priority: Major > > As described in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables] > Following on KAFKA-7223, implement the spill-to-disk buffering strategy.
[GitHub] [kafka] leeleeian commented on pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata
leeleeian commented on PR #13418: URL: https://github.com/apache/kafka/pull/13418#issuecomment-1477157623 We're discussing the solution for our use case. Closing this PR for now.
[GitHub] [kafka] leeleeian closed pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata
leeleeian closed pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata URL: https://github.com/apache/kafka/pull/13418
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142788313 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -247,6 +248,10 @@ class BrokerServer( ) alterPartitionManager.start() + val addPartitionsLogContext = new LogContext(s"[AddPartitionsToTxnManager broker=${config.brokerId}]") + val networkClient: NetworkClient = NetworkUtils.buildNetworkClient("AddPartitionsManager", config, metrics, time, addPartitionsLogContext) Review Comment: we can make it more specific. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142788144 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). +val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. 
+val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } else { +// We should never see a request on the same epoch since we haven't finished handling the one in queue +throw new InvalidRecordException("Received a second request from the same connection without finishing the first.") + } +} +currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) +} +wakeup() + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + inflightNodes.synchronized(inflightNodes.remove(node)) + if (response.authenticationException() != null) { +error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} wit
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142787356 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). +val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. 
+val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } else { +// We should never see a request on the same epoch since we haven't finished handling the one in queue +throw new InvalidRecordException("Received a second request from the same connection without finishing the first.") + } +} +currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) +} +wakeup() + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + inflightNodes.synchronized(inflightNodes.remove(node)) Review Comment: I guess I was considering more than one send thread 😅 I guess we don't have that now. -- This is an automated messag
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142787086 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). +val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. +val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { Review Comment: I think it's ok to have new data when a request is inflight. The issue is that I have an invariant here that we can only have one queued item for a given txn ID at a time. 
This is due to how the information is stored in the map. The only time we can receive two requests from the same txn id is when the producer restarts and the epoch is bumped. That is why I have this logic here.
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142786061 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). +val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. 
+val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } else { +// We should never see a request on the same epoch since we haven't finished handling the one in queue Review Comment: I thought about that, but I was concerned about blocking on a single produce request too long. I though maybe the producer's retry mechanism would be enough to handle this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
artemlivshits commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142722842 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +// Check if we have already (either node or individual transaction). +val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node) +currentNodeAndTransactionDataOpt match { + case None => +nodesToTransactions.put(node, + new TransactionDataAndCallbacks(new AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()), +mutable.Map(transactionData.transactionalId() -> callback))) + case Some(currentNodeAndTransactionData) => +// Check if we already have txn ID -- this should only happen in epoch bump case. If so, we should return error for old entry and remove from queue. 
+val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) +if (currentTransactionData != null) { + if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) { +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), Errors.INVALID_PRODUCER_EPOCH) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) + currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } else { +// We should never see a request on the same epoch since we haven't finished handling the one in queue Review Comment: Would it be possible to have a retry (say first request timed out, and then we send another one) and have more than one request? ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This task includes refactoring {{Fetcher}} by extracting out some common logic to allow forthcoming implementations to leverage it. was: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This task includes refactoring {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations to leverage that common logic. > Extract common logic from Fetcher > - > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task includes refactoring {{Fetcher}} by extracting out some common > logic to allow forthcoming implementations to leverage it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kirktrue opened a new pull request, #13425: KAFKA-14365: Extract common logic from Fetcher
kirktrue opened a new pull request, #13425: URL: https://github.com/apache/kafka/pull/13425 The `Fetcher` class is used internally by the `KafkaConsumer` to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored `Fetcher`. This task includes refactoring `Fetcher` by extracting out some common logic to allow forthcoming implementations to leverage it.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API
guozhangwang commented on code in PR #13380: URL: https://github.com/apache/kafka/pull/13380#discussion_r1142683410 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { } } +private class UnsentOffsetFetchRequest extends RequestState { +public final Set requestedPartitions; +public final GroupState.Generation requestedGeneration; +public CompletableFuture> future; + +public UnsentOffsetFetchRequest(final Set partitions, +final GroupState.Generation generation, +final long retryBackoffMs) { +super(retryBackoffMs); +this.requestedPartitions = partitions; +this.requestedGeneration = generation; +this.future = new CompletableFuture<>(); +} + +public boolean sameRequest(final UnsentOffsetFetchRequest request) { +return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); +} + +public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { +OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( +groupState.groupId, +true, +new ArrayList<>(this.requestedPartitions), +throwOnFetchStableOffsetUnsupported); +NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( +builder, +coordinatorRequestManager.coordinator()); +unsentRequest.future().whenComplete((r, t) -> { +onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); +}); +return unsentRequest; +} + +public void onResponse( +final long currentTimeMs, +final OffsetFetchResponse response) { +Errors responseError = response.groupLevelError(groupState.groupId); +if (responseError != Errors.NONE) { +onFailure(currentTimeMs, responseError); +return; +} +onSuccess(currentTimeMs, response); +} + +private void onFailure(final long currentTimeMs, + final Errors responseError) { +log.debug("Offset fetch failed: {}", responseError.message()); + +// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ? 
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +retry(currentTimeMs); +} else if (responseError == Errors.NOT_COORDINATOR) { +// re-discover the coordinator and retry + coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), Time.SYSTEM.milliseconds()); +retry(currentTimeMs); +} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { + future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId)); +} else { +future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); +} +} + +private void retry(final long currentTimeMs) { +onFailedAttempt(currentTimeMs); +onSendAttempt(currentTimeMs); +pendingRequests.addOffsetFetchRequest(this); +} + +private void onSuccess(final long currentTimeMs, + final OffsetFetchResponse response) { +Set unauthorizedTopics = null; +Map responseData = +response.partitionDataMap(groupState.groupId); +Map offsets = new HashMap<>(responseData.size()); +Set unstableTxnOffsetTopicPartitions = new HashSet<>(); +for (Map.Entry entry : responseData.entrySet()) { +TopicPartition tp = entry.getKey(); +OffsetFetchResponse.PartitionData partitionData = entry.getValue(); +if (partitionData.hasError()) { +Errors error = partitionData.error; +log.debug("Failed to fetch offset for partition {}: {}", tp, error.message()); + +if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { +future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does " + +"not " + +"exist")); +return; +} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { +if (unauthorizedTopics == null) { +unauthorizedTop
[jira] [Updated] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas
[ https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14829: - Description: Currently, we have various bits of reassignment logic spread across different classes. For example, `ReplicationControlManager` contains logic for when a reassignment is in progress, which is duplication in `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to `PartitionReassignmentReplicas` so its more testable and easier to reason about. (was: Currently, we have various bits of reassignment logic spread across different classes. For example, `ReplicationControlManager` contains logic for when a reassignment is in progress, which is duplication in `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to ) > Consolidate reassignment logic in PartitionReassignmentReplicas > --- > > Key: KAFKA-14829 > URL: https://issues.apache.org/jira/browse/KAFKA-14829 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Minor > > Currently, we have various bits of reassignment logic spread across different > classes. For example, `ReplicationControlManager` contains logic for when a > reassignment is in progress, which is duplication in > `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` > which contains logic for how to undo/revert a reassignment. The idea here is > to move the logic to `PartitionReassignmentReplicas` so its more testable and > easier to reason about. -- This message was sent by Atlassian Jira (v8.20.10#820010)
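For illustration, a simplified sketch of one piece of logic this consolidation targets: deciding whether a reassignment is in progress from the partition's adding and removing replicas. The types and method below are hypothetical, not the actual `PartitionReassignmentReplicas` API.

```java
import java.util.List;

// Sketch only: a reassignment is considered in progress while the partition
// still has adding or removing replicas. Names here are illustrative.
public class ReassignmentSketch {

    record PartitionState(List<Integer> replicas, List<Integer> adding, List<Integer> removing) { }

    static boolean isReassignmentInProgress(final PartitionState state) {
        return !state.adding().isEmpty() || !state.removing().isEmpty();
    }

    public static void main(final String[] args) {
        // mid-reassignment: replica 3 is being added, replica 1 is being removed
        final PartitionState midReassignment =
            new PartitionState(List.of(1, 2, 3), List.of(3), List.of(1));
        System.out.println(isReassignmentInProgress(midReassignment)); // true
    }
}
```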
[jira] [Updated] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas
[ https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14829: - Description: Currently, we have various bits of reassignment logic spread across different classes. For example, `ReplicationControlManager` contains logic for when a reassignment is in progress, which is duplication in `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to > Consolidate reassignment logic in PartitionReassignmentReplicas > --- > > Key: KAFKA-14829 > URL: https://issues.apache.org/jira/browse/KAFKA-14829 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Minor > > Currently, we have various bits of reassignment logic spread across different > classes. For example, `ReplicationControlManager` contains logic for when a > reassignment is in progress, which is duplication in > `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` > which contains logic for how to undo/revert a reassignment. The idea here is > to move the logic to -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13391: WIP: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
artemlivshits commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1142707455 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import java.util.Collections +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { Review Comment: Looks like this could be called from multiple threads, do we need to add synchronization? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
C0urante commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1142700527 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class KafkaTopicBasedBackingStore { Review Comment: Ah, I forgot about the static constructors for the offset backing store. Yeah, probably not worth it to add another constructor variant for everything. I was thinking about isolating this logic into a `Supplier`, and we could then have something like: ```java public class LogBuilder { public static Supplier> createLog(String topic, String topicConfig, String topicPurpose...) { return () -> { KafkaBasedLog result = ... // create and set up KafkaBasedLog, including performing topic initialization here return result; } } } public class KafkaConfigBackingStore implements ConfigBackingStore { public KafkaConfigBackingStore(Supplier setupAndCreateKafkaBasedLog, ...) } ``` Which I think would still technically work here (and honestly be a bit cleaner than how we're hacking the `setupAndCreateKafkaBasedLog` method in tests right now), but given the number of new constructors it would require (which introduce their own amount of ugliness), we can stick with what's in this PR right now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag
C0urante commented on code in PR #13367: URL: https://github.com/apache/kafka/pull/13367#discussion_r1142683836 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -416,6 +417,15 @@ public void testReplicationWithEmptyPartition() throws Exception { @Test public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException { +testOneWayReplicationWithOffsetSyncs(OFFSET_LAG_MAX); +} + +@Test +public void testOneWayReplicationWithFrequentOffsetSyncs() throws InterruptedException { +testOneWayReplicationWithOffsetSyncs(0); +} + +public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException { Review Comment: Nit: this may make more sense as `protected` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas
[ https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14829: - Priority: Minor (was: Major) > Consolidate reassignment logic in PartitionReassignmentReplicas > --- > > Key: KAFKA-14829 > URL: https://issues.apache.org/jira/browse/KAFKA-14829 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas
[ https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant reassigned KAFKA-14829: Assignee: Andrew Grant > Consolidate reassignment logic in PartitionReassignmentReplicas > --- > > Key: KAFKA-14829 > URL: https://issues.apache.org/jira/browse/KAFKA-14829 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas
Andrew Grant created KAFKA-14829: Summary: Consolidate reassignment logic in PartitionReassignmentReplicas Key: KAFKA-14829 URL: https://issues.apache.org/jira/browse/KAFKA-14829 Project: Kafka Issue Type: Improvement Reporter: Andrew Grant -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14823) Clean up ConfigProvider API
[ https://issues.apache.org/jira/browse/KAFKA-14823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702932#comment-17702932 ] Greg Harris commented on KAFKA-14823: - Connect currently has an issue where secrets can be changed on-disk, but do not trigger connector and task restarts. Implementing the subscribe/unsubscribe hooks would help alleviate this problem and make rotating secrets for connectors more predictable. > Clean up ConfigProvider API > --- > > Key: KAFKA-14823 > URL: https://issues.apache.org/jira/browse/KAFKA-14823 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Priority: Major > > The ConfigProvider interface exposes several methods that are not used: > - ConfigData get(String path) > - default void subscribe(String path, Set keys, ConfigChangeCallback > callback) > - default void unsubscribe(String path, Set keys, > ConfigChangeCallback callback) > - default void unsubscribeAll() > We should either build mechanisms to support them or deprecate them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
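For context, the subscription hooks listed in the ticket have default implementations on the `ConfigProvider` interface and are not invoked by the runtime today, which is exactly why rotated secrets go unnoticed. A minimal, hypothetical provider that implements them might look like the following sketch; a real implementation would watch the path and fire the callback when the secret changes.

```java
import org.apache.kafka.common.config.ConfigChangeCallback;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: remembers subscribers so a file watcher could notify them on change.
public class PollingSecretProvider implements ConfigProvider {
    private final Map<String, ConfigChangeCallback> callbacks = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ConfigData get(String path) {
        return new ConfigData(Collections.emptyMap());   // real code would read the file at 'path'
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        return new ConfigData(Collections.emptyMap());
    }

    @Override
    public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        callbacks.put(path, callback);   // remember who to notify when 'path' changes
    }

    @Override
    public void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        callbacks.remove(path);
    }

    @Override
    public void close() {
        callbacks.clear();
    }
}
```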
[GitHub] [kafka] C0urante commented on pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors
C0urante commented on PR #13424: URL: https://github.com/apache/kafka/pull/13424#issuecomment-1476936722 @mimaison do you think you'll have time to take a look? Hoping to get this in before the 3.5.0 release but we can punt if there's not enough bandwidth for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors
C0urante commented on PR #13424: URL: https://github.com/apache/kafka/pull/13424#issuecomment-1476935983 cc @yashmayya; feel free to review and/or build off of this for KAFKA-14786, KAFKA-14368, and KAFKA-14784. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Summary: Extract common logic from Fetcher (was: Extract common logic from Fetcher into FetcherUtils) > Extract common logic from Fetcher > - > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task includes refactoring {{Fetcher}} by extracting out some common > logic into {{FetcherUtils}} to allow forthcoming implementations to leverage > that common logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante opened a new pull request, #13424: KAFKA-14783: New STOPPED state for connectors
C0urante opened a new pull request, #13424: URL: https://github.com/apache/kafka/pull/13424 [Jira](https://issues.apache.org/jira/browse/KAFKA-14783), [relevant KIP section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED) Adds the new `STOPPED` target state for connectors, which causes all tasks for the connector to be shut down and for its status in the REST API to be updated to `STOPPED`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby
guozhangwang commented on code in PR #13369: URL: https://github.com/apache/kafka/pull/13369#discussion_r1142674372 ## streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java: ## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * An integration test to verify EOS properties when using Caching and Standby replicas + * while tasks are being redistributed after re-balancing event. + * The intent is not that this test should be merged into the repo but only provided for evidence on how to reproduce. 
+ * One test fail and two test pass reliably on an i7-8750H CPU @ 2.20GHz × 12 with 32 GiB Memory + */ +@Category(IntegrationTest.class) +@SuppressWarnings("deprecation") Review Comment: I actually tried it :) The issue is that to replace it we'd need to call `context.forward(key, value)`, in which case we need to declare the new processor context with template types `Integer, Integer` to avoid ambiguity between the overloaded functions. But that class would then need to reference `RecordMetadata` etc., which involves more changes, so I stopped there. If people feel strongly about it I can go ahead, bite the bullet, and revamp the whole test client. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
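Placeholder anchor; see the merged edit below.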
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby
guozhangwang commented on code in PR #13369: URL: https://github.com/apache/kafka/pull/13369#discussion_r1142669679 ## streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java: ## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * An integration test to verify EOS properties when using Caching and Standby replicas + * while tasks are being redistributed after re-balancing event. + * The intent is not that this test should be merged into the repo but only provided for evidence on how to reproduce. Review Comment: Ack, will do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #13347: MINOR: Use JUnit-5 extension to enforce strict stubbing
guozhangwang merged PR #13347: URL: https://github.com/apache/kafka/pull/13347 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #13347: MINOR: Use JUnit-5 extension to enforce strict stubbing
guozhangwang commented on PR #13347: URL: https://github.com/apache/kafka/pull/13347#issuecomment-1476912355 @lucasbru ack (and sorry for late reply)! Will take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio merged pull request #13402: MINOR: Use PartitionAssignment in ReplicationControlManager and PartitionReassignmentReplicas
jsancio merged PR #13402: URL: https://github.com/apache/kafka/pull/13402 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14828) Remove R/W lock from StandardAuthorizer
Purshotam Chauhan created KAFKA-14828: - Summary: Remove R/W lock from StandardAuthorizer Key: KAFKA-14828 URL: https://issues.apache.org/jira/browse/KAFKA-14828 Project: Kafka Issue Type: Improvement Reporter: Purshotam Chauhan Assignee: Purshotam Chauhan Currently, StandardAuthorizer uses R/W locks to keep the data consistent between reads. The intent of this Jira is to remove the R/W locks by using the persistent data structures library like - [pcollections|https://github.com/hrldcpr/pcollections], [Paguro|https://github.com/GlenKPeterson/Paguro] and [Vavr|[https://github.com/vavr-io/vavr].] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14828) Remove R/W lock from StandardAuthorizer
[ https://issues.apache.org/jira/browse/KAFKA-14828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Purshotam Chauhan updated KAFKA-14828: -- Description: Currently, StandardAuthorizer uses R/W locks to keep the data consistent between reads. The intent of this Jira is to remove the R/W locks by using the persistent data structures library like - [pcollections|https://github.com/hrldcpr/pcollections], [Paguro|https://github.com/GlenKPeterson/Paguro] and [Vavr|https://github.com/vavr-io/vavr] (was: Currently, StandardAuthorizer uses R/W locks to keep the data consistent between reads. The intent of this Jira is to remove the R/W locks by using the persistent data structures library like - [pcollections|https://github.com/hrldcpr/pcollections], [Paguro|https://github.com/GlenKPeterson/Paguro] and [Vavr|[https://github.com/vavr-io/vavr].] ) > Remove R/W lock from StandardAuthorizer > --- > > Key: KAFKA-14828 > URL: https://issues.apache.org/jira/browse/KAFKA-14828 > Project: Kafka > Issue Type: Improvement >Reporter: Purshotam Chauhan >Assignee: Purshotam Chauhan >Priority: Major > > Currently, StandardAuthorizer uses R/W locks to keep the data consistent > between reads. The intent of this Jira is to remove the R/W locks by using > the persistent data structures library like - > [pcollections|https://github.com/hrldcpr/pcollections], > [Paguro|https://github.com/GlenKPeterson/Paguro] and > [Vavr|https://github.com/vavr-io/vavr] -- This message was sent by Atlassian Jira (v8.20.10#820010)
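As a rough illustration of what removing the lock could look like with a persistent-map library (assuming the pcollections dependency named in the ticket; this is not the actual StandardAuthorizer code): readers dereference an immutable snapshot, writers publish a new version, so no read/write lock is needed.

```java
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;

// Copy-on-write sketch: reads are lock-free against an immutable snapshot,
// writes build and publish a new persistent map under a writer-side lock.
public class AclCache {
    private volatile PMap<String, String> aclsByBinding = HashTreePMap.empty();

    public String lookup(String binding) {
        return aclsByBinding.get(binding);   // never sees a partially applied update
    }

    public synchronized void addAcl(String binding, String rule) {
        aclsByBinding = aclsByBinding.plus(binding, rule);   // publish a new version
    }

    public synchronized void removeAcl(String binding) {
        aclsByBinding = aclsByBinding.minus(binding);
    }
}
```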
[GitHub] [kafka] emissionnebula opened a new pull request, #13423: KAFKA-14827: Support for StandardAuthorizer benchmark
emissionnebula opened a new pull request, #13423: URL: https://github.com/apache/kafka/pull/13423 What * Renamed AclAuthorizerBenchmark -> AuthorizerBenchmark * Updated it to run benchmarks for both AclAuthorizer and StandardAuthorizer Current benchmark results: ``` Benchmark (aclCount) (authorizerType) (denyPercentage) (resourceCount) Mode CntScore Error Units AclAuthorizerBenchmark.testAclsIterator 50 ACL205 avgt2 165.618 ms/op AclAuthorizerBenchmark.testAclsIterator 50 KRAFT205 avgt2 251.659 ms/op AclAuthorizerBenchmark.testAuthorizeByResourceType 50 ACL205 avgt20.005 ms/op AclAuthorizerBenchmark.testAuthorizeByResourceType 50 KRAFT205 avgt2 534.053 ms/op AclAuthorizerBenchmark.testAuthorizer 50 ACL205 avgt20.231 ms/op AclAuthorizerBenchmark.testAuthorizer 50 KRAFT205 avgt20.675 ms/op AclAuthorizerBenchmark.testUpdateCache 50 ACL205 avgt2 481.212 ms/op AclAuthorizerBenchmark.testUpdateCache 50 KRAFT205 avgt2 ≈ 10⁻⁶ ms/op JMH benchmarks done ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
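For readers unfamiliar with the harness, a stripped-down JMH skeleton of a benchmark parameterized over the authorizer type might look like the following; the names and the setup body are purely illustrative, not the code in the PR.

```java
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

// Hypothetical skeleton in the spirit of the AclAuthorizerBenchmark -> AuthorizerBenchmark rename:
// the same measured methods run once per authorizer implementation via @Param.
@State(Scope.Benchmark)
public class AuthorizerBenchmarkSketch {

    @Param({"ACL", "KRAFT"})
    public String authorizerType;

    private Object authorizer;

    @Setup
    public void setup() {
        // A real benchmark would build an AclAuthorizer or StandardAuthorizer here
        // and populate it with the configured number of ACLs and resources.
        authorizer = "KRAFT".equals(authorizerType) ? new Object() : new Object();
    }

    @Benchmark
    public Object testAuthorizer() {
        return authorizer;   // a real benchmark would call authorize(...) here
    }
}
```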
[jira] [Created] (KAFKA-14827) Support for StandardAuthorizer in Benchmark
Purshotam Chauhan created KAFKA-14827: - Summary: Support for StandardAuthorizer in Benchmark Key: KAFKA-14827 URL: https://issues.apache.org/jira/browse/KAFKA-14827 Project: Kafka Issue Type: Improvement Reporter: Purshotam Chauhan Assignee: Purshotam Chauhan Support for StandardAuthorizer in Benchmark -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] lucasbru commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby
lucasbru commented on code in PR #13369: URL: https://github.com/apache/kafka/pull/13369#discussion_r1142535965 ## streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java: ## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * An integration test to verify EOS properties when using Caching and Standby replicas + * while tasks are being redistributed after re-balancing event. + * The intent is not that this test should be merged into the repo but only provided for evidence on how to reproduce. Review Comment: I think we want to remove the last two paragraphs of the comment. 
## streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java: ## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.ap
[jira] [Created] (KAFKA-14826) Log the completion time for request that resulted in an error.
José Armando García Sancio created KAFKA-14826: -- Summary: Log the completion time for request that resulted in an error. Key: KAFKA-14826 URL: https://issues.apache.org/jira/browse/KAFKA-14826 Project: Kafka Issue Type: Task Components: kraft Reporter: José Armando García Sancio Assignee: José Armando García Sancio When handling fetch request that had an exception the KRaft client doesn't log the "completion" time. There is a FIXME comment in the KafkaRaftClient for this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14825) Handle divergence between KRaft's HWM and Log's HWM
José Armando García Sancio created KAFKA-14825: -- Summary: Handle divergence between KRaft's HWM and Log's HWM Key: KAFKA-14825 URL: https://issues.apache.org/jira/browse/KAFKA-14825 Project: Kafka Issue Type: Task Components: kraft Reporter: José Armando García Sancio Assignee: José Armando García Sancio The types in the UnifiedLog and KafkaMetadataLog allow for the log's HWM to diverge from the KRaft client's HWM. In practice this is should not be possible since the KRaft client is the only component that write to the log layer. The code in KafkaMetadataLog has a FIXME comment related to this. We should remove the comment and fix the code to handle divergence in the HWM. There are a least two options: # Change ReplicatedLog::updateHighWatermark to return the actual HWM and deal with the returned value in the KRaft client. # Throw an invalid state exception if the KRaft HWM doesn't match the Log's HWM. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14816) Connect loading SSL configs when contacting non-HTTPS URLs
[ https://issues.apache.org/jira/browse/KAFKA-14816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14816. --- Reviewer: Justine Olshan Resolution: Fixed > Connect loading SSL configs when contacting non-HTTPS URLs > -- > > Key: KAFKA-14816 > URL: https://issues.apache.org/jira/browse/KAFKA-14816 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.4.0 >Reporter: Ian McDonald >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.5.0, 3.4.1 > > > Due to changes made here: [https://github.com/apache/kafka/pull/12828] > Connect now unconditionally loads SSL configs from the worker into rest > clients it uses for cross-worker communication and uses them even when > issuing requests to HTTP (i.e., non-HTTPS) URLs. Previously, it would only > attempt to load (and validate) SSL properties when issuing requests to HTTPS > URLs. This can cause issues when a Connect cluster has stopped securing its > REST API with SSL but its worker configs still contain the old (and > now-invalid) SSL properties. When this happens, REST requests that hit a > follower worker but need to be forwarded to the leader will fail, and > connectors that perform dynamic reconfigurations via > [ConnectorContext::requestTaskReconfiguration|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()] > will fail to trigger that reconfiguration if they are not running on the > leader. > In our testing environments - older versions without the linked changes pass > with the following configuration, and newer versions with the changes fail: > {{ssl.keystore.location = /mnt/security/test.keystore.jks}} > {{ssl.keystore.password = [hidden]}} > {{ssl.keystore.type = JKS}} > {{ssl.protocol = TLSv1.2}} > It's important to note that the file {{/mnt/security/test.keystore.jks}} > isn't generated for our non-SSL tests, however these configs are still > included in our worker config file. > This leads to a 500 response when hitting the create connector REST endpoint > with the following error: > bq. { "error_code":500, "message":"Failed to start RestClient: > /mnt/security/test.keystore.jks is not a valid keystore" } -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs
jolshan commented on PR #13415: URL: https://github.com/apache/kafka/pull/13415#issuecomment-1476575571 cherrypicked to 3.4 as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison opened a new pull request, #13422: MINOR: Cleanups in clients common.config
mimaison opened a new pull request, #13422: URL: https://github.com/apache/kafka/pull/13422 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1476492254 hello, maybe you are interested in this issue? @chia7712 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception
[ https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-14824: --- Reviewer: Chia-Ping Tsai > ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown > exception > --- > > Key: KAFKA-14824 > URL: https://issues.apache.org/jira/browse/KAFKA-14824 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.3.2 >Reporter: hudeqi >Priority: Blocker > > For ReplicaAlterLogDirsThread, if the partition is marked as failed due to an > unknown exception and the partition fetch is suspended, the paused cleanup > logic of the partition needs to be canceled, otherwise it will lead to > serious unexpected disk usage growth. > > For example, in the actual production environment (the Kafka version used is > 2.5.1), there is such a case: perform log dir balance on this partition > leader broker. After started fetching when the future log is successfully > created, then reset and truncate to the leader's log start offset for the > first time due to out of range. At the same time, because the partition > leader is processing the leaderAndIsrRequest, the leader epoch is updated, so > the ReplicaAlterLogDirsThread appears FENCED_LEADER_EPOCH, and the > 'partitionStates' of the partition are cleaned up. At the same time, the > logic of add ReplicaAlterLogDirsThread for the partition is executing in the > thread that is processing leaderAndIsrRequest. In here, the offset set by > InitialFetchState is the hw of the leader. When ReplicaAlterLogDirsThread > performs the logic of processFetchRequest, it will throw > "java.lang.IllegalStateException : Offset mismatch for the future replica > anti_fraud.data_collector.anticrawler_live-54: fetched offset = 4979659327, > log end offset = 4918576434.", leading to such a result: > ReplicaAlterLogDirsThread no longer fetch the partition, due to the previous > paused cleanup logic of the partition, the disk usage of the corresponding > broker increases infinitely, causing serious problems. > > But I found that trunk fixed this bug in KAFKA-9087, which may cause > ReplicaAlterLogDirsThread to appear “Offset mismatch error" causing to stop > fetch. But I don't know if there will be some other unknown exceptions, and > at the same time, due to the current logic, it will bring the same disk > cleanup failure problem? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hudeqi opened a new pull request, #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…
hudeqi opened a new pull request, #13421: URL: https://github.com/apache/kafka/pull/13421 For ReplicaAlterLogDirsThread, if the partition is marked as failed due to an unknown exception and the partition fetch is suspended, the paused cleanup logic of the partition needs to be canceled, otherwise it will lead to serious unexpected disk usage growth. The detail of the issue is here: [jira](https://issues.apache.org/jira/browse/KAFKA-14824) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception
hudeqi created KAFKA-14824: -- Summary: ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception Key: KAFKA-14824 URL: https://issues.apache.org/jira/browse/KAFKA-14824 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.3.2 Reporter: hudeqi For ReplicaAlterLogDirsThread, if the partition is marked as failed due to an unknown exception and the partition fetch is suspended, the paused cleanup logic of the partition needs to be canceled, otherwise it will lead to serious unexpected disk usage growth. For example, in the actual production environment (the Kafka version used is 2.5.1), there is such a case: perform log dir balance on this partition leader broker. After started fetching when the future log is successfully created, then reset and truncate to the leader's log start offset for the first time due to out of range. At the same time, because the partition leader is processing the leaderAndIsrRequest, the leader epoch is updated, so the ReplicaAlterLogDirsThread appears FENCED_LEADER_EPOCH, and the 'partitionStates' of the partition are cleaned up. At the same time, the logic of add ReplicaAlterLogDirsThread for the partition is executing in the thread that is processing leaderAndIsrRequest. In here, the offset set by InitialFetchState is the hw of the leader. When ReplicaAlterLogDirsThread performs the logic of processFetchRequest, it will throw "java.lang.IllegalStateException : Offset mismatch for the future replica anti_fraud.data_collector.anticrawler_live-54: fetched offset = 4979659327, log end offset = 4918576434.", leading to such a result: ReplicaAlterLogDirsThread no longer fetch the partition, due to the previous paused cleanup logic of the partition, the disk usage of the corresponding broker increases infinitely, causing serious problems. But I found that trunk fixed this bug in KAFKA-9087, which may cause ReplicaAlterLogDirsThread to appear “Offset mismatch error" causing to stop fetch. But I don't know if there will be some other unknown exceptions, and at the same time, due to the current logic, it will bring the same disk cleanup failure problem? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison opened a new pull request, #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911
mimaison opened a new pull request, #13420: URL: https://github.com/apache/kafka/pull/13420 New add.source.alias.to.metrics setting to add the source cluster alias to the MirrorSourceConnector metrics ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs
C0urante commented on PR #13415: URL: https://github.com/apache/kafka/pull/13415#issuecomment-1476461304 Thanks Justine! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan merged pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs
jolshan merged PR #13415: URL: https://github.com/apache/kafka/pull/13415 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison opened a new pull request, #13419: KAFKA-8713: Allow using null for field in JsonConverter (KIP-581)
mimaison opened a new pull request, #13419: URL: https://github.com/apache/kafka/pull/13419 Add a new configuration replace.null.with.default to allow using null instead of the default value. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
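A hedged example of how the proposed setting might be applied to a `JsonConverter` instance, assuming the property keeps the name given in the PR description:

```java
import org.apache.kafka.connect.json.JsonConverter;

import java.util.HashMap;
import java.util.Map;

public class JsonConverterNullExample {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("schemas.enable", "true");
        config.put("replace.null.with.default", "false");  // proposed KIP-581 setting

        JsonConverter converter = new JsonConverter();
        converter.configure(config, false);  // false = configuring for record values
        // With the setting disabled, a null field stays null during conversion instead
        // of being replaced by the schema's default value.
    }
}
```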
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
viktorsomogyi commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r114295 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -328,6 +335,48 @@ public Set createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } +/** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} + */ +public Set createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) { Review Comment: Ok, that's fair. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs
C0urante commented on code in PR #13415: URL: https://github.com/apache/kafka/pull/13415#discussion_r1142078897 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -97,7 +98,11 @@ public HttpResponse httpRequest(String url, String method, HttpHeaders he public HttpResponse httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData, TypeReference responseFormat, SecretKey sessionKey, String requestSignatureAlgorithm) { -HttpClient client = httpClient(); +// Only try to load SSL configs if we have to (see KAFKA-14816) +SslContextFactory sslContextFactory = url.startsWith("https://";) +? SSLUtils.createClientSideSslContextFactory(config) +: null; +HttpClient client = httpClient(sslContextFactory); Review Comment: We wanted to be conservative about unintended changes in behavior (😞), and weren't certain about the thread safety of the `HttpClient` class. Discussion thread [here](https://github.com/apache/kafka/pull/12828#discussion_r1020382238). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata
C0urante commented on PR #13418: URL: https://github.com/apache/kafka/pull/13418#issuecomment-1476186302 Thanks for providing more detail. I'm still confused about this part: > The default equals method compares by address. But this is not what we want. Wouldn't comparison by address be exactly what you want? In what case(s) would you need a different kind of comparison? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming merged pull request #13403: MINOR:Fix hint about alter in TopicCommand
dengziming merged PR #13403: URL: https://github.com/apache/kafka/pull/13403 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14784) Implement connector offset reset REST API
[ https://issues.apache.org/jira/browse/KAFKA-14784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14784: -- Assignee: Yash Mayya > Implement connector offset reset REST API > - > > Key: KAFKA-14784 > URL: https://issues.apache.org/jira/browse/KAFKA-14784 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > > Implement the {{DELETE /connectors/name/offsets}} endpoint [described in > KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Resettingoffsets]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14785) Implement connector offset read REST API
[ https://issues.apache.org/jira/browse/KAFKA-14785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-14785: -- Assignee: Yash Mayya > Implement connector offset read REST API > > > Key: KAFKA-14785 > URL: https://issues.apache.org/jira/browse/KAFKA-14785 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > > Implement the {{GET /connector/name/offsets}} endpoint [described in > KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Readingoffsets]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Schm1tz1 commented on pull request #12992: KAFKA-14376-KIP887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on PR #12992: URL: https://github.com/apache/kafka/pull/12992#issuecomment-1475939063 > Can you reply to the DISCUSS/VOTE threads explaining the changes you made since the KIP was accepted? Sure, just sent an update to the KIP thread: https://lists.apache.org/thread/9dhk6fny30tgnp6z9sf2qmz4c22onmw2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14823) Clean up ConfigProvider API
Mickael Maison created KAFKA-14823: -- Summary: Clean up ConfigProvider API Key: KAFKA-14823 URL: https://issues.apache.org/jira/browse/KAFKA-14823 Project: Kafka Issue Type: Improvement Reporter: Mickael Maison The ConfigProvider interface exposes several methods that are not used: - ConfigData get(String path) - default void subscribe(String path, Set keys, ConfigChangeCallback callback) - default void unsubscribe(String path, Set keys, ConfigChangeCallback callback) - default void unsubscribeAll() We should either build mechanisms to support them or deprecate them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14822) Allow restricting File and Directory ConfigProviders to specific paths
Mickael Maison created KAFKA-14822: -- Summary: Allow restricting File and Directory ConfigProviders to specific paths Key: KAFKA-14822 URL: https://issues.apache.org/jira/browse/KAFKA-14822 Project: Kafka Issue Type: Improvement Reporter: Mickael Maison Assignee: Mickael Maison In sensitive environments, it would be interesting to be able to restrict the files that can be accessed by the built-in configuration providers. For example: config.providers=directory config.providers.directory.class=org.apache.kafka.connect.configs.DirectoryConfigProvider config.providers.directory.path=/var/run Then if a caller tries to access another path, for example ssl.keystore.password=${directory:/etc/passwd:keystore-password} it would be rejected. -- This message was sent by Atlassian Jira (v8.20.10#820010)
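A minimal sketch of the kind of path restriction the ticket proposes (illustrative only, not actual Kafka code): normalize the requested path and reject anything that escapes the configured root.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class AllowedPathCheck {
    private final Path allowedRoot;

    public AllowedPathCheck(String allowedRoot) {
        this.allowedRoot = Paths.get(allowedRoot).toAbsolutePath().normalize();
    }

    public boolean isAllowed(String requestedPath) {
        Path resolved = Paths.get(requestedPath).toAbsolutePath().normalize();
        return resolved.startsWith(allowedRoot);   // e.g. rejects /etc/passwd when root is /var/run
    }

    public static void main(String[] args) {
        AllowedPathCheck check = new AllowedPathCheck("/var/run");
        System.out.println(check.isAllowed("/var/run/secrets"));  // true
        System.out.println(check.isAllowed("/etc/passwd"));       // false
    }
}
```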
[GitHub] [kafka] mimaison commented on pull request #12992: KAFKA-14376-KIP887: Add ConfigProvider to make use of environment variables
mimaison commented on PR #12992: URL: https://github.com/apache/kafka/pull/12992#issuecomment-1475905586 Can you reply to the DISCUSS/VOTE threads explaining the changes you made since the KIP was accepted? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14821) Better handle timeouts in ListOffsets API
Dimitar Dimitrov created KAFKA-14821: Summary: Better handle timeouts in ListOffsets API Key: KAFKA-14821 URL: https://issues.apache.org/jira/browse/KAFKA-14821 Project: Kafka Issue Type: Improvement Components: clients Reporter: Dimitar Dimitrov Assignee: Dimitar Dimitrov The ListOffsets Admin API doesn't retry failed requests for partitions due to timeouts or due to other types of retriable exceptions. This is a step back compared to the Consumer offset APIs implemented in the fetcher as the latter can do partial retries in such cases. * The comparison is notable as some Kafka tools (e.g. {{{}kafka-get-offsets{}}}) have moved from using the Consumer offset APIs to using the ListOffsets Admin API. One nice way to address that seems to be to migrate the ListOffsets API to use the more modern AdminApiDriver mechanism. That should automatically provide the capability to retry requests which responses are deemed retriable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] akatona84 commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
akatona84 commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1141776979 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -328,6 +335,48 @@ public Set createTopics(NewTopic... topics) { return createOrFindTopics(topics).createdTopics(); } +/** + * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException + * + * @param topicDescription + * @param timeoutMs + * @param backOffMs + * @param time + * @return the same as {@link TopicAdmin#createTopics(NewTopic...)} + */ +public Set createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) { Review Comment: That retrier is built for exceptions that are actually retriable, but in our case the exception is wrapped in another exception, and I don't really want to touch/refactor/hack this utility method to handle that kind of exception too, or to add more not-necessarily-retriable exceptions to the condition it retries on. That refactor would change the semantics of the retries where they are currently used. Long story short, I feel this topic-creation retry is a bit special and wouldn't be used this way in many places later on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
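To make the point about wrapped exceptions concrete, here is a rough sketch of a retry loop that keys off the wrapped cause rather than the thrown exception itself; everything other than the Kafka exception type is a stand-in, not the actual `TopicAdmin` implementation.

```java
import org.apache.kafka.common.errors.InvalidReplicationFactorException;

import java.util.concurrent.ExecutionException;

// Illustrative retry loop: the retriable error arrives wrapped (e.g. an ExecutionException
// around InvalidReplicationFactorException), so the loop inspects the cause instead of the
// thrown exception, which is why a generic "retriable exception" helper does not fit cleanly.
public class TopicCreationRetrySketch {

    interface TopicCreation {
        void attempt() throws ExecutionException;
    }

    static void createWithRetry(TopicCreation creation, long timeoutMs, long backOffMs)
            throws ExecutionException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                creation.attempt();
                return;
            } catch (ExecutionException e) {
                boolean retriable = e.getCause() instanceof InvalidReplicationFactorException;
                if (!retriable || System.currentTimeMillis() + backOffMs > deadline) {
                    throw e;  // not the wrapped error we retry on, or no time left
                }
                Thread.sleep(backOffMs);  // back off, then try the topic creation again
            }
        }
    }
}
```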
[GitHub] [kafka] akatona84 commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException
akatona84 commented on code in PR #11565: URL: https://github.com/apache/kafka/pull/11565#discussion_r1141771004 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class KafkaTopicBasedBackingStore { Review Comment: I tried it. in one hand: It'd require the composed field to be static, because of static methods would use the topicInitializer method: https://github.com/apache/kafka/blob/15dfe065fb0ef34145ec204123c592e468842de8/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L129 in the other hand (which relates to the first one a bit): I'm not sure if I could inject this composition nicely, so that I can test it (or mock part of it) easily. Currently the tests are prepared in a way that the `createKafkaBasedLog` is the method of the actual backingstore class, it would need a quite bit of a refactor on the tests too if I want to avoid this inheritance :) wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)
dajac commented on code in PR #13350: URL: https://github.com/apache/kafka/pull/13350#discussion_r1141731445

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:

## @@ -495,8 +615,8 @@ private class ConstrainedAssignmentBuilder extends AbstractAssignmentBuilder {
        @Override
        Map<String, List<TopicPartition>> build() {
            if (log.isDebugEnabled()) {
-                log.debug("Performing constrained assign with partitionsPerTopic: {}, currentAssignment: {}.",
-                    partitionsPerTopic, currentAssignment);
+                log.debug("Performing constrained assign with partitionsPerTopic: {}, currentAssignment: {} rackInfo {}.",

Review Comment:
   nit: missing `,` before `rackInfo`.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:

## @@ -519,13 +641,14 @@ Map<String, List<TopicPartition>> build() {
            return assignment;
        }
-        // Reassign previously owned partitions, up to the expected number of partitions per consumer
        private void assignOwnedPartitions() {
            for (Map.Entry<String, List<TopicPartition>> consumerEntry : currentAssignment.entrySet()) {
                String consumer = consumerEntry.getKey();
-                List<TopicPartition> ownedPartitions = consumerEntry.getValue();
+                List<TopicPartition> ownedPartitions = consumerEntry.getValue().stream()
+                    .filter(tp -> !rackInfo.racksMismatch(consumer, tp))

Review Comment:
   For my understanding, if rack awareness is disabled, this is a no-op, right?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:

## @@ -61,35 +72,51 @@ static final class ConsumerGenerationPair {
    public static final class MemberData {
        public final List<TopicPartition> partitions;
        public final Optional<Integer> generation;
-        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+        public final Optional<String> rackId;
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) {
            this.partitions = partitions;
            this.generation = generation;
+            this.rackId = rackId;
+        }
+
+        public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+            this(partitions, generation, Optional.empty());
        }
    }

    abstract protected MemberData memberData(Subscription subscription);

    @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
        Set<TopicPartition> partitionsWithMultiplePreviousOwners = new HashSet<>();
+
+        List<PartitionInfo> allPartitions = new ArrayList<>();
+        partitionsPerTopic.values().forEach(allPartitions::addAll);
+        RackInfo rackInfo = new RackInfo(allPartitions, subscriptions);

Review Comment:
   nit: There is an extra space.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:

## @@ -574,6 +697,42 @@ private void assignOwnedPartitions() {
            }
        }

+        // Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+        // otherwise, to minQuota
+        private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) {
+            int nextUnfilledConsumerIndex = 0;
+            Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+            while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {

Review Comment:
   nit: Do we ever mutate `rackInfo.consumerRacks`? If not, would it make sense to put this check as the first statement in the method and return if it is empty?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
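As a rough illustration of why the owned-partition filter becomes a no-op without rack information, here is a self-contained sketch of a racksMismatch-style check; the field names and types are assumptions for illustration, not the PR's actual RackInfo:

```java
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: when the consumer has no rack or the partition has no replica rack data
// (rack awareness disabled or unknown), racksMismatch() never reports a mismatch, so the
// ownedPartitions filter in the diff above keeps everything.
class RackInfoSketch {
    private final Map<String, String> consumerRacks;              // consumerId -> rack, empty if unknown
    private final Map<TopicPartition, Set<String>> partitionRacks; // partition -> racks of its replicas

    RackInfoSketch(Map<String, String> consumerRacks, Map<TopicPartition, Set<String>> partitionRacks) {
        this.consumerRacks = consumerRacks;
        this.partitionRacks = partitionRacks;
    }

    boolean racksMismatch(String consumer, TopicPartition tp) {
        String consumerRack = consumerRacks.get(consumer);
        Set<String> replicaRacks = partitionRacks.get(tp);
        if (consumerRack == null || replicaRacks == null || replicaRacks.isEmpty()) {
            return false; // no rack data -> never treated as a mismatch
        }
        return !replicaRacks.contains(consumerRack);
    }
}
```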
[GitHub] [kafka] yashmayya commented on a diff in pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs
yashmayya commented on code in PR #13415: URL: https://github.com/apache/kafka/pull/13415#discussion_r1141704800

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:

## @@ -97,7 +98,11 @@ public HttpResponse httpRequest(String url, String method, HttpHeaders he
    public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
                                           TypeReference<T> responseFormat, SecretKey sessionKey, String requestSignatureAlgorithm) {
-        HttpClient client = httpClient();
+        // Only try to load SSL configs if we have to (see KAFKA-14816)
+        SslContextFactory sslContextFactory = url.startsWith("https://")
+                ? SSLUtils.createClientSideSslContextFactory(config)
+                : null;
+        HttpClient client = httpClient(sslContextFactory);

Review Comment:
   Not related to this change, but any idea why we're creating and starting an HTTP client for every single request here?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
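A small sketch of the idea in the diff, assuming Jetty 9.x (the client library Connect shipped with at the time) and using a plain SslContextFactory.Client as a stand-in for SSLUtils.createClientSideSslContextFactory(config):

```java
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;

// Illustrative sketch, not the PR's code: only pay the cost of building an SSL context
// when the target URL is actually HTTPS.
public final class ForwardingClientSketch {

    public static HttpClient clientFor(String url) {
        SslContextFactory.Client ssl = url.startsWith("https://")
                ? new SslContextFactory.Client()   // stand-in for the Connect worker's SSL configs
                : null;
        return ssl != null ? new HttpClient(ssl) : new HttpClient();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = clientFor("http://worker:8083/connectors"); // hypothetical worker URL
        client.start();   // per-request start/stop, mirroring the current RestClient behaviour
        try {
            // ... issue the cross-worker request here ...
        } finally {
            client.stop();
        }
    }
}
```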