[GitHub] [kafka] showuon merged pull request #13362: KAFKA-14795: Provide message formatter for RemoteLogMetadata

2023-03-20 Thread via GitHub


showuon merged PR #13362:
URL: https://github.com/apache/kafka/pull/13362





[GitHub] [kafka] showuon commented on pull request #13362: KAFKA-14795: Provide message formatter for RemoteLogMetadata

2023-03-20 Thread via GitHub


showuon commented on PR #13362:
URL: https://github.com/apache/kafka/pull/13362#issuecomment-1477344434

   Failed tests are unrelated
   ```
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
   Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   ```





[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…

2023-03-20 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1477340172

   > Why do we need this PR if [KAFKA-9087](https://issues.apache.org/jira/browse/KAFKA-9087) (which you mentioned in the Jira) already fixed the bug? Is there another potential bug? Or is the bug fixed by [KAFKA-9087](https://issues.apache.org/jira/browse/KAFKA-9087) not the root cause?
   
   KAFKA-9087 addresses the root cause of the fetch stopping, namely the "Offset mismatch" error thrown during `processPartitionData`. But other, as-yet-unidentified exceptions could occur at the same point (I haven't found one yet, but they are possible) and lead to the same end result: the fetch stops, the log is never cleaned up, and the disk usage grows without bound. To be precise, this PR is a defensive measure, as I understand it.





[GitHub] [kafka] chia7712 commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…

2023-03-20 Thread via GitHub


chia7712 commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1477334683

   Why do we need this PR if KAFKA-9087 (which you mentioned in the Jira) already fixed the bug? Is there another potential bug? Or is the bug fixed by KAFKA-9087 not the root cause?





[GitHub] [kafka] rohits64 commented on pull request #13361: KAFKA-14401: Resume WorkThread if Connector/Tasks reading offsets get stuck when underneath WorkThread dies [WIP]

2023-03-20 Thread via GitHub


rohits64 commented on PR #13361:
URL: https://github.com/apache/kafka/pull/13361#issuecomment-1477318855

   Hi @mukkachaitanya, thanks for the inputs. I think we can go with exponential backoff and a static number of retries. I have made the changes; the retry count is currently set to 10, which should be a sufficient number of retries.
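The retry strategy described above, exponential backoff capped at a fixed number of attempts, can be sketched roughly as follows. The class and parameter names are hypothetical; this is a minimal illustration, not the actual Connect worker code.

```java
import java.util.concurrent.Callable;

// Minimal sketch of retrying an action with exponential backoff and a fixed retry cap.
public final class BackoffRetry {

    public static <T> T callWithRetry(final Callable<T> action,
                                      final int maxRetries,
                                      final long initialBackoffMs,
                                      final long maxBackoffMs) throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 0; ; attempt++) {
            try {
                return action.call();
            } catch (final Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted; surface the last failure
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs); // double the wait, up to a cap
            }
        }
    }
}
```

With `maxRetries = 10` and an initial backoff of, say, 100 ms capped at a few seconds, the total wait stays bounded while still giving the underlying work thread time to recover.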





[GitHub] [kafka] satishd commented on pull request #13362: KAFKA-14795: Provide message formatter for RemoteLogMetadata

2023-03-20 Thread via GitHub


satishd commented on PR #13362:
URL: https://github.com/apache/kafka/pull/13362#issuecomment-1477226052

   > LGTM, the only comment is: should we make the formatter a separate file? @satishd, thoughts?
   
   @showuon I am fine with the formatter inside the serde class.
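For context, a console-consumer message formatter is generally a small class implementing `org.apache.kafka.common.MessageFormatter`. The sketch below is hypothetical and is not the code from PR #13362: the real formatter lives alongside the RemoteLogMetadata serde and decodes the payload with that serde, whereas this stand-in simply prints the raw value as a string.

```java
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;

// Hypothetical sketch: prints one line per record from the remote log metadata topic.
// A real formatter would deserialize the value with the RemoteLogMetadata serde instead
// of decoding it as a UTF-8 string.
public class RemoteLogMetadataFormatterSketch implements MessageFormatter {

    @Override
    public void writeTo(final ConsumerRecord<byte[], byte[]> record, final PrintStream output) {
        final String value = record.value() == null
                ? "null"
                : new String(record.value(), StandardCharsets.UTF_8);
        output.printf("partition: %d, offset: %d, value: %s%n",
                record.partition(), record.offset(), value);
    }
}
```

Such a class can then be passed to kafka-console-consumer.sh via its --formatter option.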





[GitHub] [kafka] mjsax merged pull request #13365: KAFKA-14491: [17/N] Refactor segments cleanup logic

2023-03-20 Thread via GitHub


mjsax merged PR #13365:
URL: https://github.com/apache/kafka/pull/13365





[GitHub] [kafka] mjsax commented on a diff in pull request #13365: KAFKA-14491: [17/N] Refactor segments cleanup logic

2023-03-20 Thread via GitHub


mjsax commented on code in PR #13365:
URL: https://github.com/apache/kafka/pull/13365#discussion_r1142849436


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##
@@ -57,6 +57,11 @@ public void openExisting(final ProcessorContext context, 
final long streamTime)
 physicalStore.openDB(context.appConfigs(), context.stateDir());
 }
 
+@Override
+public void cleanupExpiredSegments(final long streamTime) {
+super.cleanupExpiredSegments(streamTime);

Review Comment:
   Ah. I see. Guess it would get clear quickly in an IDE -- just hard to see on 
GitHub. Seems fine w/o a comment.






[GitHub] [kafka] mjsax commented on pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-20 Thread via GitHub


mjsax commented on PR #13340:
URL: https://github.com/apache/kafka/pull/13340#issuecomment-1477214689

   Build failed with checkstyle errors:
   ```
   [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-13340/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:344:13: 'method call rparen' has incorrect indentation level 12, expected level should be 8. [Indentation]
   
   [2023-03-07T22:44:54.193Z] [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-13340/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:355:13: Variable 'numRecordsProduced' should be declared final. [FinalLocalVariable]
   ```
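For illustration, the two reported rules can be satisfied as in the following toy example. This is hypothetical code, not the test file itself.

```java
// Toy example for the two checkstyle rules reported above.
public class CheckstyleExamples {

    private static int produceRecords() {
        return 3;
    }

    public static void main(final String[] args) {
        // FinalLocalVariable: a local variable that is never reassigned must be declared final.
        final int numRecordsProduced = produceRecords();

        // Indentation ('method call rparen'): the closing parenthesis of a wrapped call
        // belongs at the statement's indentation level, not the continuation level.
        System.out.println(
            numRecordsProduced
        );
    }
}
```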





[GitHub] [kafka] mjsax commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-20 Thread via GitHub


mjsax commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848310


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -334,18 +399,65 @@ private final int produceSourceData(final long timestamp,
 }
 
 /**
- * Test-only processor for inserting records into a versioned store while 
also tracking
- * them separately in-memory, and performing checks to validate expected 
store contents.
- * Forwards the number of failed checks downstream for consumption.
+ * @param topic   topic to produce to
+ * @param dataTracker map of key -> timestamp -> value for tracking data 
which is produced to
+ *the topic. This method will add the produced data 
into this in-memory
+ *tracker in addition to producing to the topic, in 
order to keep the two
+ *in sync.
+ * @param timestamp   timestamp to produce with
+ * @param keyValues   key-value pairs to produce
+ *
+ * @return number of records produced
+ */
+@SuppressWarnings("varargs")
+@SafeVarargs
+private final int produceDataToTopic(final String topic,
+ final DataTracker dataTracker,
+ final long timestamp,
+ final KeyValue... 
keyValues) {
+produceDataToTopic(topic, timestamp, keyValues);
+
+for (final KeyValue keyValue : keyValues) {
+dataTracker.add(keyValue.key, timestamp, keyValue.value);
+}
+
+return keyValues.length;
+}
+
+/**
+ * Test-only processor for validating expected contents of a versioned 
store, and forwards
+ * the number of failed checks downstream for consumption. Callers specify 
whether the
+ * processor should also be responsible for inserting records into the 
store (while also
+ * tracking them separately in-memory for use in validation).
  */
 private static class VersionedStoreContentCheckerProcessor implements 
Processor {
 
 private ProcessorContext context;
 private VersionedKeyValueStore store;
 
+// whether or not the processor should write records to the store as 
they arrive.
+// must be false for global stores.

Review Comment:
   Now that I understand how the test actually works, it makes sense.






[GitHub] [kafka] mjsax commented on a diff in pull request #13340: KAFKA-14491: [15/N] Add integration tests for versioned stores

2023-03-20 Thread via GitHub


mjsax commented on code in PR #13340:
URL: https://github.com/apache/kafka/pull/13340#discussion_r1142848103


##
streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java:
##
@@ -302,7 +319,54 @@ public void 
shouldAllowCustomIQv2ForCustomStoreImplementations() {
 .withPartitions(Collections.singleton(0));
 final StateQueryResult result =
 IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
-assertThat("success", 
equalTo(result.getOnlyPartitionResult().getResult()));
+assertThat(result.getOnlyPartitionResult().getResult(), 
equalTo("success"));
+}
+
+@Test
+public void shouldCreateGlobalTable() throws Exception {
+// produce data to global store topic and track in-memory for 
processor to verify
+final DataTracker data = new DataTracker();
+produceDataToTopic(globalTableTopic, data, baseTimestamp, 
KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, null));
+produceDataToTopic(globalTableTopic, data, baseTimestamp + 5, 
KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3, "c5"));
+produceDataToTopic(globalTableTopic, data, baseTimestamp + 2, 
KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, null)); // 
out-of-order data
+
+// build topology and start app
+final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+streamsBuilder
+.globalTable(
+globalTableTopic,
+Consumed.with(Serdes.Integer(), Serdes.String()),
+Materialized
+.as(new 
RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, HISTORY_RETENTION))
+.withKeySerde(Serdes.Integer())
+.withValueSerde(Serdes.String())
+);
+streamsBuilder
+.stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+.process(() -> new VersionedStoreContentCheckerProcessor(false, 
data))
+.to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
+
+final Properties props = props();
+kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+kafkaStreams.start();
+
+// produce source data to trigger store verifications in processor
+int numRecordsProduced = produceDataToTopic(inputStream, baseTimestamp 
+ 8, KeyValue.pair(1, "a8"), KeyValue.pair(2, "b8"), KeyValue.pair(3, "c8"));
+
+// wait for output and verify
+final List> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+TestUtils.consumerConfig(
+CLUSTER.bootstrapServers(),
+IntegerDeserializer.class,
+IntegerDeserializer.class),
+outputStream,
+numRecordsProduced);
+
+for (final KeyValue receivedRecord : 
receivedRecords) {
+// verify zero failed checks for each record
+assertThat(receivedRecord.value, equalTo(0));

Review Comment:
   I was referring to this comment: 
https://github.com/apache/kafka/pull/13340#discussion_r1128550162
   
   > and we will not be able to write to a global store from the processor
   
   If you specify a global store, we pass in the "global processor" that is able to write into the store (in fact it has to, in order to maintain the global store), and thus we can easily track what goes into the store "on the side" in an in-memory data structure, similar to what we do for a regular processor that maintains the store.
   
   > This test already has a processor which inspects/validates the contents of the global store. Have I misunderstood?
   
   I think I did not understand how the test works -- now I see that you use a regular processor to read the global state store and verify the content. So I guess my comment is void (I had basically proposed to add this via a "global processor").




[GitHub] [kafka] mjsax commented on a diff in pull request #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-03-20 Thread via GitHub


mjsax commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1142842326


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
 private  InternalTopicConfig 
createChangelogTopicConfig(final StateStoreFactory factory,

   final String name) {
-if (factory.isWindowStore()) {
+if (factory.isVersionedStore()) {
+final VersionedChangelogTopicConfig config = new 
VersionedChangelogTopicConfig(name, factory.logConfig());
+config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > It's not strictly necessary though.
   
   If it's not necessary, then there's nothing to do here. I just wanted to probe whether we needed to do anything.
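For context, the change under discussion sets configs on the versioned store's changelog topic. A rough sketch of the equivalent topic-level configuration is below; Kafka Streams creates and configures this topic itself, and the topic name and retention value here are placeholders.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Rough sketch of the changelog topic configs for a versioned store: a compacted topic
// whose min.compaction.lag.ms is set to the store's history retention, so that old
// record versions survive at least that long before compaction may remove them.
public class VersionedChangelogTopicSketch {

    public static void main(final String[] args) throws Exception {
        final long historyRetentionMs = 24 * 60 * 60 * 1000L; // placeholder history retention

        final Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        topicConfigs.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, Long.toString(historyRetentionMs));

        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Collections.singletonList(
                new NewTopic("my-app-STORE_NAME-changelog", 1, (short) 1).configs(topicConfigs)
            )).all().get();
        }
    }
}
```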






[jira] [Commented] (KAFKA-7224) KIP-328: Add spill-to-disk for Suppression

2023-03-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702980#comment-17702980
 ] 

Matthias J. Sax commented on KAFKA-7224:


With 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced]
 added in 3.3, do we still want/need this one?

> KIP-328: Add spill-to-disk for Suppression
> --
>
> Key: KAFKA-7224
> URL: https://issues.apache.org/jira/browse/KAFKA-7224
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
> Following on KAFKA-7223, implement the spill-to-disk buffering strategy.





[GitHub] [kafka] leeleeian commented on pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata

2023-03-20 Thread via GitHub


leeleeian commented on PR #13418:
URL: https://github.com/apache/kafka/pull/13418#issuecomment-1477157623

   We're discussing the solution for our use case.
   
   Closing this PR for now.





[GitHub] [kafka] leeleeian closed pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata

2023-03-20 Thread via GitHub


leeleeian closed pull request #13418: MINOR: add equals and hashcode methods to 
KafkaProducer and ProducerMetadata
URL: https://github.com/apache/kafka/pull/13418





[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142788313


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -247,6 +248,10 @@ class BrokerServer(
   )
   alterPartitionManager.start()
 
+  val addPartitionsLogContext = new 
LogContext(s"[AddPartitionsToTxnManager broker=${config.brokerId}]")
+  val networkClient: NetworkClient = 
NetworkUtils.buildNetworkClient("AddPartitionsManager", config, metrics, time, 
addPartitionsLogContext)

Review Comment:
   we can make it more specific.






[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142788144


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node)
+currentNodeAndTransactionDataOpt match {
+  case None =>
+nodesToTransactions.put(node,
+  new TransactionDataAndCallbacks(new 
AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()),
+mutable.Map(transactionData.transactionalId() -> callback)))
+  case Some(currentNodeAndTransactionData) =>
+// Check if we already have txn ID -- this should only happen in epoch 
bump case. If so, we should return error for old entry and remove from queue.
+val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+if (currentTransactionData != null) {
+  if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch()) {
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic => 
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), Errors.INVALID_PRODUCER_EPOCH)
+  }
+}
+val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+
currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  } else {
+// We should never see a request on the same epoch since we 
haven't finished handling the one in queue
+throw new InvalidRecordException("Received a second request from 
the same connection without finishing the first.")
+  }
+}
+currentNodeAndTransactionData.transactionData.add(transactionData)
+
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+}
+wakeup()
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+override def onComplete(response: ClientResponse): Unit = {
+  inflightNodes.synchronized(inflightNodes.remove(node))
+  if (response.authenticationException() != null) {
+error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
wit

[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142787356


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node)
+currentNodeAndTransactionDataOpt match {
+  case None =>
+nodesToTransactions.put(node,
+  new TransactionDataAndCallbacks(new 
AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()),
+mutable.Map(transactionData.transactionalId() -> callback)))
+  case Some(currentNodeAndTransactionData) =>
+// Check if we already have txn ID -- this should only happen in epoch 
bump case. If so, we should return error for old entry and remove from queue.
+val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+if (currentTransactionData != null) {
+  if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch()) {
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic => 
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), Errors.INVALID_PRODUCER_EPOCH)
+  }
+}
+val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+
currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  } else {
+// We should never see a request on the same epoch since we 
haven't finished handling the one in queue
+throw new InvalidRecordException("Received a second request from 
the same connection without finishing the first.")
+  }
+}
+currentNodeAndTransactionData.transactionData.add(transactionData)
+
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+}
+wakeup()
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+override def onComplete(response: ClientResponse): Unit = {
+  inflightNodes.synchronized(inflightNodes.remove(node))

Review Comment:
   I guess I was considering more than one send thread 😅 but we don't have that now.




[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142787086


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node)
+currentNodeAndTransactionDataOpt match {
+  case None =>
+nodesToTransactions.put(node,
+  new TransactionDataAndCallbacks(new 
AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()),
+mutable.Map(transactionData.transactionalId() -> callback)))
+  case Some(currentNodeAndTransactionData) =>
+// Check if we already have txn ID -- this should only happen in epoch 
bump case. If so, we should return error for old entry and remove from queue.
+val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+if (currentTransactionData != null) {
+  if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch()) {

Review Comment:
   I think it's ok to have new data when a request is inflight. 
   The issue is that I have an invariant here that we can only have one queued 
item for a given txn ID at a time. This is due to how the information is stored 
in the map. 
   
   The only time we can receive two requests from the same txn id is when the 
producer restarts and the epoch is bumped. That is why I have this logic here. 






[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142786061


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node)
+currentNodeAndTransactionDataOpt match {
+  case None =>
+nodesToTransactions.put(node,
+  new TransactionDataAndCallbacks(new 
AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()),
+mutable.Map(transactionData.transactionalId() -> callback)))
+  case Some(currentNodeAndTransactionData) =>
+// Check if we already have txn ID -- this should only happen in epoch 
bump case. If so, we should return error for old entry and remove from queue.
+val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+if (currentTransactionData != null) {
+  if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch()) {
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic => 
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), Errors.INVALID_PRODUCER_EPOCH)
+  }
+}
+val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+
currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  } else {
+// We should never see a request on the same epoch since we 
haven't finished handling the one in queue

Review Comment:
   I thought about that, but I was concerned about blocking on a single produce request for too long. I thought the producer's retry mechanism might be enough to handle this.






[GitHub] [kafka] artemlivshits commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142722842


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+// Check if we have already (either node or individual transaction). 
+val currentNodeAndTransactionDataOpt = nodesToTransactions.get(node)
+currentNodeAndTransactionDataOpt match {
+  case None =>
+nodesToTransactions.put(node,
+  new TransactionDataAndCallbacks(new 
AddPartitionsToTxnTransactionCollection(Collections.singletonList(transactionData).iterator()),
+mutable.Map(transactionData.transactionalId() -> callback)))
+  case Some(currentNodeAndTransactionData) =>
+// Check if we already have txn ID -- this should only happen in epoch 
bump case. If so, we should return error for old entry and remove from queue.
+val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+if (currentTransactionData != null) {
+  if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch()) {
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic => 
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), Errors.INVALID_PRODUCER_EPOCH)
+  }
+}
+val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+
currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  } else {
+// We should never see a request on the same epoch since we 
haven't finished handling the one in queue

Review Comment:
   Would it be possible to have a retry (say, the first request timed out and then we send another one) and end up with more than one request?




[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher

2023-03-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

This task includes refactoring {{Fetcher}} by extracting out some common logic 
to allow forthcoming implementations to leverage it.

  was:
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

This task includes refactoring {{Fetcher}} by extracting out some common logic 
into {{FetcherUtils}} to allow forthcoming implementations to leverage that 
common logic.


> Extract common logic from Fetcher
> -
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task includes refactoring {{Fetcher}} by extracting out some common 
> logic to allow forthcoming implementations to leverage it.





[GitHub] [kafka] kirktrue opened a new pull request, #13425: KAFKA-14365: Extract common logic from Fetcher

2023-03-20 Thread via GitHub


kirktrue opened a new pull request, #13425:
URL: https://github.com/apache/kafka/pull/13425

   The `Fetcher` class is used internally by the `KafkaConsumer` to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored `Fetcher`.
   
   This task includes refactoring `Fetcher` by extracting out some common logic 
to allow forthcoming implementations to leverage it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13380: KAFKA-14468: Committed API

2023-03-20 Thread via GitHub


guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1142683410


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -207,6 +226,232 @@ public NetworkClientDelegate.UnsentRequest 
toUnsentRequest() {
 }
 }
 
+private class UnsentOffsetFetchRequest extends RequestState {
+public final Set requestedPartitions;
+public final GroupState.Generation requestedGeneration;
+public CompletableFuture> 
future;
+
+public UnsentOffsetFetchRequest(final Set partitions,
+final GroupState.Generation generation,
+final long retryBackoffMs) {
+super(retryBackoffMs);
+this.requestedPartitions = partitions;
+this.requestedGeneration = generation;
+this.future = new CompletableFuture<>();
+}
+
+public boolean sameRequest(final UnsentOffsetFetchRequest request) {
+return Objects.equals(requestedGeneration, 
request.requestedGeneration) && 
requestedPartitions.equals(request.requestedPartitions);
+}
+
+public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long 
currentTimeMs) {
+OffsetFetchRequest.Builder builder = new 
OffsetFetchRequest.Builder(
+groupState.groupId,
+true,
+new ArrayList<>(this.requestedPartitions),
+throwOnFetchStableOffsetUnsupported);
+NetworkClientDelegate.UnsentRequest unsentRequest = new 
NetworkClientDelegate.UnsentRequest(
+builder,
+coordinatorRequestManager.coordinator());
+unsentRequest.future().whenComplete((r, t) -> {
+onResponse(currentTimeMs, (OffsetFetchResponse) 
r.responseBody());
+});
+return unsentRequest;
+}
+
+public void onResponse(
+final long currentTimeMs,
+final OffsetFetchResponse response) {
+Errors responseError = 
response.groupLevelError(groupState.groupId);
+if (responseError != Errors.NONE) {
+onFailure(currentTimeMs, responseError);
+return;
+}
+onSuccess(currentTimeMs, response);
+}
+
+private void onFailure(final long currentTimeMs,
+   final Errors responseError) {
+log.debug("Offset fetch failed: {}", responseError.message());
+
+// TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
+if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+retry(currentTimeMs);
+} else if (responseError == Errors.NOT_COORDINATOR) {
+// re-discover the coordinator and retry
+
coordinatorRequestManager.markCoordinatorUnknown(responseError.message(), 
Time.SYSTEM.milliseconds());
+retry(currentTimeMs);
+} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupState.groupId));
+} else {
+future.completeExceptionally(new KafkaException("Unexpected 
error in fetch offset response: " + responseError.message()));
+}
+}
+
+private void retry(final long currentTimeMs) {
+onFailedAttempt(currentTimeMs);
+onSendAttempt(currentTimeMs);
+pendingRequests.addOffsetFetchRequest(this);
+}
+
+private void onSuccess(final long currentTimeMs,
+   final OffsetFetchResponse response) {
+Set unauthorizedTopics = null;
+Map 
responseData =
+response.partitionDataMap(groupState.groupId);
+Map offsets = new 
HashMap<>(responseData.size());
+Set unstableTxnOffsetTopicPartitions = new 
HashSet<>();
+for (Map.Entry 
entry : responseData.entrySet()) {
+TopicPartition tp = entry.getKey();
+OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
+if (partitionData.hasError()) {
+Errors error = partitionData.error;
+log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
+
+if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+future.completeExceptionally(new KafkaException("Topic 
or Partition " + tp + " does " +
+"not " +
+"exist"));
+return;
+} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+if (unauthorizedTopics == null) {
+unauthorizedTop

[jira] [Updated] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas

2023-03-20 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14829:
-
Description: Currently, we have various bits of reassignment logic spread across different classes. For example, `ReplicationControlManager` contains logic for when a reassignment is in progress, which is duplicated in `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert`, which contains logic for how to undo/revert a reassignment. The idea here is to move the logic to `PartitionReassignmentReplicas` so it's more testable and easier to reason about.  (was: Currently, we have various bits of reassignment 
logic spread across different classes. For example, `ReplicationControlManager` 
contains logic for when a reassignment is in progress, which is duplication in 
`PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` 
which contains logic for how to undo/revert a reassignment. The idea here is to 
move the logic to )

> Consolidate reassignment logic in PartitionReassignmentReplicas
> ---
>
> Key: KAFKA-14829
> URL: https://issues.apache.org/jira/browse/KAFKA-14829
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Minor
>
> Currently, we have various bits of reassignment logic spread across different 
> classes. For example, `ReplicationControlManager` contains logic for when a 
> reassignment is in progress, which is duplicated in 
> `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert`, 
> which contains logic for how to undo/revert a reassignment. The idea here is 
> to move the logic to `PartitionReassignmentReplicas` so it's more testable and 
> easier to reason about.





[jira] [Updated] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas

2023-03-20 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14829:
-
Description: Currently, we have various bits of reassignment logic spread 
across different classes. For example, `ReplicationControlManager` contains 
logic for when a reassignment is in progress, which is duplication in 
`PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` 
which contains logic for how to undo/revert a reassignment. The idea here is to 
move the logic to 

> Consolidate reassignment logic in PartitionReassignmentReplicas
> ---
>
> Key: KAFKA-14829
> URL: https://issues.apache.org/jira/browse/KAFKA-14829
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Minor
>
> Currently, we have various bits of reassignment logic spread across different 
> classes. For example, `ReplicationControlManager` contains logic for when a 
> reassignment is in progress, which is duplication in 
> `PartitionChangeBuilder`. Another example is `PartitionReassignmentRevert` 
> which contains logic for how to undo/revert a reassignment. The idea here is 
> to move the logic to 





[GitHub] [kafka] artemlivshits commented on a diff in pull request #13391: WIP: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-03-20 Thread via GitHub


artemlivshits commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1142707455


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{InvalidRecordException, Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import java.util.Collections
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {

Review Comment:
   Looks like this could be called from multiple threads, do we need to add 
synchronization?
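
   For illustration, a minimal Java sketch (not the Scala class in this diff; names are illustrative assumptions) of one way to make this kind of registration safe to call from multiple request-handler threads: keep the shared node-to-pending-data map in a ConcurrentHashMap and do the read-modify-write atomically per key.
   
   ```java
   import java.util.List;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.CopyOnWriteArrayList;
   
   // Hypothetical registry, not AddPartitionsToTxnManager itself.
   public class PendingTxnRegistry<N, T> {
       private final ConcurrentMap<N, List<T>> pendingByNode = new ConcurrentHashMap<>();
   
       // Safe to call concurrently: each node's list is created and appended atomically.
       public void add(N node, T txnData) {
           pendingByNode.compute(node, (n, existing) -> {
               List<T> list = existing != null ? existing : new CopyOnWriteArrayList<>();
               list.add(txnData);
               return list;
           });
       }
   
       // Drains everything queued for a node, e.g. when a sender thread builds a request.
       public List<T> drain(N node) {
           List<T> drained = pendingByNode.remove(node);
           return drained != null ? drained : List.of();
       }
   }
   ```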



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-20 Thread via GitHub


C0urante commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1142700527


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public abstract class KafkaTopicBasedBackingStore {

Review Comment:
   Ah, I forgot about the static constructors for the offset backing store. 
Yeah, probably not worth it to add another constructor variant for everything.
   
   I was thinking about isolating this logic into a `Supplier`, 
and we could then have something like:
   
   ```java
   public class LogBuilder {
     public static Supplier<KafkaBasedLog<K, V>> createLog(String topic, String topicConfig, String topicPurpose...) {
       return () -> {
         KafkaBasedLog<K, V> result = ...
         // create and set up KafkaBasedLog, including performing topic initialization here
         return result;
       }
     }
   }
   
   public class KafkaConfigBackingStore implements ConfigBackingStore {
     public KafkaConfigBackingStore(Supplier<KafkaBasedLog<K, V>> setupAndCreateKafkaBasedLog, ...)
   }
   ```
   
   Which I think would still technically work here (and honestly be a bit 
cleaner than how we're hacking the `setupAndCreateKafkaBasedLog` method in 
tests right now), but given the number of new constructors it would require 
(which introduce their own amount of ugliness), we can stick with what's in 
this PR right now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13367: KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag

2023-03-20 Thread via GitHub


C0urante commented on code in PR #13367:
URL: https://github.com/apache/kafka/pull/13367#discussion_r1142683836


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -416,6 +417,15 @@ public void testReplicationWithEmptyPartition() throws 
Exception {
 
 @Test
 public void testOneWayReplicationWithAutoOffsetSync() throws 
InterruptedException {
+testOneWayReplicationWithOffsetSyncs(OFFSET_LAG_MAX);
+}
+
+@Test
+public void testOneWayReplicationWithFrequentOffsetSyncs() throws 
InterruptedException {
+testOneWayReplicationWithOffsetSyncs(0);
+}
+
+public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws 
InterruptedException {

Review Comment:
   Nit: this may make more sense as `protected`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas

2023-03-20 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14829:
-
Priority: Minor  (was: Major)

> Consolidate reassignment logic in PartitionReassignmentReplicas
> ---
>
> Key: KAFKA-14829
> URL: https://issues.apache.org/jira/browse/KAFKA-14829
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas

2023-03-20 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant reassigned KAFKA-14829:


Assignee: Andrew Grant

> Consolidate reassignment logic in PartitionReassignmentReplicas
> ---
>
> Key: KAFKA-14829
> URL: https://issues.apache.org/jira/browse/KAFKA-14829
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14829) Consolidate reassignment logic in PartitionReassignmentReplicas

2023-03-20 Thread Andrew Grant (Jira)
Andrew Grant created KAFKA-14829:


 Summary: Consolidate reassignment logic in 
PartitionReassignmentReplicas
 Key: KAFKA-14829
 URL: https://issues.apache.org/jira/browse/KAFKA-14829
 Project: Kafka
  Issue Type: Improvement
Reporter: Andrew Grant






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14823) Clean up ConfigProvider API

2023-03-20 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702932#comment-17702932
 ] 

Greg Harris commented on KAFKA-14823:
-

Connect currently has an issue where secrets can be changed on disk without 
triggering connector and task restarts. Implementing the subscribe/unsubscribe 
hooks would help alleviate this problem and make rotating secrets for 
connectors more predictable.
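
For illustration, here is a minimal sketch (not existing Kafka code; the class name, polling interval, and file handling are assumptions) of a provider that implements the subscribe hook by polling a properties file and invoking the ConfigChangeCallback when subscribed keys change:

{code:java}
import org.apache.kafka.common.config.ConfigChangeCallback;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical provider: polls a properties file and notifies subscribers on change.
public class PollingFileConfigProvider implements ConfigProvider {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ConfigData get(String path) {
        return new ConfigData(readAll(path));
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> all = readAll(path);
        Map<String, String> filtered = new HashMap<>();
        for (String key : keys) {
            if (all.containsKey(key)) filtered.put(key, all.get(key));
        }
        return new ConfigData(filtered);
    }

    @Override
    public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        Map<String, String> last = new HashMap<>(readAll(path));
        // Poll every 30 seconds and fire the callback when a subscribed key changes.
        scheduler.scheduleAtFixedRate(() -> {
            Map<String, String> current = readAll(path);
            boolean changed = keys.stream().anyMatch(k -> !Objects.equals(last.get(k), current.get(k)));
            if (changed) {
                last.clear();
                last.putAll(current);
                callback.onChange(path, new ConfigData(current));
            }
        }, 30, 30, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    private Map<String, String> readAll(String path) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        } catch (Exception e) {
            // Treat unreadable files as empty; a real provider would surface this.
        }
        Map<String, String> result = new HashMap<>();
        props.stringPropertyNames().forEach(k -> result.put(k, props.getProperty(k)));
        return result;
    }
}
{code}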

> Clean up ConfigProvider API
> ---
>
> Key: KAFKA-14823
> URL: https://issues.apache.org/jira/browse/KAFKA-14823
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Priority: Major
>
> The ConfigProvider interface exposes several methods that are not used:
> - ConfigData get(String path)
> - default void subscribe(String path, Set<String> keys, ConfigChangeCallback callback)
> - default void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback)
> - default void unsubscribeAll()
> We should either build mechanisms to support them or deprecate them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors

2023-03-20 Thread via GitHub


C0urante commented on PR #13424:
URL: https://github.com/apache/kafka/pull/13424#issuecomment-1476936722

   @mimaison do you think you'll have time to take a look? Hoping to get this 
in before the 3.5.0 release but we can punt if there's not enough bandwidth for 
review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors

2023-03-20 Thread via GitHub


C0urante commented on PR #13424:
URL: https://github.com/apache/kafka/pull/13424#issuecomment-1476935983

   cc @yashmayya; feel free to review and/or build off of this for KAFKA-14786, 
KAFKA-14368, and KAFKA-14784.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher

2023-03-20 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Summary: Extract common logic from Fetcher  (was: Extract common logic from 
Fetcher into FetcherUtils)

> Extract common logic from Fetcher
> -
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task includes refactoring {{Fetcher}} by extracting out some common 
> logic into {{FetcherUtils}} to allow forthcoming implementations to leverage 
> that common logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante opened a new pull request, #13424: KAFKA-14783: New STOPPED state for connectors

2023-03-20 Thread via GitHub


C0urante opened a new pull request, #13424:
URL: https://github.com/apache/kafka/pull/13424

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14783), [relevant KIP 
section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED)
   
   Adds the new `STOPPED` target state for connectors, which causes all tasks 
for the connector to be shut down and for its status in the REST API to be 
updated to `STOPPED`.
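   
   For context, a hedged Java 11 sketch of how a connector could be moved to the new target state via the KIP-875 stop endpoint once this lands (the worker address and connector name are assumptions for illustration):
   
   ```java
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;
   
   public class StopConnectorExample {
       public static void main(String[] args) throws Exception {
           HttpClient client = HttpClient.newHttpClient();
           // PUT /connectors/{connector}/stop transitions the connector to the STOPPED target state.
           HttpRequest request = HttpRequest.newBuilder()
               .uri(URI.create("http://localhost:8083/connectors/my-connector/stop"))
               .PUT(HttpRequest.BodyPublishers.noBody())
               .build();
           HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
           System.out.println(response.statusCode());
       }
   }
   ```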
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby

2023-03-20 Thread via GitHub


guozhangwang commented on code in PR #13369:
URL: https://github.com/apache/kafka/pull/13369#discussion_r1142674372


##
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * An integration test to verify EOS properties when using Caching and Standby 
replicas
+ * while tasks are being redistributed after re-balancing event.
+ * The intent is not that this test should be merged into the repo but only 
provided for evidence on how to reproduce.
+ * One test fail and two test pass reliably on an i7-8750H CPU @ 2.20GHz × 12 
with 32 GiB Memory
+ */
+@Category(IntegrationTest.class)
+@SuppressWarnings("deprecation")

Review Comment:
   I actually tried it :) The issue is that to replace it we'd need to call 
`context.forward(key, value)`, in which case we need to declare the new 
processor context with template types `Integer, Integer` to avoid ambiguity 
between the overloaded functions. But that class would then need to reference 
the `RecordMetadata` etc., which involves more changes, so I stopped there.
   
   If people feel strongly about it I can go ahead and bite the bullet and 
revamp the whole test client. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby

2023-03-20 Thread via GitHub


guozhangwang commented on code in PR #13369:
URL: https://github.com/apache/kafka/pull/13369#discussion_r1142669679


##
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * An integration test to verify EOS properties when using Caching and Standby 
replicas
+ * while tasks are being redistributed after re-balancing event.
+ * The intent is not that this test should be merged into the repo but only 
provided for evidence on how to reproduce.

Review Comment:
   Ack, will do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #13347: MINOR: Use JUnit-5 extension to enforce strict stubbing

2023-03-20 Thread via GitHub


guozhangwang merged PR #13347:
URL: https://github.com/apache/kafka/pull/13347


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13347: MINOR: Use JUnit-5 extension to enforce strict stubbing

2023-03-20 Thread via GitHub


guozhangwang commented on PR #13347:
URL: https://github.com/apache/kafka/pull/13347#issuecomment-1476912355

   @lucasbru ack (and sorry for late reply)! Will take a look. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #13402: MINOR: Use PartitionAssignment in ReplicationControlManager and PartitionReassignmentReplicas

2023-03-20 Thread via GitHub


jsancio merged PR #13402:
URL: https://github.com/apache/kafka/pull/13402


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14828) Remove R/W lock from StandardAuthorizer

2023-03-20 Thread Purshotam Chauhan (Jira)
Purshotam Chauhan created KAFKA-14828:
-

 Summary: Remove R/W lock from StandardAuthorizer
 Key: KAFKA-14828
 URL: https://issues.apache.org/jira/browse/KAFKA-14828
 Project: Kafka
  Issue Type: Improvement
Reporter: Purshotam Chauhan
Assignee: Purshotam Chauhan


Currently, StandardAuthorizer uses R/W locks to keep the data consistent 
between reads. The intent of this Jira is to remove the R/W locks by using a 
persistent data structures library such as 
[pcollections|https://github.com/hrldcpr/pcollections], 
[Paguro|https://github.com/GlenKPeterson/Paguro] or 
[Vavr|https://github.com/vavr-io/vavr].
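
For illustration, a minimal sketch (not the actual StandardAuthorizer code) of the lock-free alternative: readers dereference a volatile, immutable snapshot without locking, and writers publish a new snapshot. With a persistent map from one of the libraries above, the per-update "copy" becomes cheap instead of a full rebuild.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative copy-on-write cache, not Kafka's authorizer data structure.
public class CopyOnWriteAclCache<K, V> {
    // Readers always see a consistent, immutable snapshot; no read lock needed.
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    public V get(K key) {
        return snapshot.get(key);                          // lock-free read
    }

    public synchronized void put(K key, V value) {
        Map<K, V> next = new HashMap<>(snapshot);          // copy the current snapshot
        next.put(key, value);
        snapshot = Collections.unmodifiableMap(next);      // publish the new snapshot atomically
    }

    public synchronized void remove(K key) {
        Map<K, V> next = new HashMap<>(snapshot);
        next.remove(key);
        snapshot = Collections.unmodifiableMap(next);
    }
}
{code}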



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14828) Remove R/W lock from StandardAuthorizer

2023-03-20 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan updated KAFKA-14828:
--
Description: Currently, StandardAuthorizer uses R/W locks to keep the data 
consistent between reads. The intent of this Jira is to remove the R/W locks by 
using the persistent data structures library like - 
[pcollections|https://github.com/hrldcpr/pcollections], 
[Paguro|https://github.com/GlenKPeterson/Paguro] and 
[Vavr|https://github.com/vavr-io/vavr]   (was: Currently, StandardAuthorizer 
uses R/W locks to keep the data consistent between reads. The intent of this 
Jira is to remove the R/W locks by using the persistent data structures library 
like - [pcollections|https://github.com/hrldcpr/pcollections], 
[Paguro|https://github.com/GlenKPeterson/Paguro] and 
[Vavr|[https://github.com/vavr-io/vavr].] )

> Remove R/W lock from StandardAuthorizer
> ---
>
> Key: KAFKA-14828
> URL: https://issues.apache.org/jira/browse/KAFKA-14828
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Major
>
> Currently, StandardAuthorizer uses R/W locks to keep the data consistent 
> between reads. The intent of this Jira is to remove the R/W locks by using 
> the persistent data structures library like - 
> [pcollections|https://github.com/hrldcpr/pcollections], 
> [Paguro|https://github.com/GlenKPeterson/Paguro] and 
> [Vavr|https://github.com/vavr-io/vavr] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] emissionnebula opened a new pull request, #13423: KAFKA-14827: Support for StandardAuthorizer benchmark

2023-03-20 Thread via GitHub


emissionnebula opened a new pull request, #13423:
URL: https://github.com/apache/kafka/pull/13423

   What
   
   * Renamed AclAuthorizerBenchmark -> AuthorizerBenchmark
   * Updated it to run benchmarks for both AclAuthorizer and StandardAuthorizer
   
   Current benchmark results: 
   
   ```
   Benchmark   (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt    Score   Error  Units
   AclAuthorizerBenchmark.testAclsIterator 50   ACL205  avgt2  165.618  ms/op
   AclAuthorizerBenchmark.testAclsIterator 50 KRAFT205  avgt2  251.659  ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType  50   ACL205  avgt20.005  ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType  50 KRAFT205  avgt2  534.053  ms/op
   AclAuthorizerBenchmark.testAuthorizer   50   ACL205  avgt20.231  ms/op
   AclAuthorizerBenchmark.testAuthorizer   50 KRAFT205  avgt20.675  ms/op
   AclAuthorizerBenchmark.testUpdateCache  50   ACL205  avgt2  481.212  ms/op
   AclAuthorizerBenchmark.testUpdateCache  50 KRAFT205  avgt2   ≈ 10⁻⁶  ms/op
   JMH benchmarks done
   ```
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14827) Support for StandardAuthorizer in Benchmark

2023-03-20 Thread Purshotam Chauhan (Jira)
Purshotam Chauhan created KAFKA-14827:
-

 Summary: Support for StandardAuthorizer in Benchmark
 Key: KAFKA-14827
 URL: https://issues.apache.org/jira/browse/KAFKA-14827
 Project: Kafka
  Issue Type: Improvement
Reporter: Purshotam Chauhan
Assignee: Purshotam Chauhan


Support for StandardAuthorizer in Benchmark



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lucasbru commented on a diff in pull request #13369: KAFKA-14172: Should clear cache when active recycled from standby

2023-03-20 Thread via GitHub


lucasbru commented on code in PR #13369:
URL: https://github.com/apache/kafka/pull/13369#discussion_r1142535965


##
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * An integration test to verify EOS properties when using Caching and Standby 
replicas
+ * while tasks are being redistributed after re-balancing event.
+ * The intent is not that this test should be merged into the repo but only 
provided for evidence on how to reproduce.

Review Comment:
   I think we want to remove the last two paragraphs of the comment. 



##
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java:
##
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.ap

[jira] [Created] (KAFKA-14826) Log the completion time for request that resulted in an error.

2023-03-20 Thread Jira
José Armando García Sancio created KAFKA-14826:
--

 Summary: Log the completion time for request that resulted in an 
error.
 Key: KAFKA-14826
 URL: https://issues.apache.org/jira/browse/KAFKA-14826
 Project: Kafka
  Issue Type: Task
  Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


When handling a fetch request that resulted in an exception, the KRaft client doesn't log 
the "completion" time. There is a FIXME comment about this in the KafkaRaftClient.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14825) Handle divergence between KRaft's HWM and Log's HWM

2023-03-20 Thread Jira
José Armando García Sancio created KAFKA-14825:
--

 Summary: Handle divergence between KRaft's HWM and Log's HWM 
 Key: KAFKA-14825
 URL: https://issues.apache.org/jira/browse/KAFKA-14825
 Project: Kafka
  Issue Type: Task
  Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


The types in the UnifiedLog and KafkaMetadataLog allow the log's HWM to 
diverge from the KRaft client's HWM. In practice this should not be possible 
since the KRaft client is the only component that writes to the log layer.

The code in KafkaMetadataLog has a FIXME comment related to this. We should 
remove the comment and fix the code to handle divergence in the HWM.

There are at least two options:
 # Change ReplicatedLog::updateHighWatermark to return the actual HWM and deal 
with the returned value in the KRaft client.
 # Throw an invalid state exception if the KRaft HWM doesn't match the Log's 
HWM.
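
For illustration, a minimal sketch (not the actual KafkaMetadataLog code; names are simplified assumptions) of the second option, failing fast when the HWM the log layer applied does not match the HWM the KRaft client expects:

{code:java}
// Illustrative check only; the real fix would live in KafkaMetadataLog/KafkaRaftClient.
public final class HighWatermarkCheck {
    private HighWatermarkCheck() { }

    public static void verifyApplied(long expectedHwm, long appliedHwm) {
        if (appliedHwm != expectedHwm) {
            throw new IllegalStateException(
                "KRaft client high watermark " + expectedHwm
                    + " diverged from the log's high watermark " + appliedHwm);
        }
    }
}
{code}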



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14816) Connect loading SSL configs when contacting non-HTTPS URLs

2023-03-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14816.
---
  Reviewer: Justine Olshan
Resolution: Fixed

> Connect loading SSL configs when contacting non-HTTPS URLs
> --
>
> Key: KAFKA-14816
> URL: https://issues.apache.org/jira/browse/KAFKA-14816
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0
>Reporter: Ian McDonald
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1
>
>
> Due to changes made here: [https://github.com/apache/kafka/pull/12828]
> Connect now unconditionally loads SSL configs from the worker into rest 
> clients it uses for cross-worker communication and uses them even when 
> issuing requests to HTTP (i.e., non-HTTPS) URLs. Previously, it would only 
> attempt to load (and validate) SSL properties when issuing requests to HTTPS 
> URLs. This can cause issues when a Connect cluster has stopped securing its 
> REST API with SSL but its worker configs still contain the old (and 
> now-invalid) SSL properties. When this happens, REST requests that hit a 
> follower worker but need to be forwarded to the leader will fail, and 
> connectors that perform dynamic reconfigurations via 
> [ConnectorContext::requestTaskReconfiguration|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
>  will fail to trigger that reconfiguration if they are not running on the 
> leader.
> In our testing environments - older versions without the linked changes pass 
> with the following configuration, and newer versions with the changes fail:
> {{ssl.keystore.location = /mnt/security/test.keystore.jks}}
> {{ssl.keystore.password = [hidden]}}
> {{ssl.keystore.type = JKS}}
> {{ssl.protocol = TLSv1.2}}
> It's important to note that the file {{/mnt/security/test.keystore.jks}} 
> isn't generated for our non-SSL tests; however, these configs are still 
> included in our worker config file.
> This leads to a 500 response when hitting the create connector REST endpoint 
> with the following error:
> bq. { "error_code":500,   "message":"Failed to start RestClient:   
> /mnt/security/test.keystore.jks is not a valid keystore" }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs

2023-03-20 Thread via GitHub


jolshan commented on PR #13415:
URL: https://github.com/apache/kafka/pull/13415#issuecomment-1476575571

   cherrypicked to 3.4 as well


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13422: MINOR: Cleanups in clients common.config

2023-03-20 Thread via GitHub


mimaison opened a new pull request, #13422:
URL: https://github.com/apache/kafka/pull/13422

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…

2023-03-20 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1476492254

   hello, maybe you are interested in this issue? @chia7712 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception

2023-03-20 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14824:
---
Reviewer: Chia-Ping Tsai

> ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown 
> exception
> ---
>
> Key: KAFKA-14824
> URL: https://issues.apache.org/jira/browse/KAFKA-14824
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2
>Reporter: hudeqi
>Priority: Blocker
>
> For ReplicaAlterLogDirsThread, if the partition is marked as failed due to an 
> unknown exception and fetching for the partition is suspended, the paused log 
> cleanup for the partition needs to be resumed; otherwise it will lead to 
> serious, unexpected disk usage growth.
>  
> For example, in a production environment (running Kafka 2.5.1) we hit the 
> following case while performing a log dir rebalance on the partition leader's 
> broker: fetching started once the future log was successfully created, and 
> the future replica was reset and truncated to the leader's log start offset 
> because the fetch offset was out of range. At the same time, the partition 
> leader was processing a leaderAndIsrRequest and the leader epoch was updated, 
> so the ReplicaAlterLogDirsThread hit FENCED_LEADER_EPOCH and the partition's 
> 'partitionStates' were cleaned up. Concurrently, the thread handling the 
> leaderAndIsrRequest re-added the partition to the ReplicaAlterLogDirsThread, 
> with the offset in InitialFetchState set to the leader's HW. When the 
> ReplicaAlterLogDirsThread then ran processFetchRequest, it threw 
> "java.lang.IllegalStateException: Offset mismatch for the future replica 
> anti_fraud.data_collector.anticrawler_live-54: fetched offset = 4979659327, 
> log end offset = 4918576434.". As a result the ReplicaAlterLogDirsThread no 
> longer fetched the partition and, because cleanup for the partition had 
> previously been paused, the disk usage of the corresponding broker grew 
> without bound, causing serious problems.
>  
> Trunk has since fixed, in KAFKA-9087, the bug that caused the 
> ReplicaAlterLogDirsThread to hit the "Offset mismatch" error and stop 
> fetching. However, other unknown exceptions could still occur, and with the 
> current logic they would lead to the same disk cleanup failure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi opened a new pull request, #13421: KAFKA-14824:ReplicaAlterLogDirsThread may cause serious disk usage in…

2023-03-20 Thread via GitHub


hudeqi opened a new pull request, #13421:
URL: https://github.com/apache/kafka/pull/13421

   For ReplicaAlterLogDirsThread, if the partition is marked as failed due to 
an unknown exception and fetching for the partition is suspended, the paused 
log cleanup for the partition needs to be resumed; otherwise it will lead to 
serious, unexpected disk usage growth.
   
   The detail of the issue is here: 
[jira](https://issues.apache.org/jira/browse/KAFKA-14824)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14824) ReplicaAlterLogDirsThread may cause serious disk usage in case of unknown exception

2023-03-20 Thread hudeqi (Jira)
hudeqi created KAFKA-14824:
--

 Summary: ReplicaAlterLogDirsThread may cause serious disk usage in 
case of unknown exception
 Key: KAFKA-14824
 URL: https://issues.apache.org/jira/browse/KAFKA-14824
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
Reporter: hudeqi


For ReplicaAlterLogDirsThread, if the partition is marked as failed due to an 
unknown exception and fetching for the partition is suspended, the paused log 
cleanup for the partition needs to be resumed; otherwise it will lead to 
serious, unexpected disk usage growth.

For example, in a production environment (running Kafka 2.5.1) we hit the 
following case while performing a log dir rebalance on the partition leader's 
broker: fetching started once the future log was successfully created, and the 
future replica was reset and truncated to the leader's log start offset because 
the fetch offset was out of range. At the same time, the partition leader was 
processing a leaderAndIsrRequest and the leader epoch was updated, so the 
ReplicaAlterLogDirsThread hit FENCED_LEADER_EPOCH and the partition's 
'partitionStates' were cleaned up. Concurrently, the thread handling the 
leaderAndIsrRequest re-added the partition to the ReplicaAlterLogDirsThread, 
with the offset in InitialFetchState set to the leader's HW. When the 
ReplicaAlterLogDirsThread then ran processFetchRequest, it threw 
"java.lang.IllegalStateException: Offset mismatch for the future replica 
anti_fraud.data_collector.anticrawler_live-54: fetched offset = 4979659327, log 
end offset = 4918576434.". As a result the ReplicaAlterLogDirsThread no longer 
fetched the partition and, because cleanup for the partition had previously 
been paused, the disk usage of the corresponding broker grew without bound, 
causing serious problems.

Trunk has since fixed, in KAFKA-9087, the bug that caused the 
ReplicaAlterLogDirsThread to hit the "Offset mismatch" error and stop fetching. 
However, other unknown exceptions could still occur, and with the current logic 
they would lead to the same disk cleanup failure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison opened a new pull request, #13420: KAFKA-14740: Add source tag to MirrorSourceMetrics - KIP-911

2023-03-20 Thread via GitHub


mimaison opened a new pull request, #13420:
URL: https://github.com/apache/kafka/pull/13420

   New add.source.alias.to.metrics setting to add the source cluster alias to 
the MirrorSourceConnector metrics
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs

2023-03-20 Thread via GitHub


C0urante commented on PR #13415:
URL: https://github.com/apache/kafka/pull/13415#issuecomment-1476461304

   Thanks Justine!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs

2023-03-20 Thread via GitHub


jolshan merged PR #13415:
URL: https://github.com/apache/kafka/pull/13415


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13419: KAFKA-8713: Allow using null for field in JsonConverter (KIP-581)

2023-03-20 Thread via GitHub


mimaison opened a new pull request, #13419:
URL: https://github.com/apache/kafka/pull/13419

   Add a new configuration replace.null.with.default to allow using null 
instead of the default value.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-20 Thread via GitHub


viktorsomogyi commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r114295


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) {
 return createOrFindTopics(topics).createdTopics();
 }
 
+/**
+ * Implements a retry logic around creating topic(s) in case it'd fail due 
to InvalidReplicationFactorException
+ *
+ * @param topicDescription
+ * @param timeoutMs
+ * @param backOffMs
+ * @param time
+ * @return the same as {@link TopicAdmin#createTopics(NewTopic...)}
+ */
+public Set<String> createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) {

Review Comment:
   Ok, that's fair.
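
   For readers following along, a standalone sketch of the retry-with-backoff idea under discussion (not the PR's actual TopicAdmin method; it assumes createTopics surfaces InvalidReplicationFactorException to the caller):
   
   ```java
   import org.apache.kafka.clients.admin.NewTopic;
   import org.apache.kafka.common.errors.InvalidReplicationFactorException;
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.connect.util.TopicAdmin;
   
   import java.util.Set;
   
   // Illustrative helper, not part of the PR.
   public final class TopicCreationRetry {
       public static Set<String> createWithRetry(TopicAdmin admin, NewTopic topic,
                                                 long timeoutMs, long backOffMs, Time time) {
           long deadline = time.milliseconds() + timeoutMs;
           while (true) {
               try {
                   return admin.createTopics(topic);    // names of the topics actually created
               } catch (InvalidReplicationFactorException e) {
                   if (time.milliseconds() + backOffMs > deadline) {
                       throw e;                          // out of time: surface the error
                   }
                   time.sleep(backOffMs);                // e.g. wait for more brokers to register
               }
           }
       }
   }
   ```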



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs

2023-03-20 Thread via GitHub


C0urante commented on code in PR #13415:
URL: https://github.com/apache/kafka/pull/13415#discussion_r1142078897


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -97,7 +98,11 @@ public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders he
 public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
   TypeReference<T> responseFormat,
   SecretKey sessionKey, String requestSignatureAlgorithm) {
-HttpClient client = httpClient();
+// Only try to load SSL configs if we have to (see KAFKA-14816)
+SslContextFactory sslContextFactory = url.startsWith("https://")
+? SSLUtils.createClientSideSslContextFactory(config)
+: null;
+HttpClient client = httpClient(sslContextFactory);

Review Comment:
   We wanted to be conservative about unintended changes in behavior (😞), and 
weren't certain about the thread safety of the `HttpClient` class. Discussion 
thread 
[here](https://github.com/apache/kafka/pull/12828#discussion_r1020382238).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs

2023-03-20 Thread via GitHub


C0urante commented on code in PR #13415:
URL: https://github.com/apache/kafka/pull/13415#discussion_r1142078897


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -97,7 +98,11 @@ public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders he
 public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
   TypeReference<T> responseFormat,
   SecretKey sessionKey, String requestSignatureAlgorithm) {
-HttpClient client = httpClient();
+// Only try to load SSL configs if we have to (see KAFKA-14816)
+SslContextFactory sslContextFactory = url.startsWith("https://")
+? SSLUtils.createClientSideSslContextFactory(config)
+: null;
+HttpClient client = httpClient(sslContextFactory);

Review Comment:
   We wanted to be conservative about unintended changes in behavior, and 
weren't certain about the thread safety of the `HttpClient` class. Discussion 
thread 
[here](https://github.com/apache/kafka/pull/12828#discussion_r1020382238).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13418: MINOR: add equals and hashcode methods to KafkaProducer and ProducerMetadata

2023-03-20 Thread via GitHub


C0urante commented on PR #13418:
URL: https://github.com/apache/kafka/pull/13418#issuecomment-1476186302

   Thanks for providing more detail. I'm still confused about this part:
   
   > The default equals method compares by address. But this is not what we 
want.
   
   Wouldn't comparison by address be exactly what you want? In what case(s) 
would you need a different kind of comparison?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming merged pull request #13403: MINOR:Fix hint about alter in TopicCommand

2023-03-20 Thread via GitHub


dengziming merged PR #13403:
URL: https://github.com/apache/kafka/pull/13403


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14784) Implement connector offset reset REST API

2023-03-20 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14784:
--

Assignee: Yash Mayya

> Implement connector offset reset REST API
> -
>
> Key: KAFKA-14784
> URL: https://issues.apache.org/jira/browse/KAFKA-14784
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the {{DELETE /connectors/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Resettingoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14785) Implement connector offset read REST API

2023-03-20 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-14785:
--

Assignee: Yash Mayya

> Implement connector offset read REST API
> 
>
> Key: KAFKA-14785
> URL: https://issues.apache.org/jira/browse/KAFKA-14785
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
>
> Implement the {{GET /connector/name/offsets}} endpoint [described in 
> KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Readingoffsets].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Schm1tz1 commented on pull request #12992: KAFKA-14376-KIP887: Add ConfigProvider to make use of environment variables

2023-03-20 Thread via GitHub


Schm1tz1 commented on PR #12992:
URL: https://github.com/apache/kafka/pull/12992#issuecomment-1475939063

   > Can you reply to the DISCUSS/VOTE threads explaining the changes you made 
since the KIP was accepted?
   
   Sure, just sent an update to the KIP thread: 
https://lists.apache.org/thread/9dhk6fny30tgnp6z9sf2qmz4c22onmw2


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14823) Clean up ConfigProvider API

2023-03-20 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14823:
--

 Summary: Clean up ConfigProvider API
 Key: KAFKA-14823
 URL: https://issues.apache.org/jira/browse/KAFKA-14823
 Project: Kafka
  Issue Type: Improvement
Reporter: Mickael Maison


The ConfigProvider interface exposes several methods that are not used:
- ConfigData get(String path)
- default void subscribe(String path, Set<String> keys, ConfigChangeCallback callback)
- default void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback)
- default void unsubscribeAll()

We should either build mechanisms to support them or deprecate them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14822) Allow restricting File and Directory ConfigProviders to specific paths

2023-03-20 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14822:
--

 Summary: Allow restricting File and Directory ConfigProviders to 
specific paths
 Key: KAFKA-14822
 URL: https://issues.apache.org/jira/browse/KAFKA-14822
 Project: Kafka
  Issue Type: Improvement
Reporter: Mickael Maison
Assignee: Mickael Maison


In sensitive environments, it would be useful to be able to restrict which files the 
built-in configuration providers can access.

For example:
config.providers=directory
config.providers.directory.class=org.apache.kafka.connect.configs.DirectoryConfigProvider
config.providers.directory.path=/var/run

Then if a caller tries to access a path outside that directory, for example
ssl.keystore.password=${directory:/etc/passwd:keystore-password}
the reference would be rejected.
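
A minimal sketch, assuming a single configured root directory, of how such a check could be enforced; the class name and exception choice are illustrative, not the eventual implementation.

```
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.kafka.common.config.ConfigException;

public class AllowedPathValidator {
    private final Path allowedRoot;

    public AllowedPathValidator(String allowedRoot) {
        this.allowedRoot = Paths.get(allowedRoot).toAbsolutePath().normalize();
    }

    // Normalizes the requested path (so "../" tricks are resolved) and rejects
    // anything that escapes the configured root.
    public Path validate(String requested) {
        Path resolved = Paths.get(requested).toAbsolutePath().normalize();
        if (!resolved.startsWith(allowedRoot)) {
            throw new ConfigException("Path " + requested + " is not under the allowed root " + allowedRoot);
        }
        return resolved;
    }
}
```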





[GitHub] [kafka] mimaison commented on pull request #12992: KAFKA-14376-KIP887: Add ConfigProvider to make use of environment variables

2023-03-20 Thread via GitHub


mimaison commented on PR #12992:
URL: https://github.com/apache/kafka/pull/12992#issuecomment-1475905586

   Can you reply to the DISCUSS/VOTE threads explaining the changes you made 
since the KIP was accepted?





[jira] [Created] (KAFKA-14821) Better handle timeouts in ListOffsets API

2023-03-20 Thread Dimitar Dimitrov (Jira)
Dimitar Dimitrov created KAFKA-14821:


 Summary: Better handle timeouts in ListOffsets API
 Key: KAFKA-14821
 URL: https://issues.apache.org/jira/browse/KAFKA-14821
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Dimitar Dimitrov
Assignee: Dimitar Dimitrov


The ListOffsets Admin API doesn't retry requests for partitions that failed due to 
timeouts or other retriable exceptions. This is a step back compared to the Consumer 
offset APIs implemented in the fetcher, as the latter can do partial retries in such 
cases.
 * The comparison is notable as some Kafka tools (e.g. 
{{{}kafka-get-offsets{}}}) have moved from using the Consumer offset APIs to 
using the ListOffsets Admin API.

One promising way to address this is to migrate the ListOffsets API to the more modern 
AdminApiDriver mechanism. That should automatically provide the capability to retry 
requests whose responses are deemed retriable.
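
Until such a migration happens, callers can approximate partial retries themselves. A hedged sketch on top of the existing Admin#listOffsets API (the attempt limit and the decision to retry only RetriableException causes are placeholder choices):

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

public class ListOffsetsWithRetry {

    // Retries only the partitions that failed with a retriable error, keeping the
    // results of partitions that already succeeded, until maxAttempts is exhausted.
    public static Map<TopicPartition, ListOffsetsResultInfo> listWithRetry(
            Admin admin, Map<TopicPartition, OffsetSpec> request, int maxAttempts)
            throws InterruptedException, ExecutionException {
        Map<TopicPartition, ListOffsetsResultInfo> results = new HashMap<>();
        Map<TopicPartition, OffsetSpec> remaining = new HashMap<>(request);
        for (int attempt = 1; attempt <= maxAttempts && !remaining.isEmpty(); attempt++) {
            ListOffsetsResult result = admin.listOffsets(remaining);
            for (TopicPartition tp : new ArrayList<>(remaining.keySet())) {
                try {
                    results.put(tp, result.partitionResult(tp).get());
                    remaining.remove(tp);
                } catch (ExecutionException e) {
                    // Keep retriable failures (e.g. timeouts) for the next attempt; rethrow anything else.
                    if (!(e.getCause() instanceof RetriableException) || attempt == maxAttempts) {
                        throw e;
                    }
                }
            }
        }
        return results;
    }
}
```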





[GitHub] [kafka] akatona84 commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-20 Thread via GitHub


akatona84 commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1141776979


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -328,6 +335,48 @@ public Set<String> createTopics(NewTopic... topics) {
 return createOrFindTopics(topics).createdTopics();
 }
 
+/**
+ * Implements a retry logic around creating topic(s) in case it'd fail due to InvalidReplicationFactorException
+ *
+ * @param topicDescription
+ * @param timeoutMs
+ * @param backOffMs
+ * @param time
+ * @return the same as {@link TopicAdmin#createTopics(NewTopic...)}
+ */
+public Set<String> createTopicsWithRetry(NewTopic topicDescription, long timeoutMs, long backOffMs, Time time) {

Review Comment:
   That retrier is built for genuinely retriable exceptions, whereas in our case the exception is wrapped in another exception. I don't really want to touch/refactor/hack this utility method to handle that kind of exception as well, or to add more not-necessarily-retriable exceptions to the condition that decides whether to retry.
   Such a refactor would change the semantics of the retries where the utility is currently used.
   
   Long story short, I feel this topic creation retry is a bit special and wouldn't be used this way in many places later on.
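
For context, a rough sketch of the bespoke retry being described, assuming the failure surfaces as a ConnectException whose cause is the InvalidReplicationFactorException; names and the backoff/deadline handling are illustrative, not the PR's exact code.

```
import java.util.Set;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.TopicAdmin;

public class TopicCreationRetry {
    public static Set<String> createWithRetry(TopicAdmin admin, NewTopic topic,
                                              long timeoutMs, long backOffMs, Time time) {
        long deadline = time.milliseconds() + timeoutMs;
        while (true) {
            try {
                return admin.createTopics(topic);
            } catch (ConnectException e) {
                // On a freshly started cluster this typically means not enough brokers
                // have registered yet, so back off and try again until the deadline.
                boolean retriable = e.getCause() instanceof InvalidReplicationFactorException;
                if (!retriable || time.milliseconds() + backOffMs > deadline) {
                    throw e;
                }
                time.sleep(backOffMs);
            }
        }
    }
}
```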






[GitHub] [kafka] akatona84 commented on a diff in pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2023-03-20 Thread via GitHub


akatona84 commented on code in PR #11565:
URL: https://github.com/apache/kafka/pull/11565#discussion_r1141771004


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaTopicBasedBackingStore.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public abstract class KafkaTopicBasedBackingStore {

Review Comment:
   I tried it.
   On one hand, it would require the composed field to be static, because static methods would use the topicInitializer method:
   https://github.com/apache/kafka/blob/15dfe065fb0ef34145ec204123c592e468842de8/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L129
   
   On the other hand (which relates to the first point a bit): I'm not sure I could inject this composition nicely, so that I can test it (or mock part of it) easily.
   Currently the tests assume that `createKafkaBasedLog` is a method of the actual backing store class, so avoiding this inheritance would also take quite a bit of refactoring of the tests :)
   
   wdyt?
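
For illustration, a minimal sketch of the composition alternative being weighed here; the TopicInitializer name and shape are invented, and it glosses over the static-method constraint mentioned above.

```
import java.util.function.Consumer;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.TopicAdmin;

// Hypothetical helper a backing store could receive via its constructor (and replace
// with a mock in tests) instead of inheriting topic-initialization from a base class.
public class TopicInitializer {
    private final long timeoutMs;
    private final long backOffMs;
    private final Time time;

    public TopicInitializer(long timeoutMs, long backOffMs, Time time) {
        this.timeoutMs = timeoutMs;
        this.backOffMs = backOffMs;
        this.time = time;
    }

    // Produces the initializer callback that a KafkaBasedLog runs against its TopicAdmin,
    // here delegating to the retrying creation method added by this PR.
    public Consumer<TopicAdmin> forTopic(NewTopic topicDescription) {
        return admin -> admin.createTopicsWithRetry(topicDescription, timeoutMs, backOffMs, time);
    }
}
```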






[GitHub] [kafka] dajac commented on a diff in pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-03-20 Thread via GitHub


dajac commented on code in PR #13350:
URL: https://github.com/apache/kafka/pull/13350#discussion_r1141731445


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -495,8 +615,8 @@ private class ConstrainedAssignmentBuilder extends AbstractAssignmentBuilder {
 @Override
 Map<String, List<TopicPartition>> build() {
 if (log.isDebugEnabled()) {
-log.debug("Performing constrained assign with partitionsPerTopic: {}, currentAssignment: {}.",
-partitionsPerTopic, currentAssignment);
+log.debug("Performing constrained assign with partitionsPerTopic: {}, currentAssignment: {} rackInfo {}.",

Review Comment:
   nit: missing `,` before `rackInfo`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -519,13 +641,14 @@ Map<String, List<TopicPartition>> build() {
 return assignment;
 }
 
-
 // Reassign previously owned partitions, up to the expected number of partitions per consumer
 private void assignOwnedPartitions() {
 
 for (Map.Entry<String, List<TopicPartition>> consumerEntry : currentAssignment.entrySet()) {
 String consumer = consumerEntry.getKey();
-List<TopicPartition> ownedPartitions = consumerEntry.getValue();
+List<TopicPartition> ownedPartitions = consumerEntry.getValue().stream()
+.filter(tp -> !rackInfo.racksMismatch(consumer, tp))

Review Comment:
   For my understanding, if rack awareness is disabled, this is a no-op, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -61,35 +72,51 @@ static final class ConsumerGenerationPair {
 public static final class MemberData {
 public final List<TopicPartition> partitions;
 public final Optional<Integer> generation;
-public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+public final Optional<String> rackId;
+public MemberData(List<TopicPartition> partitions, Optional<Integer> generation, Optional<String> rackId) {
 this.partitions = partitions;
 this.generation = generation;
+this.rackId = rackId;
+}
+
+public MemberData(List<TopicPartition> partitions, Optional<Integer> generation) {
+this(partitions, generation, Optional.empty());
 }
 }
 
 abstract protected MemberData memberData(Subscription subscription);
 
 @Override
-public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-Map<String, Subscription> subscriptions) {
+public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+  Map<String, Subscription> subscriptions) {
 Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
 Set<TopicPartition> partitionsWithMultiplePreviousOwners = new HashSet<>();
+
+List<PartitionInfo> allPartitions = new ArrayList<>();
+partitionsPerTopic.values().forEach(allPartitions::addAll);
+RackInfo rackInfo =  new RackInfo(allPartitions, subscriptions);

Review Comment:
   nit: There is an extra space.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -574,6 +697,42 @@ private void assignOwnedPartitions() {
 }
 }
 
+// Round-Robin filling within racks for remaining members up to the expected numbers of maxQuota,
+// otherwise, to minQuota
+private void assignRackAwareRoundRobin(List<TopicPartition> unassignedPartitions) {
+int nextUnfilledConsumerIndex = 0;
+Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
+while (!rackInfo.consumerRacks.isEmpty() && unassignedIter.hasNext()) {

Review Comment:
   nit: Do we ever mutate `rackInfo.consumerRacks`? If not, would it make sense to put this check as the first statement in the method and return if it is empty?
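
For readers following along, a simplified, self-contained sketch of what a racksMismatch-style check does; the field and constructor shapes here are illustrative, not the PR's actual RackInfo.

```
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class RackMatching {
    private final Map<String, String> consumerRacks;               // consumer -> rack id
    private final Map<TopicPartition, Set<String>> partitionRacks; // partition -> racks of its replicas

    public RackMatching(Map<String, String> consumerRacks,
                        Map<TopicPartition, Set<String>> partitionRacks) {
        this.consumerRacks = consumerRacks;
        this.partitionRacks = partitionRacks;
    }

    // A consumer's owned partition is kept only if the consumer's rack hosts a replica of it.
    // With no rack information available, nothing is filtered out, i.e. the filter is a no-op.
    public boolean racksMismatch(String consumer, TopicPartition tp) {
        String consumerRack = consumerRacks.get(consumer);
        Set<String> replicaRacks = partitionRacks.get(tp);
        if (consumerRack == null || replicaRacks == null || replicaRacks.isEmpty()) {
            return false;
        }
        return !replicaRacks.contains(consumerRack);
    }
}
```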






[GitHub] [kafka] yashmayya commented on a diff in pull request #13415: KAFKA-14816: Only load SSL properties when issuing cross-worker requests to HTTPS URLs

2023-03-20 Thread via GitHub


yashmayya commented on code in PR #13415:
URL: https://github.com/apache/kafka/pull/13415#discussion_r1141704800


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -97,7 +98,11 @@ public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders he
 public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
   TypeReference<T> responseFormat,
   SecretKey sessionKey, String requestSignatureAlgorithm) {
-HttpClient client = httpClient();
+// Only try to load SSL configs if we have to (see KAFKA-14816)
+SslContextFactory sslContextFactory = url.startsWith("https://")
+? SSLUtils.createClientSideSslContextFactory(config)
+: null;
+HttpClient client = httpClient(sslContextFactory);

Review Comment:
   Not related to this change, but any idea why we're creating and starting an 
HTTP client for every single request here?


