[GitHub] [kafka] aindriu-aiven commented on pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-09 Thread via GitHub


aindriu-aiven commented on PR #14159:
URL: https://github.com/apache/kafka/pull/14159#issuecomment-1670825228

   Thanks @gharris1727, I think I covered all the points you mentioned. If there 
are any issues at all, though, please let me know!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru opened a new pull request, #14171: Kafka Streams Threading: Timeout behavior

2023-08-09 Thread via GitHub


lucasbru opened a new pull request, #14171:
URL: https://github.com/apache/kafka/pull/14171

   Implement setting and clearing task timeouts, and change the output on 
exceptions to make it similar to the existing code path. This is a stacked PR; 
only the last commit needs to be reviewed.





[GitHub] [kafka] mimaison merged pull request #14096: KAFKA-14595 AdminUtils rewritten in java

2023-08-09 Thread via GitHub


mimaison merged PR #14096:
URL: https://github.com/apache/kafka/pull/14096





[jira] [Reopened] (KAFKA-14595) Move ReassignPartitionsCommand to tools

2023-08-09 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reopened KAFKA-14595:


> Move ReassignPartitionsCommand to tools
> ---
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Nikolay Izhikov
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1288145597


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -343,21 +345,78 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 /**
  * Deletes the internal topic partition info if delete flag is set as true.
  *
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that needs to be stopped.
  * @param delete flag to indicate whether the given topic 
partitions to be deleted or not.
  */
-public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+public void stopPartitions(Set topicPartitions,
+   boolean delete,
+   BiConsumer 
errorHandler) {
+LOGGER.debug("Stopping {} partitions, delete: {}", 
topicPartitions.size(), delete);
+Set topicIdPartitions = topicPartitions.stream()
+.filter(topicIdByPartitionMap::containsKey)
+.map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), 
tp))
+.collect(Collectors.toSet());
+
+topicIdPartitions.forEach(tpId -> {
+try {
+RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+if (task != null) {
+LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+task.cancel();
+}
+if (delete) {
+LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
+deleteRemoteLogPartition(tpId);

Review Comment:
   Sure, we can build this incrementally on top of this PR.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14151: KAFKA-15083: add config with "remote.log.metadata" prefix

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14151:
URL: https://github.com/apache/kafka/pull/14151#discussion_r1288158470


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -913,6 +969,12 @@ private RemoteLogManagerConfig createRLMConfig(Properties 
props) {
 
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
 
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
NoOpRemoteStorageManager.class.getName());
 
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
NoOpRemoteLogMetadataManager.class.getName());
+props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 
remoteLogMetadataTopicPartitionsNum);
+props.put(remoteLogMetadataTestProp, remoteLogMetadataTestVal);
+props.put(remoteLogMetadataCommonClientTestProp, 
remoteLogMetadataCommonClientTestVal);
+props.put(remoteLogMetadataConsumerTestProp, 
remoteLogMetadataConsumerTestVal);
+props.put(remoteLogMetadataProducerTestProp, 
remoteLogMetadataProducerTestVal);

Review Comment:
   Perhaps, create a `TestUtils.remoteLogTopicBasedRlmmConfig()` similar to 
`TestUtils.producerConfig` or `TestUtils.createBrokerConfigs`






[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)

2023-08-09 Thread via GitHub


lucasbru commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1288175911


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##
@@ -86,12 +87,29 @@ private void runOnce(final long nowMs) {
 
 if (currentTask == null) {
 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
-} else {
-// if a task is no longer processable, ask task-manager to 
give it another
-// task in the next iteration
-if (currentTask.isProcessable(nowMs)) {
+}
+
+if (currentTask != null) {

Review Comment:
   I checked this out, and we need to be a bit careful here not to let the task 
executors sleep when work is available. We had several bugs like this in the 
state updater. We have to make sure to signal on the condition variable any time 
a task becomes "processable". Here are some situations where a task becomes 
processable:
   
- A task is unassigned from another TaskExecutor.
- The task state is changed (this should only happen while a task is locked 
inside the polling phase).
- Tasks are unlocked.
- Tasks are added.
- New records are available.
- A task is resumed.
   
   So in summary, we need to wake the task executors after every poll phase, 
and inside `StreamThread.signalResume`, `DefaultTaskManager.unassignTask`, 
`DefaultTaskManager.unlockTasks`, and `DefaultTaskManager.addTask`. 
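
   The wake-up requirement described above can be sketched with a plain 
`ReentrantLock` and `Condition`. This is only a minimal illustration, not the 
actual `DefaultTaskManager` code: the class `ProcessableTaskQueue` and its 
methods are hypothetical names, and tasks are represented by plain strings.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a task manager; not Kafka Streams code.
class ProcessableTaskQueue {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition tasksAvailable = lock.newCondition();
    private final Queue<String> processable = new ArrayDeque<>();

    // Call this whenever a task becomes processable (added, unlocked, unassigned, resumed, ...).
    void markProcessable(String taskId) {
        lock.lock();
        try {
            processable.add(taskId);
            // Without this signal an executor could keep sleeping although work is available.
            tasksAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by an executor thread: waits up to timeoutMs for a task, returns null on timeout or interrupt.
    String awaitNextTask(long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // Loop to guard against spurious wake-ups.
            while (processable.isEmpty()) {
                if (remainingNanos <= 0L) {
                    return null;
                }
                try {
                    remainingNanos = tasksAvailable.awaitNanos(remainingNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return processable.poll();
        } finally {
            lock.unlock();
        }
    }
}
```

   The point of the sketch is that every code path that makes a task 
processable goes through one method that signals the condition, so no path can 
forget the wake-up.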






[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)

2023-08-09 Thread via GitHub


lucasbru commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1288175911


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##
@@ -86,12 +87,29 @@ private void runOnce(final long nowMs) {
 
 if (currentTask == null) {
 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
-} else {
-// if a task is no longer processable, ask task-manager to 
give it another
-// task in the next iteration
-if (currentTask.isProcessable(nowMs)) {
+}
+
+if (currentTask != null) {

Review Comment:
   I checked this out, and we need to be a bit careful here not to let the task 
executors sleep when work is available. We had several bugs like this in the 
state updater. We have to make sure to signal on the condition variable any time 
a task becomes "processable". Here are some situations where a task becomes 
processable:
   
- A task is unassigned from another TaskExecutor.
- The task state is changed (this should only happen while a task is locked 
inside the polling phase).
- Tasks are unlocked.
- Tasks are added.
- New records are available.
- A task is resumed.
   
   So in summary:
   
- We should probably lock tasks when they are paused and unlock them when 
they are resumed. This belongs to the StreamThread integration work.
- We need to wake the task executors in `DefaultTaskManager.unassignTask`, 
`DefaultTaskManager.unlockTasks`, and `DefaultTaskManager.addTask`. 
- We need to wake the task executors after every polling phase.






[GitHub] [kafka] mimaison merged pull request #14166: MINOR: Use Mockito's strict stubs functionality for Connect tests and cleanup unused stubs

2023-08-09 Thread via GitHub


mimaison merged PR #14166:
URL: https://github.com/apache/kafka/pull/14166





[GitHub] [kafka] yashmayya commented on pull request #14166: MINOR: Use Mockito's strict stubs functionality for Connect tests and cleanup unused stubs

2023-08-09 Thread via GitHub


yashmayya commented on PR #14166:
URL: https://github.com/apache/kafka/pull/14166#issuecomment-1670963634

   Thanks Mickael!





[GitHub] [kafka] nizhikov opened a new pull request, #14172: KAFKA-14595 Java version of ReassignPartitionsCommand POJOs

2023-08-09 Thread via GitHub


nizhikov opened a new pull request, #14172:
URL: https://github.com/apache/kafka/pull/14172

   This PR is part of #13247 and introduces a Java version of the 
ReassignPartitionsCommand POJOs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move ReassignPartitionsCommand to java

2023-08-09 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1670967558

   @mimaison Thanks for reviewing and merging #14096. 
   
   I created a PR that contains the Java version of the ReassignPartitionsCommand POJOs.
   Please take a look: #14172
   





[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-09 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-15309:
---
Attachment: app.log
KafkaProducerReproducer.java

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single 
> record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams' `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259, which inspired this ticket.
> Another example for which a production exception handler could be useful: if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code, the producer would hang retrying forever with infinite retries. A 
> handler could help to break the infinite retry loop.
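
To make the idea in the description concrete, here is a rough sketch of what a 
per-record handler could look like. Everything in it is hypothetical: the ticket 
is labelled needs-kip, so `RecordErrorHandler`, `Response`, and `sendBatch` are 
illustrative names rather than producer API, records are plain strings, and the 
"too large" check is simplified to a character count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types exist in the Kafka producer.
class PerRecordHandlerSketch {
    enum Response { SKIP, FAIL }

    // Decides, per failed record, whether to drop the record or fail the whole batch.
    interface RecordErrorHandler {
        Response handle(String record, RuntimeException error);
    }

    // Returns the records that were accepted; rethrows the error when the handler answers FAIL.
    static List<String> sendBatch(List<String> batch, int maxRecordLength, RecordErrorHandler handler) {
        List<String> sent = new ArrayList<>();
        for (String record : batch) {
            if (record.length() > maxRecordLength) {
                RuntimeException error =
                    new IllegalArgumentException("record too large: " + record.length());
                if (handler.handle(record, error) == Response.FAIL) {
                    throw error;
                }
                continue; // SKIP: drop only this record and keep the rest of the batch.
            }
            sent.add(record);
        }
        return sent;
    }
}
```

A handler that always returns `SKIP` reproduces the opt-in "skip bad records" 
behavior, while one that returns `FAIL` keeps the fail-the-batch behavior.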





[GitHub] [kafka] msn-tldr commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-09 Thread via GitHub


msn-tldr commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1288249340


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -144,13 +159,25 @@ public long metadataExpireMs() {
  */
 public synchronized int requestUpdate() {
 this.needFullUpdate = true;
+this.backoffUpdateRequests = 0L;

Review Comment:
   @AndrewJSchofield & @junrao, is my understanding correct?
   
   > For example, if a produce request fails, we request a metadata update and 
set backoffUpdateRequests to 0
   
   My understanding of the PR is that the metadata request won't back off, but 
the produce request will. So the metadata is *likely* going to be updated by the 
time the produce request is retried (post backoff).
   
   > Otherwise, we likely will still have the stale metadata when retrying the 
produce request.
   
   So this shouldn't be a problem, correct?
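
   For reference, the kind of calculation behind a call such as 
`refreshBackoff.backoff(attempts)` can be sketched as follows. This is a 
simplified, hypothetical `SimpleBackoff` class: the parameter values are 
illustrative and it deliberately omits jitter, which a real implementation may 
add.

```java
// Hypothetical, simplified backoff; the class used in the PR may differ (for example by adding jitter).
class SimpleBackoff {
    private final long initialMs;
    private final int multiplier;
    private final long maxMs;

    SimpleBackoff(long initialMs, int multiplier, long maxMs) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
    }

    // Delay to apply after `attempts` failed tries: initialMs * multiplier^attempts, capped at maxMs.
    long backoff(long attempts) {
        double delay = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(delay, (double) maxMs);
    }

    // Same shape as timeToAllowUpdate in the diff: remaining wait in ms, never negative.
    long timeToAllowUpdate(long lastRefreshMs, long attempts, long nowMs) {
        return Math.max(lastRefreshMs + backoff(attempts) - nowMs, 0L);
    }
}
```

   With an initial delay of 100 ms, a multiplier of 2, and a cap of 1000 ms, 
the delays grow as 100, 200, 400, 800 and then stay at 1000.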
   
   



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -119,20 +130,24 @@ public synchronized Cluster fetch() {
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+return Math.max(this.lastRefreshMs + 
this.refreshBackoff.backoff(this.attempts) - nowMs, 0);
 }
 
 /**
  * The next time to update the cluster info is the maximum of the time the 
current info will expire and the time the
- * current info can be updated (i.e. backoff time has elapsed); If an 
update has been request then the expiry time
- * is now
+ * current info can be updated (i.e. backoff time has elapsed). There are 
two calculations for backing off based on
+ * how many requests with backing off have been issued, and how many 
attempts to retrieve metadata have been made
+ * since the last successful response. The first of these allows backing 
off when there are errors to do with
+ * stale metadata, even though the metadata responses are clean.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till updating the cluster info
  */
 public synchronized long timeToNextUpdate(long nowMs) {
-long timeToExpire = updateRequested() ? 0 : 
Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
+long timeToUpdateWithBackoff = Math.max(this.lastRefreshMs +
+(this.backoffUpdateRequests > 0 ? 
this.refreshBackoff.backoff(this.backoffUpdateRequests - 1) : 0) - nowMs, 0);
+long timeToExpire = Math.max(this.lastSuccessfulRefreshMs + 
this.metadataExpireMs - nowMs, 0);
+return Math.max(updateRequested() ? timeToUpdateWithBackoff : 
timeToExpire, timeToAllowUpdate(nowMs));

Review Comment:
   @AndrewJSchofield I think this method could use inline comments to make it 
easier to understand. Take the suggestion below with a pinch of salt, since it 
is based on my high-level understanding of this PR :)
   
   ```
   long timeToUpdateWithBackoff = Math.max(this.lastRefreshMs +
       (this.backoffUpdateRequests > 0 ? this.refreshBackoff.backoff(this.backoffUpdateRequests - 1) : 0) - nowMs, 0);
   long timeToExpire = Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
   // Metadata can become stale because
   // 1. staleness is explicitly signalled through requestUpdate*() et al.; the time should take
   //    into account whether the update was requested with backoff.
   // 2. the metadata expired based on time.
   long timeToUpdateWithStaleMetadata = updateRequested() ? timeToUpdateWithBackoff : timeToExpire;
   
   // This applies if the previous attempt to refresh metadata from a broker failed, e.g. with an RPC error.
   long timeToUpdateWithPreviousRefreshFailed = timeToAllowUpdate(nowMs);
   
   return Math.max(timeToUpdateWithStaleMetadata, timeToUpdateWithPreviousRefreshFailed);
   ```






[GitHub] [kafka] clolov commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


clolov commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288265653


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   Sorry, can you elaborate because I am not certain I understand?






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14135: KAFKA-14991: Implementation of KIP-937 which improves message timesta…

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14135:
URL: https://github.com/apache/kafka/pull/14135#discussion_r1288249161


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1862,7 +1884,35 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 else MetadataVersion.fromVersionString(logMessageFormatVersionString)
 
   def logMessageTimestampType = 
TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
+
+  /* See `TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG` for details 
*/
+  @deprecated("3.6")
   def logMessageTimestampDifferenceMaxMs: Long = 
getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+
+  // In the transition period before logMessageTimestampDifferenceMaxMs is 
removed, to maintain backward compatibility,
+  // we are using its value if logMessageTimestampBeforeMaxMs default value 
hasn't changed.
+  @nowarn("cat=deprecation")
+  def logMessageTimestampBeforeMaxMs: Long = {
+val messageTimestampBeforeMaxMs: Long = 
getLong(KafkaConfig.LogMessageTimestampBeforeMaxMsProp)
+if (messageTimestampBeforeMaxMs != Long.MaxValue) {

Review Comment:
   Instead of `Long.MaxValue`, could we use 
`LogConfig.DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS`? I know that the value 
is the same, but the latter is semantically correct since we want to check for 
the default value.
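
   The fallback being discussed can be sketched in isolation as below. The 
class, constant, and method names are hypothetical; the only facts taken from 
the diff and the comment are that the new setting's default equals 
`Long.MaxValue` and that the deprecated value is used while the new setting is 
still at its default.

```java
// Hypothetical helper illustrating the fallback; not the KafkaConfig code itself.
class TimestampConfigFallback {
    // Named constant for the default, so the comparison reads as "is it still the default?".
    static final long DEFAULT_BEFORE_MAX_MS = Long.MAX_VALUE;

    // Uses the new setting when it was changed, otherwise falls back to the deprecated one.
    static long effectiveBeforeMaxMs(long beforeMaxMs, long deprecatedDifferenceMaxMs) {
        if (beforeMaxMs != DEFAULT_BEFORE_MAX_MS) {
            return beforeMaxMs;
        }
        return deprecatedDifferenceMaxMs;
    }
}
```

   Comparing against a named default rather than a literal keeps the check 
correct if the default is ever changed in one place.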



##
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:
##
@@ -120,17 +121,17 @@ class PlaintextProducerSendTest extends 
BaseProducerSendTest {
 }
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testSendWithInvalidCreateTime(quorum: String): Unit = {
+  @ParameterizedTest
+  @MethodSource(Array("timestampConfigProvider"))
+  def testSendWithInvalidBeforeAndAfterTimestamp(messageTimeStampConfig: 
String, recordTimestamp: Long): Unit = {

Review Comment:
   Please modify the "valid" test as well to add usage of this new 
configuration; it should test parameters where the value is equal to the 
timestamp, etc.



##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -1398,6 +1437,73 @@ class LogValidatorTest {
 assertEquals(6, e.recordErrors.size)
   }
 
+  @Test
+  def testRecordWithPastTimestampIsRejected(): Unit = {
+val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
+val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
+val now = System.currentTimeMillis()
+val fiveMinutesBeforeThreshold = now - timestampBeforeMaxConfig - (5 * 60 
* 1000L)
+val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V2, 
timestamp = fiveMinutesBeforeThreshold,
+  codec = CompressionType.GZIP)
+val e = assertThrows(classOf[RecordValidationException],
+  () => new LogValidator(
+records,
+topicPartition,
+time,
+CompressionType.GZIP,
+CompressionType.GZIP,
+false,
+RecordBatch.MAGIC_VALUE_V1,
+TimestampType.CREATE_TIME,
+timestampBeforeMaxConfig,
+timestampAfterMaxConfig,
+RecordBatch.NO_PARTITION_LEADER_EPOCH,
+AppendOrigin.CLIENT,
+MetadataVersion.latest
+  ).validateMessagesAndAssignOffsets(
+PrimitiveRef.ofLong(0L), metricsRecorder, 
RequestLocal.withThreadConfinedCaching.bufferSupplier
+  )
+)
+
+assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
+assertFalse(e.recordErrors.isEmpty)
+assertEquals(e.recordErrors.size, 3)
+  }
+
+
+  @Test
+  def testRecordWithFutureTimestampIsRejected(): Unit = {
+val timestampBeforeMaxConfig = 24 * 60 * 60 * 1000L //24 hrs
+val timestampAfterMaxConfig = 1 * 60 * 60 * 1000L //1 hr
+val now = System.currentTimeMillis()

Review Comment:
   In the tests we usually use MockTime. Is it possible to use that here?
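
   As an illustration of why an injectable clock helps here, the sketch below 
checks a timestamp window against a manually controlled time source. 
`ManualClock` and `TimestampBounds` are hypothetical names (Kafka's own 
`MockTime` is richer and is not reproduced here), and treating both bounds as 
inclusive is an assumption.

```java
// Hypothetical minimal clock whose time only moves when the test says so.
class ManualClock {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

class TimestampBounds {
    // True when timestampMs is at most beforeMaxMs in the past and at most afterMaxMs in the future.
    // Written with differences so that a bound of Long.MAX_VALUE does not overflow for non-negative times.
    static boolean isAcceptable(long timestampMs, long nowMs, long beforeMaxMs, long afterMaxMs) {
        return nowMs - timestampMs <= beforeMaxMs && timestampMs - nowMs <= afterMaxMs;
    }
}
```

   Because the clock never advances on its own, the "five minutes past the 
threshold" cases stay exact regardless of how long the test takes to run.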



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -186,6 +191,12 @@ public Optional serverConfigName(String 
configName) {
 @SuppressWarnings("deprecation")
 private static final String MESSAGE_FORMAT_VERSION_CONFIG = 
TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 
+@SuppressWarnings("deprecation")

Review Comment:
   What "deprecation" warning are we suppressing here? Wouldn't these 
configurations continue to exist in the future?



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -761,6 +761,7 @@ class KafkaConfigTest {
   }
 
   @Test
+  @nowarn("cat=deprecation")

Review Comment:
   Please add a comment wherever we add this annotation, stating which 
particular part of the code is deprecated. It helps to ensure that the 
annotation is deleted when we remove the deprecated code.



##
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala:
##
@@ -120,17 +121,17 @@ class PlaintextProducerSendTest extends 
BaseProducerSe

[GitHub] [kafka] yashmayya commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-09 Thread via GitHub


yashmayya commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1288249581


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3872,7 +2952,7 @@ public void testVerifyTaskGeneration() {
 assertThrows(ConnectException.class, () -> 
herder.verifyTaskGenerationAndOwnership(unassignedTask, 1, verifyCallback));
 assertThrows(ConnectException.class, () -> 
herder.verifyTaskGenerationAndOwnership(unassignedTask, 2, verifyCallback));
 
-PowerMock.verifyAll();
+verify(verifyCallback, times(3)).onCompletion(isNull(), isNull());

Review Comment:
   > We expect this to be invoked five times in the original test
   
   There are only 3 calls to `herder::verifyTaskGenerationAndOwnership` in this 
test that aren't expected to throw an exception. 
   
   > Were we never properly verifying the number of invocations in the existing 
test?
   
   Yep, the `verifyCallback` mock isn't passed into the `PowerMock::replayAll` 
call, which means that it isn't "known" to `PowerMock` and the 
`PowerMock::verifyAll` call will not make any verifications on it. If this line 
in the original test - 
   
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java#L3838
   
   is changed to `PowerMock.replayAll(verifyCallback);` then the original test 
fails and only passes when we change this loop from 5 iterations to 3 -
   
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java#L3833-L3836






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288285456


##
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala:
##
@@ -154,6 +155,92 @@ class KafkaServerTest extends QuorumTestHarness {
 server.shutdown()
   }
 
+  @Test
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit 
= {
+val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
 true.toString)
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+  
"org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+
tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+  "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
+val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps))
+server.remoteLogManagerOpt match {
+  case Some(_) =>
+  case None => fail("RemoteLogManager should be initialized")
+}
+
+val topicProps = new Properties()
+topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
true.toString)

Review Comment:
   That will indeed be useful. But perhaps in a separate PR, so that we can use 
that method to replace config creation in other places as well.






[GitHub] [kafka] prestona opened a new pull request, #14173: KAFKA-15042: Improve TAGGED_FIELDS protocol documentation

2023-08-09 Thread via GitHub


prestona opened a new pull request, #14173:
URL: https://github.com/apache/kafka/pull/14173

   - Rename TAG_BUFFER -> TAGGED_FIELDS (for better consistency with KIP-482 
and the implementation)
   - Add TAGGED_FIELDS into "protocol primitive types" section, and improve 
description
   - Exclude "_tagged_fields" from individual API message field descriptions
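
   For readers unfamiliar with the type, here is a small sketch of how a tagged 
fields section is laid out according to KIP-482: an unsigned varint count, 
followed for each field by an unsigned varint tag, an unsigned varint size, and 
the raw data. The class and method names are hypothetical and are not taken 
from the Kafka code base.

```java
import java.io.ByteArrayOutputStream;

// Illustrative encoder following the KIP-482 layout; names are hypothetical.
class TaggedFieldsSketch {
    // Writes value as an unsigned varint: 7 bits per byte, high bit set on every byte except the last.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // A section with no tagged fields is just the count 0, i.e. a single zero byte.
    static byte[] encodeEmpty() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(0, out);
        return out.toByteArray();
    }

    // A section holding exactly one field: count, then tag, size, and data.
    static byte[] encodeSingleField(int tag, byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(1, out);
        writeUnsignedVarint(tag, out);
        writeUnsignedVarint(data.length, out);
        out.write(data, 0, data.length);
        return out.toByteArray();
    }
}
```

   The empty case is the one most messages carry, which is why the field adds 
only one byte of overhead per structure.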
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-15309) Add custom error handler to Producer

2023-08-09 Thread Tomonari Yamashita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752369#comment-17752369
 ] 

Tomonari Yamashita commented on KAFKA-15309:


FYI, I've created a simple reproducer. The Kafka Producer behavior has changed 
since Kafka 3.2.0 implemented KAFKA-9279, which forces the transaction to abort 
if any of the individual producer.send() calls fail; this change is expected 
behavior. 

This reproducer confirms that the Kafka Producer behavior has changed. In 
addition, by integrating the error handler extension proposed in this ticket, 
it could perhaps serve as the basis for a reference implementation that skips 
bad messages.

[Reproduction procedure]
 # Create the "input-topic" and "output-topic" topics.
 # Put several messages on "input-topic".
 # Execute a simple Kafka Producer program that transfers too-large messages 
from "input-topic" to "output-topic" within a transaction and ignores the 
RecordTooLargeException of producer.send(). Please refer to the reproducer 
program (attached file: KafkaProducerReproducer.java).
 # ==> Depending on the Kafka Producer version, the commit either succeeds or 
fails (i.e., the transaction aborts), as follows:
 -- Kafka 2.8.2 : It is possible to commit while ignoring RecordTooLargeException.
 -- Kafka 3.0.0 : It is possible to commit while ignoring RecordTooLargeException.
 -- Kafka 3.1.2 : It is possible to commit while ignoring RecordTooLargeException.
 -- Kafka 3.2.0 : Cannot skip messages by ignoring RecordTooLargeException 
because the transaction was rolled back/aborted.
 -- Kafka 3.2.3 : Cannot skip messages by ignoring RecordTooLargeException 
because the transaction was rolled back/aborted.
 -- Kafka 3.5.1 : Cannot skip messages by ignoring RecordTooLargeException 
because the transaction was rolled back/aborted.

> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
> Attachments: KafkaProducerReproducer.java, app.log
>
>
> The producer collects multiple records into batches, and a single 
> record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example where a production exception handler could be useful: if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code, a producer with infinite retries would hang retrying forever. A 
> handler could help to break the infinite retry loop.
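
The per-record handler idea in the description above could look roughly like 
the following. This is a minimal sketch, not a proposed API: HandlerResponse, 
RecordErrorHandler, and ToyBatchSender are hypothetical names, and the real 
interface would be defined by the KIP this ticket still needs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical response type, loosely modeled on the CONTINUE/FAIL idea in Kafka Streams. */
enum HandlerResponse { CONTINUE, FAIL }

/** Hypothetical per-record handler; not an existing Kafka interface. */
interface RecordErrorHandler {
    HandlerResponse handle(String record, Exception error);
}

class ToyBatchSender {
    /**
     * Sends every record that fits; for an oversized record the handler decides
     * whether to skip just that record or fail the whole batch.
     */
    static List<String> sendBatch(List<String> batch, int maxRecordSize, RecordErrorHandler handler) {
        List<String> sent = new ArrayList<>();
        for (String record : batch) {
            if (record.length() > maxRecordSize) {
                Exception error = new IllegalArgumentException("record too large: " + record.length());
                if (handler.handle(record, error) == HandlerResponse.FAIL) {
                    throw new IllegalStateException("batch failed", error);
                }
                continue; // CONTINUE: skip only the bad record
            }
            sent.add(record);
        }
        return sent;
    }
}
```

A handler that always returns CONTINUE drops only the oversized record, while 
one that returns FAIL reproduces the fail-the-whole-batch behavior.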



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


kamalcph commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288298071


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   We are ensuring that system-level remote storage cannot be disabled while 
topics have remote storage enabled. If the user downgrades the build, 
then this validation won't be applicable. 






[GitHub] [kafka] yashmayya commented on a diff in pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-09 Thread via GitHub


yashmayya commented on code in PR #14153:
URL: https://github.com/apache/kafka/pull/14153#discussion_r1288307819


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java:
##
@@ -114,29 +113,49 @@ public class KafkaBasedLogTest {
 private static final String TP0_VALUE_NEW = "VAL0_NEW";
 private static final String TP1_VALUE_NEW = "VAL1_NEW";
 
-private Time time = new MockTime();
-private KafkaBasedLog store;
+private final Time time = new MockTime();
+private MockedKafkaBasedLog store;
 
 @Mock
-private Runnable initializer;
+private Consumer initializer;
 @Mock
 private KafkaProducer producer;
-private MockConsumer consumer;
-@Mock

Review Comment:
   Ah yeah, that makes sense, the other methods are expecting the consumer to 
be used to retrieve the end offsets instead of the admin. 






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288309828


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   > For my understanding, will this approach work if the user wants to 
downgrade from 3.6 to 2.8?
   
   Correct me if I am wrong here @kamalcph, but downgrade is not supported in 
Kafka, as per "Once the brokers begin using the latest protocol version, it will 
no longer be possible to downgrade the cluster to an older version." from 
https://kafka.apache.org/documentation.html#upgrade_350_zk
   This is true regardless of whether Tiered Storage is in use.
   






[GitHub] [kafka] divijvaidya commented on pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-09 Thread via GitHub


divijvaidya commented on PR #14153:
URL: https://github.com/apache/kafka/pull/14153#issuecomment-1671124312

   Thank you for reviewing @yashmayya. The CI is still flaky with a large 
number of failures, so I am restarting it.





[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-09 Thread Tomonari Yamashita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752396#comment-17752396
 ] 

Tomonari Yamashita commented on KAFKA-15259:


My opinion is neutral on whether it should be solved right away in Kafka 
Streams. I would like to hear from others if they have an opinion. (But I think 
there is no problem with solving it in the Kafka Producer first.)

If I could write opinions on both sides:
 - [Kafka Producer First] Similar issues occur not only with Kafka Streams but 
also with Kafka Producer. Skipping messages is difficult even for the Kafka 
Producer, so it needs to be resolved first in the Kafka Producer ticket.
 - [Kafka Streams First] The behavior with Kafka Streams is a bug, but the 
behavior with Kafka Producer is expected behavior, and resolving Kafka Streams 
should take priority over Kafka Producer.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka Streams program that transfers messages that are 
> too large from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shuts down as the default 
> behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE 
> is used.
> [As far as I have investigated]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continues processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is 

[GitHub] [kafka] riedelmax commented on pull request #14124: Kafka-14509; [1/2] Define ConsumerGroupDescribe API request and response schemas and classes.

2023-08-09 Thread via GitHub


riedelmax commented on PR #14124:
URL: https://github.com/apache/kafka/pull/14124#issuecomment-1671187508

   I will find time in the next few days to fix those.





[jira] [Commented] (KAFKA-15172) Allow exact mirroring of ACLs between clusters

2023-08-09 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752398#comment-17752398
 ] 

hudeqi commented on KAFKA-15172:


Hi, I have initiated a KIP discussion: 
[https://lists.apache.org/thread/j17cdrpqxpn924cztdgxxm86qhf89sop] [~mimaison] 
[~ChrisEgerton] 

> Allow exact mirroring of ACLs between clusters
> --
>
> Key: KAFKA-15172
> URL: https://issues.apache.org/jira/browse/KAFKA-15172
> Project: Kafka
>  Issue Type: Task
>  Components: mirrormaker
>Reporter: Mickael Maison
>Assignee: hudeqi
>Priority: Major
>  Labels: kip-965
>
> When mirroring ACLs, MirrorMaker downgrades allow ALL ACLs to allow READ. The 
> rationale is to prevent other clients from producing to remote topics. 
> However, in disaster recovery scenarios, where the target cluster is not used 
> and is just a "hot standby", it would be preferable to have exactly the same 
> ACLs on both clusters to speed up failover.
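
The downgrade rule and the requested alternative can be sketched as follows. 
This is an illustration only: AclOp, AclMirrorSketch, and the exactMirroring 
flag are hypothetical and are not MirrorMaker classes or configuration names, 
and passing other operations through unchanged is an assumption of the sketch.

```java
/** Hypothetical model of ACL operations; not MirrorMaker's actual types. */
enum AclOp { READ, DESCRIBE, ALL }

class AclMirrorSketch {
    /**
     * Maps a source-cluster allow operation to the one written on the target.
     * exactMirroring is a hypothetical flag; the ticket does not name a config.
     */
    static AclOp mirror(AclOp source, boolean exactMirroring) {
        if (exactMirroring) {
            return source; // hot-standby case: keep the ACL identical
        }
        // Behavior described in the ticket: allow ALL is downgraded to allow READ.
        return source == AclOp.ALL ? AclOp.READ : source;
    }
}
```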





[GitHub] [kafka] mimaison commented on a diff in pull request #14172: KAFKA-14595 Java version of ReassignPartitionsCommand POJOs

2023-08-09 Thread via GitHub


mimaison commented on code in PR #14172:
URL: https://github.com/apache/kafka/pull/14172#discussion_r1288390063


##
tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where the move is in progress.
+ */
+final class ActiveMoveState implements LogDirMoveState {
+public final String currentLogDir;
+
+public final String targetLogDir;
+
+public final String futureLogDir;
+
+/**
+ * @param currentLogDir   The current log directory.
+ * @param futureLogDir    The log directory that the replica is moving to.
+ * @param targetLogDir    The log directory that we wanted the replica to move to.
+ */
+public ActiveMoveState(String currentLogDir, String targetLogDir, String futureLogDir) {
+this.currentLogDir = currentLogDir;
+this.targetLogDir = targetLogDir;
+this.futureLogDir = futureLogDir;
+}
+
+@Override
+public boolean done() {
+return false;
+}
+
+@Override
+public boolean equals(Object o) {

Review Comment:
   Do we need `equals()` and `hashCode()` on all these objects?



##
tools/src/main/java/org/apache/kafka/tools/reassign/Tuple.java:
##
@@ -0,0 +1,48 @@
+/*

Review Comment:
   Let's not include this class in this PR. At this stage it's unclear if it 
will be needed. If we really need it let's put it in the PR moving the tool.



##
tools/src/main/java/org/apache/kafka/tools/reassign/TerseReassignmentFailureException.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * An exception thrown to indicate that the command has failed, but we don't want to
+ * print a stack trace.
+ */
+public class TerseReassignmentFailureException extends KafkaException {

Review Comment:
   Do we need this exception? I'm assuming we should be able to reuse the 
existing `TerseException` since only the message is supposed to be returned.






[GitHub] [kafka] florin-akermann opened a new pull request, #14174: Kafka 12317: Relax non-null key requirement in Kafka Streams

2023-08-09 Thread via GitHub


florin-akermann opened a new pull request, #14174:
URL: https://github.com/apache/kafka/pull/14174

   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams
   
   Test Strategy:
   
   Basic tests on minimal topologies with call to '.repartitioned()' prepended.
   TODO: expand test cases - assert correct behavior for self joins, ...
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] yashmayya commented on a diff in pull request #14102: KAFKA-13187: Replace EasyMock / PowerMock with Mockito in DistributedHerderTest

2023-08-09 Thread via GitHub


yashmayya commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1288239604


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3724,62 +2818,54 @@ public void 
testExternalZombieFencingRequestDelayedCompletion() throws Exception
 pendingFencing,
 tasksPerConnector
 );
-tasksPerConnector.keySet().forEach(c -> 
expectConfigRefreshAndSnapshot(configState));
+expectConfigRefreshAndSnapshot(configState);
+
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+doNothing().when(member).poll(anyLong());
 
 // The callbacks that the herder has accrued for outstanding fencing 
futures, which will be completed after
 // a successful round of fencing and a task record write to the config 
topic
-Map>> 
herderFencingCallbacks = new HashMap<>();
+Map>> 
herderFencingCallbacks = new HashMap<>();
 // The callbacks that the herder has installed for after a successful 
round of zombie fencing, but before writing
 // a task record to the config topic
-Map>> 
workerFencingFollowups = new HashMap<>();
+Map>> 
workerFencingFollowups = new HashMap<>();
 
 Map callbacksInstalled = new HashMap<>();
 tasksPerConnector.forEach((connector, numStackedRequests) -> {
 // The future returned by Worker::fenceZombies
-KafkaFuture workerFencingFuture = 
EasyMock.mock(KafkaFuture.class);
-// The future tracked by the herder (which tracks the fencing 
performed by the worker and the possible followup write to the config topic) 
-KafkaFuture herderFencingFuture = 
EasyMock.mock(KafkaFuture.class);
+KafkaFuture workerFencingFuture = mock(KafkaFuture.class);
+// The future tracked by the herder (which tracks the fencing 
performed by the worker and the possible followup write to the config topic)
+KafkaFuture herderFencingFuture = mock(KafkaFuture.class);
 
-Capture> 
herderFencingCallback = EasyMock.newCapture(CaptureType.ALL);
+ArgumentCaptor> 
herderFencingCallback = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
 herderFencingCallbacks.put(connector, herderFencingCallback);
 
 // Don't immediately invoke callbacks that the herder sets up for 
when the worker fencing and writes to the config topic have completed
 // Instead, wait for them to be installed, then invoke them 
explicitly after the fact on a thread separate from the herder's tick thread
-
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback)))
-.andReturn(null)
-.times(numStackedRequests + 1);
+
when(herderFencingFuture.whenComplete(herderFencingCallback.capture())).thenReturn(null);
 
-Capture> fencingFollowup = 
EasyMock.newCapture();
+ArgumentCaptor> 
fencingFollowup = ArgumentCaptor.forClass(KafkaFuture.BaseFunction.class);
 CountDownLatch callbackInstalled = new CountDownLatch(1);
 workerFencingFollowups.put(connector, fencingFollowup);
 callbacksInstalled.put(connector, callbackInstalled);
-
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.capture(fencingFollowup))).andAnswer(()
 -> {
+
when(workerFencingFuture.thenApply(fencingFollowup.capture())).thenAnswer(invocation
 -> {
 callbackInstalled.countDown();
 return herderFencingFuture;
 });
 
 // We should only perform a single physical zombie fencing; all 
the subsequent requests should be stacked onto the first one
-EasyMock.expect(worker.fenceZombies(
-EasyMock.eq(connector), 
EasyMock.eq(taskCountRecords.get(connector)), EasyMock.anyObject())
-).andReturn(workerFencingFuture);
-
-for (int i = 0; i < numStackedRequests; i++) {
-expectConfigRefreshAndSnapshot(configState);
-}
-
-PowerMock.replay(workerFencingFuture, herderFencingFuture);
+when(worker.fenceZombies(eq(connector), 
eq(taskCountRecords.get(connector)), any()))
+.thenReturn(workerFencingFuture)
+.thenAnswer(invocation -> {
+fail("Expected only a single zombie fencing per 
connector");

Review Comment:
   Oh, that's a good point, it would not be bubbled up correctly in fact.
   
   Edit: I guess it'd actually depend on where the extra calls are being made 
from but either way the explicit verification is better.




[GitHub] [kafka] vveicc opened a new pull request, #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


vveicc opened a new pull request, #14175:
URL: https://github.com/apache/kafka/pull/14175

   Change BrokerApiVersionsCommandTest to support kraft mode.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Florin Akermann (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752407#comment-17752407
 ] 

Florin Akermann commented on KAFKA-12317:
-

Drafted a PR: [https://github.com/apache/kafka/pull/14174]

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for stream-stream and stream-table/globalTable joins, 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> i.e., we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users who want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (i.e., we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.
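
The proposed semantics can be sketched without Kafka Streams at all. The 
following is a plain-Java illustration under stated assumptions: 
LeftJoinSketch, Rec, and the dropNullKeys flag are hypothetical names, and an 
in-memory Map stands in for the table side of a left stream-table join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class LeftJoinSketch {
    /** A stream record whose key may be null. */
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Left join of stream records against a lookup table.
     * dropNullKeys = true models the current behavior (null-key records are dropped);
     * dropNullKeys = false models the relaxed behavior (null key is a failed lookup).
     */
    static List<String> leftJoin(List<Rec> stream,
                                 Map<String, String> table,
                                 BiFunction<String, String, String> joiner,
                                 boolean dropNullKeys) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.key == null) {
                if (!dropNullKeys) {
                    out.add(joiner.apply(r.value, null)); // joiner sees a null other side
                }
                continue;
            }
            out.add(joiner.apply(r.value, table.get(r.key)));
        }
        return out;
    }
}
```

Under the relaxed mode, a caller who wants the old result can filter out 
null-key records before calling leftJoin, which mirrors the `filter()` 
suggestion in the description.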





[GitHub] [kafka] nizhikov commented on a diff in pull request #14172: KAFKA-14595 Java version of ReassignPartitionsCommand POJOs

2023-08-09 Thread via GitHub


nizhikov commented on code in PR #14172:
URL: https://github.com/apache/kafka/pull/14172#discussion_r1288443925


##
tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where the move is in progress.
+ */
+final class ActiveMoveState implements LogDirMoveState {
+public final String currentLogDir;
+
+public final String targetLogDir;
+
+public final String futureLogDir;
+
+/**
+ * @param currentLogDir   The current log directory.
+ * @param futureLogDir    The log directory that the replica is moving to.
+ * @param targetLogDir    The log directory that we wanted the replica to move to.
+ */
+public ActiveMoveState(String currentLogDir, String targetLogDir, String futureLogDir) {
+this.currentLogDir = currentLogDir;
+this.targetLogDir = targetLogDir;
+this.futureLogDir = futureLogDir;
+}
+
+@Override
+public boolean done() {
+return false;
+}
+
+@Override
+public boolean equals(Object o) {

Review Comment:
   Yes. They are used in tests for comparison when POJOs are keys or values of 
a collection returned by the tested code.
   
   Examples:
   ```
   class ReassignPartitionsUnitTest {
   ...
 @Test
 def testFindPartitionReassignmentStates(): Unit = {
   ...
 assertEquals((Map(
 new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0,1,2), Seq(0,1,3), false),
 new TopicPartition("foo", 1) -> PartitionReassignmentState(Seq(1,2,3), Seq(1,2,3), true)
   ), true),
   findPartitionReassignmentStates(adminClient, Seq(
 (new TopicPartition("foo", 0), Seq(0,1,3)),
 (new TopicPartition("foo", 1), Seq(1,2,3))
   )))
   ...
 @Test
 def testFindLogDirMoveStates(): Unit = {
   ...
 assertEquals(Map(
   new TopicPartitionReplica("bar", 0, 0) -> new CompletedMoveState("/tmp/kafka-logs0"),
   new TopicPartitionReplica("foo", 0, 0) -> new ActiveMoveState("/tmp/kafka-logs0",
   "/tmp/kafka-logs1", "/tmp/kafka-logs1"),
   new TopicPartitionReplica("foo", 1, 0) -> new CancelledMoveState("/tmp/kafka-logs0",
 "/tmp/kafka-logs1"),
   new TopicPartitionReplica("quux", 1, 0) -> new MissingLogDirMoveState("/tmp/kafka-logs1"),
   new TopicPartitionReplica("quuz", 0, 0) -> new MissingReplicaMoveState("/tmp/kafka-logs0")
 ), findLogDirMoveStates(adminClient, Map(
   new TopicPartitionReplica("bar", 0, 0) -> "/tmp/kafka-logs0",
   new TopicPartitionReplica("foo", 0, 0) -> "/tmp/kafka-logs1",
   new TopicPartitionReplica("foo", 1, 0) -> "/tmp/kafka-logs1",
   new TopicPartitionReplica("quux", 1, 0) -> "/tmp/kafka-logs1",
   new TopicPartitionReplica("quuz", 0, 0) -> "/tmp/kafka-logs0"
 )))
   ...
   
   ```
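
The point about map comparison can be shown with a small stand-alone class. 
This is a sketch only: MoveStateSketch is a hypothetical stand-in for the 
POJOs in this PR, with two fields chosen for brevity.

```java
import java.util.Objects;

/** Hypothetical value class standing in for the move-state POJOs in this PR. */
final class MoveStateSketch {
    final String currentLogDir;
    final String targetLogDir;

    MoveStateSketch(String currentLogDir, String targetLogDir) {
        this.currentLogDir = currentLogDir;
        this.targetLogDir = targetLogDir;
    }

    // Without this override, two instances with the same fields compare unequal,
    // so an assertEquals on maps holding them as values would fail.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MoveStateSketch that = (MoveStateSketch) o;
        return Objects.equals(currentLogDir, that.currentLogDir)
            && Objects.equals(targetLogDir, that.targetLogDir);
    }

    // Must agree with equals() so instances also work as HashMap keys.
    @Override
    public int hashCode() {
        return Objects.hash(currentLogDir, targetLogDir);
    }
}
```

Map equality in Java delegates to the equals() of keys and values, which is 
why the tests above depend on these overrides.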



##
tools/src/main/java/org/apache/kafka/tools/reassign/Tuple.java:
##
@@ -0,0 +1,48 @@
+/*

Review Comment:
   Removed



##
tools/src/main/java/org/apache/kafka/tools/reassign/TerseReassignmentFailureException.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import org

[jira] [Assigned] (KAFKA-15295) Add config validation when remote storage is enabled on a topic

2023-08-09 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15295:


Assignee: Kamal Chandraprakash  (was: Luke Chen)

> Add config validation when remote storage is enabled on a topic
> ---
>
> Key: KAFKA-15295
> URL: https://issues.apache.org/jira/browse/KAFKA-15295
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> If system-level remote storage is not enabled, then enabling remote storage 
> on a topic should throw an exception while validating the configs. 
> See https://github.com/apache/kafka/pull/14114#discussion_r1280372441 for 
> more details





[GitHub] [kafka] kamalcph commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


kamalcph commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288449342


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   lgtm






[jira] [Created] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls

2023-08-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15325:
--

 Summary: Integrate topicId in OffsetFetch and OffsetCommit async 
consumer calls
 Key: KAFKA-15325
 URL: https://issues.apache.org/jira/browse/KAFKA-15325
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit 
APIs. The consumer calls to those APIs should be updated to include topicIds 
when available.





[GitHub] [kafka] dengziming commented on a diff in pull request #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


dengziming commented on code in PR #14175:
URL: https://github.com/apache/kafka/pull/14175#discussion_r1288503732


##
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
 assertTrue(lineIter.hasNext)
 assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", 
lineIter.next())
 val nodeApiVersions = NodeApiVersions.create

Review Comment:
   We should change how we construct `NodeApiVersions` here, for example: `val 
nodeApiVersions = new 
NodeApiVersions(clientApis.map(ApiVersionsResponse.toApiVersion).asJava, 
Collections.emptyList(), false)`



##
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##
@@ -17,37 +17,47 @@
 
 package kafka.admin
 
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-import scala.collection.Seq
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.message.ApiMessageType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
 
   def generateConfigs: Seq[KafkaConfig] =
-TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
-  // Configure control plane listener to make sure we have separate 
listeners from client,
-  // in order to avoid returning Envelope API version.
-  props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
-  props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  props.setProperty("listeners", 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-  props.setProperty(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-  props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
-  props
-}).map(KafkaConfig.fromProps)
+if (isKRaftTest()) {
+  TestUtils.createBrokerConfigs(1, null).map(props => {
+props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")

Review Comment:
   Please add a comment about this change. I guess this is related to KIP-848, 
and we can remove it once KIP-848 is complete.



##
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
 assertTrue(lineIter.hasNext)
 assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", 
lineIter.next())
 val nodeApiVersions = NodeApiVersions.create
-val enabledApis = ApiKeys.zkBrokerApis.asScala
-for (apiKey <- enabledApis) {
-  val apiVersion = nodeApiVersions.apiVersion(apiKey)
-  assertNotNull(apiVersion)
+val listenerType = if (isKRaftTest()) {
+  ApiMessageType.ListenerType.BROKER
+} else {
+  ApiMessageType.ListenerType.ZK_BROKER
+}
+val clientApis = ApiKeys.clientApis().asScala
+for (apiKey <- clientApis) {
+  assertTrue(lineIter.hasNext)
+  val actual = lineIter.next()
+  if (apiKey.inScope(listenerType)) {
+val apiVersion = nodeApiVersions.apiVersion(apiKey)
+assertNotNull(apiVersion)
 
-  val versionRangeStr =
-if (apiVersion.minVersion == apiVersion.maxVersion) 
apiVersion.minVersion.toString
-else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
-  val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
+val versionRangeStr =
+  if (apiVersion.minVersion == apiVersion.maxVersion) 
apiVersion.minVersion.toString
+  else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
+val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
 
-  val terminator = if (apiKey == enabledApis.last) "" else ","
+val terminator = if (apiKey == clientApis.last) "" else ","
 
-  val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: 
$usableVersion]$terminator"
-  assertTrue(lineIter.hasNext)
-  assertEquals(line, lineIter.next())
+val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: 
$usableVersion]$terminator"
+assertTrue(lineIter.hasNext)
+assertEquals(line, actual)
+  }

Review Comment:
   We should add an else branch 

[GitHub] [kafka] mumrah commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


mumrah commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288505525


##
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java:
##
@@ -256,11 +224,6 @@ public void replay(
 throw new RuntimeException("Unable to replay " + record + " for " 
+ acl +
 ": acl not found " + "in existingAcls.");
 }
-if (!snapshotId.isPresent()) {

Review Comment:
   Nice to see this code going away. It always tripped me up 😅 



##
core/src/main/scala/kafka/server/metadata/AclPublisher.scala:
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
+
+import scala.concurrent.TimeoutException
+
+
+class AclPublisher(
+  id: Int,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  authorizer: Option[Authorizer],
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"AclPublisher ${nodeType} id=${id}"
+
+  var completedInitialLoad = false
+
+  override def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest
+  ): Unit = {
+val deltaName = s"MetadataDelta up to ${newImage.offset()}"
+
+// Apply changes to ACLs. This needs to be handled carefully because while 
we are
+// applying these changes, the Authorizer is continuing to return 
authorization
+// results in other threads. We never want to expose an invalid state. For 
example,
+// if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
+// we want to apply those changes in that order, not the reverse order! 
Otherwise
+// there could be a window during which incorrect authorization results 
are returned.
+Option(delta.aclsDelta()).foreach { aclsDelta =>
+  authorizer match {
+case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {

Review Comment:
   Since we have the manifest here, we can remove this `isSnapshotDelta` from 
AclsDelta



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1112,35 +1102,12 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 if (this != metaLogListener) {
 log.debug("Ignoring {} raft event from an old 
registration", name);
 } else {
-try {
-runnable.run();
-} finally {
-maybeCompleteAuthorizerInitialLoad();
-}
+runnable.run();
 }
 });
 }
 }
 
-private void maybeCompleteAuthorizerInitialLoad() {

Review Comment:
   The equivalent logic to this is handled by MetadataLoader, right? (not 
publishing until we've reached the HWM)



##
core/src/main/scala/kafka/server/metadata/AclPublisher.scala:
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kaf

[GitHub] [kafka] divijvaidya commented on pull request #14168: MINOR; Fix nanosecond elapsed time

2023-08-09 Thread via GitHub


divijvaidya commented on PR #14168:
URL: https://github.com/apache/kafka/pull/14168#issuecomment-1671384884

   Wow! This is news to me. BTW there is a JIRA tracking this test failure at 
https://issues.apache.org/jira/browse/KAFKA-15052 and you might want to 
associate this PR with that JIRA.
   
   We use System.nanoTime() for comparisons all over the place, such as at 
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala#L573,
 
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java#L231
 and many other places. Do you think those places may have the same 
overflow problem? If so, could you please create a JIRA for it?
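
For reference, a small sketch of the overflow-safe comparison idiom that the `System.nanoTime()` Javadoc recommends: compare the difference of two readings rather than the readings themselves. The class and method names are illustrative, and whether the call sites linked above are actually affected is not established here.

```java
final class NanoTimeCompare {
    // True when reading t0 was taken before reading t1.
    // Works across numeric wraparound as long as the readings are
    // less than about 292 years apart.
    static boolean isBefore(long t0, long t1) {
        return t0 - t1 < 0;
    }

    // True when at least timeoutNanos have passed since startNanos.
    // Note the subtraction: `nowNanos >= startNanos + timeoutNanos` can overflow.
    static boolean hasElapsed(long startNanos, long nowNanos, long timeoutNanos) {
        return nowNanos - startNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long t0 = Long.MAX_VALUE - 5;
        long t1 = t0 + 10; // wraps around to a negative value

        System.out.println("naive t0 < t1:  " + (t0 < t1));
        System.out.println("safe isBefore:  " + isBefore(t0, t1));
        System.out.println("elapsed >= 10:  " + hasElapsed(t0, t1, 10));
    }
}
```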





[GitHub] [kafka] prestona commented on pull request #14173: KAFKA-15042: Improve TAGGED_FIELDS protocol documentation

2023-08-09 Thread via GitHub


prestona commented on PR #14173:
URL: https://github.com/apache/kafka/pull/14173#issuecomment-1671409792

   For context, any API message description that has a TAG_BUFFER field will be 
changed as follows
   https://github.com/apache/kafka/assets/10883734/108fd5a9-cf6c-4698-8311-ea6845bf7886
   
   "TAGGED_FIELDS" will have a row in the [Protocol Types 
table](https://kafka.apache.org/protocol.html#protocol_types), where the type 
will be described as:
   > Represents a sequence of tagged fields. First the length N + 1 is given as 
an UNSIGNED_VARINT. Then N tag field instances follow. A tag field is a triplet 
of a tag, a length, and data. The tag is an UNSIGNED_VARINT. The length F + 1 
is given as an UNSIGNED_VARINT. Null data is represented as a length of 0, 
otherwise F bytes of data follow.
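
To make the quoted description concrete, here is a rough encoder that follows it literally: count as N + 1, then per field a tag, a length of F + 1 (or 0 for null), and the data. This is only an illustration of the wording above. It has not been checked against the real wire format or Kafka's serialization code, and the class name, method names, and ascending tag order are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class TaggedFieldsSketch {
    // Standard base-128 unsigned varint: 7 payload bits per byte,
    // high bit set on every byte except the last.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes tag -> data exactly as the quoted text describes.
    static byte[] encode(Map<Integer, byte[]> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(fields.size() + 1, out);        // "length N + 1"
        // Assumption: fields are written in ascending tag order.
        for (Map.Entry<Integer, byte[]> e : new TreeMap<>(fields).entrySet()) {
            writeUnsignedVarint(e.getKey(), out);           // tag
            byte[] data = e.getValue();
            if (data == null) {
                writeUnsignedVarint(0, out);                // null data: length 0
            } else {
                writeUnsignedVarint(data.length + 1, out);  // "length F + 1"
                out.write(data, 0, data.length);            // F bytes of data
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        Map<Integer, byte[]> fields = new HashMap<>();
        fields.put(0, new byte[] {0x0A});
        fields.put(1, null);
        for (byte b : encode(fields)) {
            System.out.printf("%02x ", b);
        }
        System.out.println();
    }
}
```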





[GitHub] [kafka] jsancio merged pull request #14141: KAFKA-15100; KRaft data race with the expiration service

2023-08-09 Thread via GitHub


jsancio merged PR #14141:
URL: https://github.com/apache/kafka/pull/14141





[GitHub] [kafka] jeffkbkim commented on pull request #14124: Kafka-14509; [1/2] Define ConsumerGroupDescribe API request and response schemas and classes.

2023-08-09 Thread via GitHub


jeffkbkim commented on PR #14124:
URL: https://github.com/apache/kafka/pull/14124#issuecomment-1671432782

   Thanks @riedelmax!





[GitHub] [kafka] clolov commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-09 Thread via GitHub


clolov commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1288608136


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+private static final int SEG_SIZE = 1024 * 1024;
+
+private final Time time = new MockTime(1);
+private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
+
+private TopicBasedRemoteLogMetadataManager rlmm() {
+return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+}
+
+@BeforeEach
+public void setup() {
+// Start the cluster only.
+remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+}
+
+@AfterEach
+public void teardown() throws IOException {
+remoteLogMetadataManagerHarness.close();
+}
+
+@Test
+public void testMultiplePartitionSubscriptions() throws Exception {
+// Create topics.
+String leaderTopic = "leader";
+HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+List<Object> leaderTopicReplicas = new ArrayList<>();
+// Set broker id 0 as the first entry which is taken as the leader.
+leaderTopicReplicas.add(0);
+leaderTopicReplicas.add(1);
+leaderTopicReplicas.add(2);
+assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String followerTopic = "follower";
+HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+List<Object> followerTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+followerTopicReplicas.add(1);
+followerTopicReplicas.add(2);
+followerTopicReplicas.add(0);
+assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+remoteLogMetadataManagerHarness.createTopicWithAssignment(
+followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+remoteLogMetadataManagerHarness.listenerName());
+
+String topicWithNoMessages = "no-messages-topic";
+HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
+List<Object> noMessagesTopicReplicas = new ArrayList<>();
+// Set broker id 1 as the first entry which is taken as the leader.
+noMessagesTopicReplicas.add(1);
+noMes

[GitHub] [kafka] AndrewJSchofield commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-09 Thread via GitHub


AndrewJSchofield commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1288667079


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -119,20 +130,24 @@ public synchronized Cluster fetch() {
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+return Math.max(this.lastRefreshMs + 
this.refreshBackoff.backoff(this.attempts) - nowMs, 0);
 }
 
 /**
  * The next time to update the cluster info is the maximum of the time the 
current info will expire and the time the
- * current info can be updated (i.e. backoff time has elapsed); If an 
update has been request then the expiry time
- * is now
+ * current info can be updated (i.e. backoff time has elapsed). There are 
two calculations for backing off based on
+ * how many requests with backing off have been issued, and how many 
attempts to retrieve metadata have been made
+ * since the last successful response. The first of these allows backing 
off when there are errors to do with
+ * stale metadata, even though the metadata responses are clean.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till updating the cluster info
  */
 public synchronized long timeToNextUpdate(long nowMs) {
-long timeToExpire = updateRequested() ? 0 : 
Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
+long timeToUpdateWithBackoff = Math.max(this.lastRefreshMs +
+(this.backoffUpdateRequests > 0 ? 
this.refreshBackoff.backoff(this.backoffUpdateRequests - 1) : 0) - nowMs, 0);
+long timeToExpire = Math.max(this.lastSuccessfulRefreshMs + 
this.metadataExpireMs - nowMs, 0);
+return Math.max(updateRequested() ? timeToUpdateWithBackoff : 
timeToExpire, timeToAllowUpdate(nowMs));

Review Comment:
   Yes, I agree. I've spun it round to simplify it and added comments. Will 
push the commit soon.
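
For readers following the diff, a simplified sketch of the capped exponential backoff idea behind `refreshBackoff.backoff(attempts)`. It is not Kafka's backoff class: jitter is left out, and the class name, parameter names, and example values are assumptions. `timeToAllowUpdate` mirrors the `Math.max(lastRefreshMs + backoff - nowMs, 0)` shape from the hunk above.

```java
final class BackoffSketch {
    // initialMs * multiplier^attempts, capped at maxMs. Assumes multiplier >= 1.
    static long backoffMs(long initialMs, int multiplier, long maxMs, long attempts) {
        long result = initialMs;
        for (long i = 0; i < attempts; i++) {
            if (result > maxMs / multiplier) {
                return maxMs; // the next multiplication would exceed the cap
            }
            result *= multiplier;
        }
        return Math.min(result, maxMs);
    }

    // Remaining time until another refresh is allowed, never negative.
    static long timeToAllowUpdate(long lastRefreshMs, long backoffMs, long nowMs) {
        return Math.max(lastRefreshMs + backoffMs - nowMs, 0);
    }

    public static void main(String[] args) {
        for (long attempts = 0; attempts <= 5; attempts++) {
            System.out.println(attempts + " -> " + backoffMs(100, 2, 1000, attempts) + " ms");
        }
    }
}
```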






[jira] [Updated] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15100:
---
Fix Version/s: 3.5.2

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.5.2
>
>
> When the fetch request times out the future is completed from the 
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
> tryCompleteFetchResponse is always called from the same thread. This 
> invariant is violated in this case.
> {code:java}
> return future.handle((completionTimeMs, exception) -> {
>     if (exception != null) {
>         Throwable cause = exception instanceof ExecutionException ?
>             exception.getCause() : exception;
>
>         // If the fetch timed out in purgatory, it means no new data is available,
>         // and we will complete the fetch successfully. Otherwise, if there was
>         // any other error, we need to return it.
>         Errors error = Errors.forException(cause);
>         if (error != Errors.REQUEST_TIMED_OUT) {
>             logger.info("Failed to handle fetch from {} at {} due to {}",
>                 replicaId, fetchPartition.fetchOffset(), error);
>             return buildEmptyFetchResponse(error, Optional.empty());
>         }
>     }
>
>     // FIXME: `completionTimeMs`, which can be null
>     logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>         replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>     return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
> });
> {code}
> One solution is to always build an empty response if the future was completed 
> exceptionally. This works because the ExpirationService completes the future 
> with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This 
> would be a service that allows more kinds of event to get scheduled/submitted 
> to the KRaft thread.
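
A rough sketch of the first solution mentioned in the ticket, under stated assumptions: responses are modelled as plain strings, and the class and method names are hypothetical rather than taken from KafkaRaftClient. The point is only that the exceptional branch returns an empty response immediately and never reaches the code that must run on the single KRaft thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class FetchExpirationSketch {
    // Hypothetical stand-in for building a fetch response.
    static CompletableFuture<String> respond(CompletableFuture<Long> future) {
        return future.handle((completionTimeMs, exception) -> {
            if (exception != null) {
                // May run on the expiration thread: do not touch state that is
                // only safe to access from the single-threaded event loop.
                return "EMPTY";
            }
            // Normal completion: reached only when the future was completed with a value.
            return "DATA@" + completionTimeMs;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Long> timedOut = new CompletableFuture<>();
        CompletableFuture<String> r1 = respond(timedOut);
        timedOut.completeExceptionally(new TimeoutException("expired in purgatory"));
        System.out.println(r1.join());

        CompletableFuture<Long> completed = new CompletableFuture<>();
        CompletableFuture<String> r2 = respond(completed);
        completed.complete(42L);
        System.out.println(r2.join());
    }
}
```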





[GitHub] [kafka] clolov commented on a diff in pull request #14136: Add metadatacache into RemoteLogManager, and refactor all relevant codes

2023-08-09 Thread via GitHub


clolov commented on code in PR #14136:
URL: https://github.com/apache/kafka/pull/14136#discussion_r1288687969


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -347,15 +353,16 @@ public void onLeadershipChange(Set 
partitionsBecomeLeader,
 public void stopPartitions(TopicPartition topicPartition, boolean delete) {
 if (delete) {
 // Delete from internal datastructures only if it is to be deleted.
-Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+Map<String, Uuid> mapping = metaDataCache.topicNamesToIds();
+Uuid topicIdPartition = mapping.remove(topicPartition.topic());

Review Comment:
   I don't think there is anything to be done here. The topic partitions should 
disappear once the controller lets us know they should.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -280,13 +285,18 @@ public RemoteStorageManager storageManager() {
 return remoteLogStorageManager;
 }
 
+public MetadataCache metadataCache() {
+return metaDataCache;
+}
+
 private Stream filterPartitions(Set partitions) {
 // We are not specifically checking for internal topics etc here as 
`log.remoteLogEnabled()` already handles that.
 return partitions.stream().filter(partition -> 
partition.log().exists(UnifiedLog::remoteLogEnabled));
 }
 
 private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
-Uuid previousTopicId = 
topicPartitionIds.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
+Map<String, Uuid> mapping = metaDataCache.topicNamesToIds();

Review Comment:
   Adding to Divij's comment - this is an unmodifiable map. Even if you wanted 
to, you wouldn't be able to add to it:
   ```
 def topicNamesToIds(): util.Map[String, Uuid] = {
   Collections.unmodifiableMap(metadataSnapshot.topicIds.asJava)
 }
   ```
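
A quick standalone demonstration of that behaviour, with `String` values standing in for `Uuid` and illustrative class and helper names: both `put` and `remove` on a `Collections.unmodifiableMap` view throw `UnsupportedOperationException`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class UnmodifiableMapDemo {
    // Returns true if the put succeeded, false if the map rejected it.
    static boolean tryPut(Map<String, String> map, String key, String value) {
        try {
            map.put(key, value);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    // Returns true if the remove call was allowed, false if the map rejected it.
    static boolean tryRemove(Map<String, String> map, String key) {
        try {
            map.remove(key);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> backing = new HashMap<>();
        backing.put("topic-a", "id-1");
        Map<String, String> view = Collections.unmodifiableMap(backing);

        System.out.println("put allowed:    " + tryPut(view, "topic-b", "id-2"));
        System.out.println("remove allowed: " + tryRemove(view, "topic-a"));
    }
}
```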






[GitHub] [kafka] vveicc commented on pull request #14175: KAFKA-15288: Change BrokerApiVersionsCommandTest to support kraft mode

2023-08-09 Thread via GitHub


vveicc commented on PR #14175:
URL: https://github.com/apache/kafka/pull/14175#issuecomment-1671596827

   Thank you for your advice @dengziming, this is very helpful to me. I have 
made some changes, PTAL.





[GitHub] [kafka] jeqo commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-09 Thread via GitHub


jeqo commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1288620620


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();

Review Comment:
   Similar here. We can initialize this and uninitializedAt in the constructor, 
and pass SystemTime in the test setup.



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;

Review Comment:
   Is this really needed? 
   I can see the value is overwritten to 10L in the test setup, but the tests 
pass for me with the default value of 100L.
   



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,393 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
-// It indicates whether the closing process has been started or not. If it 
is set as true,
-// consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
-// It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-// determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+// It indicates whether the ConsumerTask is closed or not.
+private volatile boolean isClosed = false;
+// It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+// has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+// ones it is no longer assigned to.
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile 

[jira] [Updated] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15100:
---
Fix Version/s: 3.4.2

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> When the fetch request times out the future is completed from the 
> "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
> tryCompleteFetchResponse is always called from the same thread. This 
> invariant is violated in this case.
> {code:java}
>     return future.handle((completionTimeMs, exception) -> {
>         if (exception != null) {
>             Throwable cause = exception instanceof ExecutionException ?
>                 exception.getCause() : exception;
>
>             // If the fetch timed out in purgatory, it means no new data is available,
>             // and we will complete the fetch successfully. Otherwise, if there was
>             // any other error, we need to return it.
>             Errors error = Errors.forException(cause);
>             if (error != Errors.REQUEST_TIMED_OUT) {
>                 logger.info("Failed to handle fetch from {} at {} due to {}",
>                     replicaId, fetchPartition.fetchOffset(), error);
>                 return buildEmptyFetchResponse(error, Optional.empty());
>             }
>         }
>
>         // FIXME: `completionTimeMs`, which can be null
>         logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
>             replicaId, fetchPartition.fetchOffset(), completionTimeMs);
>         return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
>     });
> {code}
> One solution is to always build an empty response if the future was completed 
> exceptionally. This works because the ExpirationService completes the future 
> with a `TimeoutException`.
> A longer-term solution is to use a more flexible event executor service. This 
> would be a service that allows more kinds of event to get scheduled/submitted 
> to the KRaft thread.
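
The first proposal in the description (always build an empty response when the future completes exceptionally) could look roughly like the sketch below. It is a self-contained illustration under stated assumptions, not the actual KafkaRaftClient change: `FetchResult`, `emptyResponse` and `completeOnOwnerThread` are hypothetical stand-ins, error codes are plain strings, and `java.util.concurrent.TimeoutException` stands in for the exception the ExpirationService uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins for the KRaft types; none of these names come from Kafka.
final class DelayedFetchSketch {
    // Simplified response: an error name plus a flag saying whether log state was read.
    static final class FetchResult {
        final String error;
        final boolean readLogState;

        FetchResult(String error, boolean readLogState) {
            this.error = error;
            this.readLogState = readLogState;
        }
    }

    // Builds a response without touching any state owned by the polling thread.
    static FetchResult emptyResponse(String error) {
        return new FetchResult(error, false);
    }

    // Stand-in for the path that reads log state and must run on the owning thread.
    static FetchResult completeOnOwnerThread() {
        return new FetchResult("NONE", true);
    }

    static CompletableFuture<FetchResult> handle(CompletableFuture<Long> future) {
        return future.handle((completionTimeMs, exception) -> {
            if (exception != null) {
                // Every exceptional completion, including a timeout raised on the
                // expiration thread, is answered with an empty response. A timeout
                // is reported as success with no data; anything else as an error.
                String error = exception instanceof TimeoutException
                    ? "NONE" : "UNKNOWN_SERVER_ERROR";
                return emptyResponse(error);
            }
            // Only normal completions reach the thread-confined code path.
            return completeOnOwnerThread();
        });
    }
}
```

With this shape, the callback never calls into thread-confined code when it runs on the expiration thread, which is the invariant the ticket is concerned with.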



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-08-09 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-15100.

Resolution: Fixed

> Unsafe to call tryCompleteFetchResponse on request timeout
> --
>
> Key: KAFKA-15100
> URL: https://issues.apache.org/jira/browse/KAFKA-15100
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>





[GitHub] [kafka] kamalcph opened a new pull request, #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-09 Thread via GitHub


kamalcph opened a new pull request, #14176:
URL: https://github.com/apache/kafka/pull/14176

   
   





[GitHub] [kafka] kamalcph commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-08-09 Thread via GitHub


kamalcph commented on code in PR #14114:
URL: https://github.com/apache/kafka/pull/14114#discussion_r1288729452


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -102,49 +102,14 @@ public String topicWarningMessage(String topicName) {
 
 public static class RemoteLogConfig {

Review Comment:
   Opened #14176 to address this comment. PTAL.






[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-09 Thread Jinyong Choi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752451#comment-17752451
 ] 

Jinyong Choi commented on KAFKA-15302:
--

[~mjsax] , [~guozhang] 
 
If you call delete() inside the while() loop, the maybeEvict() -> flush() 
interaction can cause a key that has not been traversed yet to return stale 
data. I therefore consider this to be a bug.
 
The main change is as follows: put() now passes a hint to the cache that 
decides whether maybeEvict() is called, based on the current state of the 
RocksDB KeyValueStore, in particular whether it is in a snapshot state (it 
still has open iterators).
 
Please refer to the code snippet below for a complete view of the changes. (I 
haven't modified the test code.)

 
[https://github.com/apache/kafka/compare/trunk...jinyongchoi:kafka:KAFKA-15302-testing]
{code:java}
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
...
    @Override
    public boolean isEvictionInvocationViable() {
        return openIterators.isEmpty();
    }

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
...
    private void putInternal(final Bytes key, final byte[] value) {
        context.cache().put(
            cacheName,
            key,
            new LRUCacheEntry(
                value,
                context.headers(),
                true,
                context.offset(),
                context.timestamp(),
                context.partition(),
                context.topic()),
            wrapped().isEvictionInvocationViable());

        StoreQueryUtils.updatePosition(position, context);
    }

streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
...
    public void put(final String namespace, final Bytes key, final LRUCacheEntry value, final boolean isEvictionViable) {
        numPuts++;

        final NamedCache cache = getOrCreateCache(namespace);

        synchronized (cache) {
            final long oldSize = cache.sizeInBytes();
            cache.put(key, value);
            sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
            if (isEvictionViable) {
                maybeEvict(namespace, cache);
            }
        }
    }
{code}
 
After the modifications, the following thoughts arise:
 
1. It seems unnecessary to perform delete operations during traversal for 
SessionStore or TimestampedKeyValueStore, but this aspect needs documentation.
 
2. It functions as expected, but the code doesn't seem to be very clean.
 
3. Since flush() is suppressed during the while() loop, many keys are stored in 
the Cache. However, as their values are null, it appears there isn't a 
significant memory burden. Still, caution is warranted.
 
4. Due to the inhibition of flush() during the while() loop, a subsequent flush 
operation, as shown below, took 10 seconds.
While cases requiring the processing of 5,000,000 items at once are unlikely, 
this aspect also demands attention.

 
{code:java}
21:26:17.509 [...-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush start dirtyKeys.size():500 entries:500
21:26:26.874 [...-StreamThread-1] INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush end dirtyKeys.size():0 entries:500
{code}
5. If a proper fix will take time, it might be a good idea to document this 
behavior in advance to aid developers' understanding.

 

I'm not coming up with any better ideas.
If it takes time to make the correct modifications, I agree that we should 
update the documentation first.
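
The trade-off described in points 3 and 4 can be reproduced with a toy model. The sketch below is not Kafka Streams code: `EvictionHintSketch` is a hypothetical class in which a `LinkedHashMap` plays the role of the named cache, a `TreeMap` plays the role of RocksDB, and a `null` value models delete(). It only illustrates the idea of skipping eviction while an iterator is open.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the eviction hint; all names here are illustrative assumptions.
final class EvictionHintSketch {
    private final Map<String, String> cache = new LinkedHashMap<>(); // dirty entries, oldest first
    private final Map<String, String> store = new TreeMap<>();       // stand-in for RocksDB
    private final int maxCacheEntries;
    private int openIterators = 0;

    EvictionHintSketch(int maxCacheEntries) {
        this.maxCacheEntries = maxCacheEntries;
    }

    void openIterator() {
        openIterators++;
    }

    void closeIterator() {
        openIterators--;
    }

    // Mirrors the hint from the snippet above: evict only when no iterator is open.
    boolean isEvictionInvocationViable() {
        return openIterators == 0;
    }

    // A null value models delete().
    void put(String key, String value) {
        cache.remove(key); // re-inserting moves the key to the newest position
        cache.put(key, value);
        if (isEvictionInvocationViable()) {
            maybeEvict();
        }
    }

    // Writes the oldest entries through to the store until the cache fits again.
    private void maybeEvict() {
        Iterator<Map.Entry<String, String>> eldestFirst = cache.entrySet().iterator();
        while (cache.size() > maxCacheEntries && eldestFirst.hasNext()) {
            Map.Entry<String, String> entry = eldestFirst.next();
            if (entry.getValue() == null) {
                store.remove(entry.getKey());
            } else {
                store.put(entry.getKey(), entry.getValue());
            }
            eldestFirst.remove();
        }
    }

    int cachedEntries() {
        return cache.size();
    }

    int storedEntries() {
        return store.size();
    }
}
```

While an iterator is open the cache grows past its limit, and the first put() after the iterator closes pays for the whole backlog at once, which matches the long flush reported above.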

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the stored value has been updated from 1 to 2, it doesn't return 2; 
> instead, it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any error

[jira] [Resolved] (KAFKA-15298) Disable DeleteRecords on Tiered Storage topics

2023-08-09 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov resolved KAFKA-15298.
---
Resolution: Won't Fix

> Disable DeleteRecords on Tiered Storage topics
> --
>
> Key: KAFKA-15298
> URL: https://issues.apache.org/jira/browse/KAFKA-15298
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>  Labels: tiered-storage
>
> Currently the DeleteRecords API does not work with Tiered Storage. We should 
> ensure that this is reflected in the responses that clients get when trying 
> to use the API with tiered topics.





[GitHub] [kafka] OmniaGM commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1288843014


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DefaultReplicationIntegrationTest.java:
##
@@ -0,0 +1,324 @@
+/*

Review Comment:
   I deleted the test. The change is small enough we can skip the integration 
test. 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1288843344


##
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java:
##
@@ -43,6 +47,11 @@ public void configure(Map props) {
 log.info("Using custom remote topic separator: '{}'", separator);
 separatorPattern = Pattern.compile(Pattern.quote(separator));
 }
+
+if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
+log.info("Disable the usage of topic separator for internal 
topics");
+isInternalTopicSeparatorEnabled = 
Boolean.valueOf(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
+}

Review Comment:
   Good shout! moved it. 






[GitHub] [kafka] OmniaGM commented on a diff in pull request #14082: KAFKA-15102: Mirror Maker 2 - KIP690 backward compatibility

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #14082:
URL: https://github.com/apache/kafka/pull/14082#discussion_r1288843685


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -406,7 +406,6 @@ public void testReplication() throws Exception {
 "New topic was not replicated to primary cluster.");

Review Comment:
   Done






[GitHub] [kafka] cmccabe commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288848313


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1112,35 +1102,12 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 if (this != metaLogListener) {
 log.debug("Ignoring {} raft event from an old 
registration", name);
 } else {
-try {
-runnable.run();
-} finally {
-maybeCompleteAuthorizerInitialLoad();
-}
+runnable.run();
 }
 });
 }
 }
 
-private void maybeCompleteAuthorizerInitialLoad() {

Review Comment:
   Yes. I added a comment to the call to `completeInitialLoad`
   
   ```
   if (!completedInitialLoad) {
 // If we are receiving this onMetadataUpdate call, that means the 
MetadataLoader has
 // loaded up to the local high water mark. So we complete the 
initial load, enabling
 // the authorizer.
 completedInitialLoad = true
 authorizer.completeInitialLoad()
   }
   ```
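
   For readers outside the PR, the one-shot behaviour described in that comment can be sketched in isolation. This is a simplified assumption of the shape, not the AclPublisher source: `InitialLoadSketch` and `CountingAuthorizer` are hypothetical names, and the update callback takes no arguments here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical publisher that completes the authorizer's initial load exactly once.
final class InitialLoadSketch {
    // Stand-in for an authorizer; it only counts completeInitialLoad() calls.
    static final class CountingAuthorizer {
        final AtomicInteger completions = new AtomicInteger();

        void completeInitialLoad() {
            completions.incrementAndGet();
        }
    }

    private final CountingAuthorizer authorizer;
    private boolean completedInitialLoad = false;

    InitialLoadSketch(CountingAuthorizer authorizer) {
        this.authorizer = authorizer;
    }

    // Called once per published metadata update; ACL changes would be applied first.
    void onMetadataUpdate() {
        if (!completedInitialLoad) {
            // The first update implies the loader has caught up, so the
            // authorizer can be enabled. Later updates skip this branch.
            completedInitialLoad = true;
            authorizer.completeInitialLoad();
        }
    }
}
```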






[GitHub] [kafka] cmccabe commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288849218


##
metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java:
##
@@ -256,11 +224,6 @@ public void replay(
 throw new RuntimeException("Unable to replay " + record + " for " 
+ acl +
 ": acl not found " + "in existingAcls.");
 }
-if (!snapshotId.isPresent()) {

Review Comment:
   Yeah, it was always a messy corner case.






[jira] [Created] (KAFKA-15326) Decouple Processing Thread from Polling Thread

2023-08-09 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-15326:
--

 Summary: Decouple Processing Thread from Polling Thread
 Key: KAFKA-15326
 URL: https://issues.apache.org/jira/browse/KAFKA-15326
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Lucas Brutschy
Assignee: Lucas Brutschy


As part of an ongoing effort to implement a better threading architecture in 
Kafka Streams, we decouple N stream threads into N polling threads and N 
processing threads. Consolidating the N polling threads into a single 
thread is a follow-up to this ticket. 
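
As a rough illustration of the split for a single pair of threads, the sketch below hands records from a polling thread to a processing thread through a bounded queue. It is a generic producer/consumer example, not the Kafka Streams design: `PollProcessSketch`, the `END` marker and the `"processed:"` prefix are all hypothetical, and the input list stands in for records returned by a consumer poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Generic hand-off between one polling thread and one processing thread.
final class PollProcessSketch {
    private static final String END = "__END__"; // hypothetical end-of-input marker

    static List<String> run(List<String> input) {
        // A small bound makes the poller block when processing falls behind.
        BlockingQueue<String> handoff = new ArrayBlockingQueue<>(2);
        List<String> processed = new ArrayList<>();

        Thread poller = new Thread(() -> {
            try {
                for (String record : input) {
                    handoff.put(record);
                }
                handoff.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "polling-thread");

        Thread processor = new Thread(() -> {
            try {
                String record = handoff.take();
                while (!record.equals(END)) {
                    processed.add("processed:" + record);
                    record = handoff.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "processing-thread");

        poller.start();
        processor.start();
        try {
            poller.join();
            processor.join(); // join makes the processor's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return processed;
    }
}
```

The bounded queue is the design point worth noting: it gives the polling side backpressure, so a slow processing thread cannot cause unbounded buffering.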





[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r1288877172


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get( :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r126278


##
tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandErrorTest.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * For some error cases, we can save a little build time by avoiding the 
overhead for
+ * cluster creation and cleanup because the command is expected to fail 
immediately.
+ */
+public class LeaderElectionCommandErrorTest {
+@Test
+public void testTopicWithoutPartition() {
+String out = ToolsTestUtils.captureStandardErr(() -> 
LeaderElectionCommand.main(
+new String[] {

Review Comment:
   Remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r126617


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {

Review Comment:
   Removed the `TerseException`






[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r127595


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional<String> topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional<Integer> partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional<Set<TopicPartition>> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get()))) :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional<Set<TopicPartition>> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r127858


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional<String> topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional<Integer> partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional<Set<TopicPartition>> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get()))) :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional<Set<TopicPartition>> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] OmniaGM commented on a diff in pull request #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-08-09 Thread via GitHub


OmniaGM commented on code in PR #13204:
URL: https://github.com/apache/kafka/pull/13204#discussion_r128079


##
tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java:
##
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.util.EnumConverter;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ElectionNotNeededException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class LeaderElectionCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionCommand.class);
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+public static void main(String... args) {
+try {
+run(Duration.ofMillis(3), args);
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+} catch (Exception e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+}
+}
+
+static void run(Duration timeoutMs, String... args) throws Exception {
+LeaderElectionCommandOptions commandOptions = new 
LeaderElectionCommandOptions(args);
+
+commandOptions.maybePrintHelpOrVersion();
+
+commandOptions.validate();
+ElectionType electionType = commandOptions.getElectionType();
+Optional<Set<TopicPartition>> jsonFileTopicPartitions =
+Optional.ofNullable(commandOptions.getPathToJsonFile())
+.map(path -> parseReplicaElectionData(path));
+
+Optional<String> topicOption = 
Optional.ofNullable(commandOptions.getTopic());
+Optional<Integer> partitionOption = 
Optional.ofNullable(commandOptions.getPartition());
+final Optional<Set<TopicPartition>> singleTopicPartition =
+(topicOption.isPresent() && partitionOption.isPresent()) ?
+Optional.of(Collections.singleton(new 
TopicPartition(topicOption.get(), partitionOption.get()))) :
+Optional.empty();
+
+/* Note: No need to look at --all-topic-partitions as we want this to 
be null if it is use.
+ * The validate function should be checking that this option is 
required if the --topic and --path-to-json-file
+ * are not specified.
+ */
+Optional<Set<TopicPartition>> topicPartitions = 
jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
+
+Properties props = new Properties();
+if (commandOptions.hasAdminClientConfig()) {
+
props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig()));
+}
+props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
commandOptions.getBootstrapServer(

[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-09 Thread via GitHub


gharris1727 commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1671793380

   Looking forward at the `sync-manifests` implementation, I realized that I 
had some duplication. The `ManifestEntry`, `findManifests` function, and the 
exclude behavior I added in this PR are all redundant and will be replaced with 
a different system in the next PR.
   
   The new implementation has a lot more machinery included with it that would 
be un-motivated in this PR, so I'm going to leave this current but obsolete 
implementation in place for now.





[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Component/s: streams

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Major
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Priority: Critical  (was: Major)

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Priority: Critical
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c





[GitHub] [kafka] cmccabe commented on a diff in pull request #14169: KAFKA-15318: Update the Authorizer via AclPublisher

2023-08-09 Thread via GitHub


cmccabe commented on code in PR #14169:
URL: https://github.com/apache/kafka/pull/14169#discussion_r1288918541


##
core/src/main/scala/kafka/server/metadata/AclPublisher.scala:
##
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
+
+import scala.concurrent.TimeoutException
+
+
+class AclPublisher(
+  id: Int,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  authorizer: Option[Authorizer],
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  override def name(): String = s"AclPublisher ${nodeType} id=${id}"
+
+  var completedInitialLoad = false
+
+  override def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest
+  ): Unit = {
+val deltaName = s"MetadataDelta up to ${newImage.offset()}"
+
+// Apply changes to ACLs. This needs to be handled carefully because while 
we are
+// applying these changes, the Authorizer is continuing to return 
authorization
+// results in other threads. We never want to expose an invalid state. For 
example,
+// if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
+// we want to apply those changes in that order, not the reverse order! 
Otherwise
+// there could be a window during which incorrect authorization results 
are returned.
+Option(delta.aclsDelta()).foreach { aclsDelta =>
+  authorizer match {
+case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {

Review Comment:
   yes, good point.






[GitHub] [kafka] C0urante commented on a diff in pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-09 Thread via GitHub


C0urante commented on code in PR #14156:
URL: https://github.com/apache/kafka/pull/14156#discussion_r1287870397


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) {
 }
 }
 
-private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, 
OffsetSync offsetSync) {

Review Comment:
   Nit: feels a little strange that we're passing in a to-be-mutated sync 
array. Any reason not to alter `updateSyncArray` to only take in the original 
sync array and the new sync, and construct and return the new sync array?
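
A minimal sketch of the non-mutating shape suggested above, in Java. It is not the PR's code: the `long` entries stand in for `OffsetSync`, and the shift-by-one policy is a placeholder assumption rather than the store's real expiry logic. The only point illustrated is that the method takes the original array and the new sync, and returns a freshly built array.

```java
import java.util.Arrays;

final class SyncArraySketch {
    // Hypothetical stand-in for OffsetSync: only a single long per sync is modeled.
    static long[] updateSyncArray(long[] original, long newSync) {
        // Copy first so the caller's array is never mutated.
        long[] updated = Arrays.copyOf(original, original.length);
        if (updated.length == 0) {
            return updated;
        }
        // Placeholder policy (an assumption, not the store's real expiry logic):
        // shift every entry one slot towards the tail and put the newest sync first.
        System.arraycopy(original, 0, updated, 1, original.length - 1);
        updated[0] = newSync;
        return updated;
    }
}
```

With this shape the caller decides when to publish the result, for example by assigning the returned array back to the field that held the original.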



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -153,31 +156,84 @@ public void testPastOffsetTranslation() {
 }
 
 @Test
-public void testKeepMostDistinctSyncs() {
+public void testConsistentlySpacedSyncs() {
 // Under normal operation, the incoming syncs will be regularly spaced 
and the store should keep a set of syncs
 // which provide the best translation accuracy (expires as few syncs 
as possible)
-// Each new sync should be added to the cache and expire at most one 
other sync from the cache
-long iterations = 1;
+long iterations = 100;
 long maxStep = Long.MAX_VALUE / iterations;
 // Test a variety of steps (corresponding to the offset.lag.max 
configuration)
 for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
 for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
-try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-int lastCount = 1;
-store.start();
-for (long offset = firstOffset; offset <= iterations; 
offset += step) {
-store.sync(tp, offset, offset);
-// Invariant A: the latest sync is present
-assertEquals(offset, store.syncFor(tp, 
0).upstreamOffset());
-// Invariant D: the earliest sync is present
-assertEquals(firstOffset, store.syncFor(tp, 
63).upstreamOffset());
-int count = countDistinctStoredSyncs(store, tp);
-int diff = count - lastCount;
-assertTrue(diff >= 0,
-"Store expired too many syncs: " + diff + " 
after receiving offset " + offset);
-lastCount = count;
-}
-}
+long finalStep = step;
+// Generate a stream of consistently spaced syncs
+// Each new sync should be added to the cache and expire at 
most one other sync from the cache
+assertSyncSpacingHasBoundedExpirations(firstOffset, 
LongStream.generate(() -> finalStep).limit(iterations), 1);
+}
+}
+}
+
+@Test
+public void testRandomlySpacedSyncs() {
+Random random = new Random(0L); // arbitrary but deterministic seed
+int iterationBits = 10;
+long iterations = 1 << iterationBits;
+for (int n = 1; n < Long.SIZE - iterationBits; n++) {
+// A stream with at most n bits of difference between the largest 
and smallest steps
+// will expire n + 2 syncs at once in the worst case, because the 
sync store is laid out exponentially.
+long maximumDifference = 1L << n;
+int maximumExpirations = n + 2;
+assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
0L, maximumDifference), maximumExpirations);
+// This holds true even if there is a larger minimum step size, 
such as caused by offsetLagMax
+long offsetLagMax = 1L << 16;
+assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
+}
+}
+
+@Test
+public void testDroppedSyncsSpacing() {
+Random random = new Random(0L); // arbitrary but deterministic seed
+long iterations = 1;
+long offsetLagMax = 100;
+// Half of the gaps will be offsetLagMax, and half will be double 
that, as if one intervening sync was dropped.
+LongStream stream = random.doubles()
+.mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
+.limit(iterations);
+// This will cause up to 2 syncs to be discarded, because a sequence 
of two adjacent syncs followed by a
+// dropped sync will set up the following situation
+// before [dd,c,b,a]
+// after  [e..e,d,a]
+// and syncs b and c are discarded to make room for e and the demoted 
sync d.
+assertSyncSpacingHasBoundedExpi

[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-08-09 Thread via GitHub


junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1287764297


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -956,6 +981,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
   }
 
+  private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, 
reason: LogStartOffsetIncrementReason): Unit = {
+lock synchronized {
+  if (newLocalLogStartOffset > localLogStartOffset()) {
+_localLogStartOffset = math.max(newLocalLogStartOffset, 
localLogStartOffset());

Review Comment:
   Since newLocalLogStartOffset is larger than localLogStartOffset(), could we 
just assign newLocalLogStartOffset to _localLogStartOffset?
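
The point above can be sketched in Java (the file under review is Scala; the class and method names here are hypothetical). Once the guard has established that the new offset is strictly larger, taking the maximum of the two values is redundant and a plain assignment is equivalent.

```java
final class LocalLogStartOffsetSketch {
    private final Object lock = new Object();
    private long localLogStartOffset;

    LocalLogStartOffsetSketch(long initialOffset) {
        this.localLogStartOffset = initialOffset;
    }

    long localLogStartOffset() {
        synchronized (lock) {
            return localLogStartOffset;
        }
    }

    // Returns true if the offset was advanced.
    boolean maybeIncrement(long newLocalLogStartOffset) {
        synchronized (lock) {
            if (newLocalLogStartOffset > localLogStartOffset) {
                // The guard already guarantees new > current, so
                // max(new, current) == new and a direct assignment suffices.
                localLogStartOffset = newLocalLogStartOffset;
                return true;
            }
            return false;
        }
    }
}
```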



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -966,7 +1000,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* @throws OffsetOutOfRangeException if the log start offset is greater than 
the high watermark
* @return true if the log start offset was updated; otherwise false
*/
-  def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: 
LogStartOffsetIncrementReason): Boolean = {
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long,
+   reason: 
LogStartOffsetIncrementReason): Boolean = {

Review Comment:
   Indentation doesn't match other places in this file.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered 
storage
+  val isSegmentTieredToRemoteStorage =

Review Comment:
   Here is a corner case. Let's say remote log is enabled, but there is no 
remote segment (all have been deleted due to retention). The new logic will do 
retention based on `localRetentionBytes`, but it should actually do the 
retention based on `retentionSize`. If that happens, we need to advance 
logStartOffset, in addition to localLogStartOffset.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1464,12 +1513,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this, 
remoteLogEnabled()))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+  nextSegmentOpt.exists(_.baseOffset <= localLogStartOffset())

Review Comment:
   This doesn't look right. If remote log is not enabled, it seems that we 
should delete based on logStartOffset, not localLogStartOffset.
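
One possible reading of this comment, sketched in Java with hypothetical names: pick the threshold according to whether remote log is enabled, and compare the next segment's base offset against it. Using `localLogStartOffset` in the enabled case is an assumption taken from the diff, not something the comment states.

```java
import java.util.OptionalLong;

final class StartOffsetBreachSketch {
    // nextSegmentBaseOffset is empty when there is no next segment.
    static boolean shouldDelete(OptionalLong nextSegmentBaseOffset,
                                boolean remoteLogEnabled,
                                long logStartOffset,
                                long localLogStartOffset) {
        // Assumption: with remote log disabled, the regular log start offset applies.
        long threshold = remoteLogEnabled ? localLogStartOffset : logStartOffset;
        return nextSegmentBaseOffset.isPresent()
            && nextSegmentBaseOffset.getAsLong() <= threshold;
    }
}
```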



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1390,7 +1424,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+  // Check not to delete segments which are not yet copied to tiered 
storage
+  val isSegmentTieredToRemoteStorage =
+if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 
<= highestOffsetInRemoteStorage
+else true

Review Comment:
   Hmm, this should be false, right? Do we have a test case to cover that?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -147,11 +147,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   def localLogStartOffset(): Long = _localLogStartOffset
 
+  // This is the offset(inclusive) until which segments are copied to the 
remote storage.
   @volatile private var highestOffsetInRemoteStorage: Long = -1L
 
   locally {
 initializePartitionMetadata()
 updateLogStartOffset(logStartOffset)
+updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))

Review Comment:
   This is an existing issue. But there is one direct reference to 
`_localLogStartOffset` in `fetchOffsetByTimestamp()`. Should we change that to 
use `localLogStartOffset()` instead?



###

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14176: KAFKA-15295: Add config validation when remote storage is enabled on a topic

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1288894765


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-}
+public static void validateValuesInBroker(Map props) {
+validateValues(props);
+Boolean isRemoteLogStorageSystemEnabled =
+(Boolean) 
props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+Boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) {
+throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+"Topic cannot be configured with remote log storage.");
 }
 
-if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) {
-Long retentionBytes = (Long) 
props.get(TopicConfig.RETENTION_BYTES_CONFIG);
-Long localLogRetentionBytes = (Long) 
props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
-if (retentionBytes > -1 && localLogRetentionBytes != -2) {
-if (localLogRetentionBytes == -1) {
-String message = String.format("Value must not be -1 as %s 
value is set as %d.",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
-if (localLogRetentionBytes > retentionBytes) {
-String message = String.format("Value must not be more 
than %s property value: %d",
-TopicConfig.RETENTION_BYTES_CONFIG, 
retentionBytes);
-throw new 
ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localLogRetentionBytes, message);
-}
+String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
+if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
+throw new ConfigException("Remote log storage is unsupported for 
the compacted topics");
+}

Review Comment:
   could be a separate function - `validateNoRemoteStorageForCompactedTopic`
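
A rough sketch of what such an extracted helper might look like. To stay self-contained it uses plain string keys and `IllegalArgumentException` in place of the `TopicConfig` constants and `ConfigException` used in the diff, and the null handling is an added assumption.

```java
import java.util.Locale;
import java.util.Map;

final class RemoteStorageValidationSketch {
    // Literal values assumed to match the corresponding TopicConfig constants.
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable";
    static final String CLEANUP_POLICY = "cleanup.policy";
    static final String COMPACT = "compact";

    static void validateNoRemoteStorageForCompactedTopic(Map<String, ?> props) {
        // Treat a missing or non-Boolean value as "not enabled" (an assumption).
        boolean isRemoteStorageEnabled = Boolean.TRUE.equals(props.get(REMOTE_STORAGE_ENABLE));
        Object policy = props.get(CLEANUP_POLICY);
        String cleanupPolicy = policy == null ? "" : policy.toString().toLowerCase(Locale.ROOT);
        if (isRemoteStorageEnabled && cleanupPolicy.contains(COMPACT)) {
            throw new IllegalArgumentException(
                "Remote log storage is unsupported for the compacted topics");
        }
    }
}
```

Keeping the check in its own method lets `validateValuesInBroker` read as a list of named validations, and lets each one be unit tested in isolation.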



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -459,49 +463,53 @@ public static void validateValues(Map props) {
 long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
 if (minCompactionLag > maxCompactionLag) {
 throw new InvalidConfigurationException("conflict topic config 
setting "
-+ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-+ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
++ TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
++ TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
 }
+}
 
-if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-throw new ConfigException("Remote log storage is unsupported 
for the c

[GitHub] [kafka] divijvaidya commented on a diff in pull request #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-09 Thread via GitHub


divijvaidya commented on code in PR #14161:
URL: https://github.com/apache/kafka/pull/14161#discussion_r1288939068


##
core/src/main/scala/kafka/server/ConfigHandler.scala:
##
@@ -62,6 +62,12 @@ class TopicConfigHandler(private val logManager: LogManager, 
kafkaConfig: KafkaC
 topicConfig.asScala.forKeyValue { (key, value) =>
   if (!configNamesToExclude.contains(key)) props.put(key, value)
 }
+
+if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()

Review Comment:
   @clolov could you please check whether we call some form of LogConfig.validate() 
on startup? (I think we do when we create LogManager.) If so, you may 
want to bring the broker-level config into LogConfig like this: 
https://github.com/apache/kafka/pull/14176 and add your new validation in 
LogManager.validate(), similar to how we are validating in the other PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-09 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1288946333


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -144,13 +159,25 @@ public long metadataExpireMs() {
  */
 public synchronized int requestUpdate() {
 this.needFullUpdate = true;
+this.backoffUpdateRequests = 0L;

Review Comment:
   > My understanding of the PR is this, metadata request won't backoff, but 
produce request would backoff. So likely metadata is going to be updated next 
time around produce request is retried(post backoff).
   
   @msn-tldr : To me, the common reason why a produce request needs to back off 
is that the metadata is stale since the latest metadata hasn't been propagated 
to the brokers yet. So, if we don't back off the metadata request, the returned 
metadata may still be stale, which won't help the backed-off produce request.






[GitHub] [kafka] gharris1727 opened a new pull request, #14177: MINOR: Fix SynchronizationTest Classloaders sometimes not being parallel capable

2023-08-09 Thread via GitHub


gharris1727 opened a new pull request, #14177:
URL: https://github.com/apache/kafka/pull/14177

   The `ClassLoader.registerAsParallelCapable()` call is intended to be called 
during static initialization. It must be called prior to the `ClassLoader` 
superconstructor, because there it is evaluated to decide whether the 
classloader instance created will be parallel capable.
   
   For the entire lifetime of this test, the 
`ClassLoader.registerAsParallelCapable()` call has been in an instance 
initializer, which is executed after the superconstructor is finished. This has 
meant that the first `SynchronizedDelegatingClassLoader`, and the first 
`SynchronizedPluginClassLoader` created have erroneously been 
non-parallel-capable.
   
   However, this test did not flaky-fail until #14089 was merged, because the 
`SamplingConverter` was never located in the first 
`SynchronizedPluginClassLoader`. With that PR, the 
`TestPlugins.pluginPath(TestPlugins.TestPlugin...)` function was changed to 
return a Set instead of a List. This meant that the `SamplingConverter` then 
_could_ appear in the first `SynchronizedPluginClassLoader`, and whenever that 
happened, the test would reproduce a deadlock and fail.
   
   Since there was only one `SynchronizedDelegatingClassLoader` created in this 
test, it was always non-parallel-capable. The test would only deadlock if 
_both_ classloaders were non-parallel-capable, which was not possible until 
recently.
   
   Importantly, this is only a bug in the test, as the real PluginClassLoader 
and DelegatingClassLoader have been parallel-capable since they were fixed.
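   
   The ordering problem described above can be reproduced outside the test. The 
following is a minimal sketch, not the code from this PR: the class names 
`EarlyRegistered` and `LateRegistered` are hypothetical, and it relies on the 
documented behaviour of `getClassLoadingLock`, which returns the loader itself 
when the instance is not parallel capable.

```java
public class ParallelCapableSketch {

    /** Registers in a static initializer, before any instance is constructed. */
    static class EarlyRegistered extends ClassLoader {
        static {
            ClassLoader.registerAsParallelCapable();
        }

        boolean usesPerClassLocks() {
            // A parallel-capable loader hands out a dedicated lock per class name;
            // a non-parallel-capable loader returns itself as the lock.
            return getClassLoadingLock("some.Example") != this;
        }
    }

    /** Registers in an instance initializer, which runs after the ClassLoader constructor. */
    static class LateRegistered extends ClassLoader {
        {
            ClassLoader.registerAsParallelCapable();
        }

        boolean usesPerClassLocks() {
            return getClassLoadingLock("some.Example") != this;
        }
    }

    public static void main(String[] args) {
        // First instance of each class created in this JVM.
        System.out.println("early: " + new EarlyRegistered().usesPerClassLocks()); // early: true
        System.out.println("late:  " + new LateRegistered().usesPerClassLocks());  // late:  false
    }
}
```

   The sketch only checks the first instance of each class, since that is the 
instance the description above identifies as erroneously non-parallel-capable.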
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] gharris1727 commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-09 Thread via GitHub


gharris1727 commented on code in PR #14159:
URL: https://github.com/apache/kafka/pull/14159#discussion_r1289026069


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));

Review Comment:
   There's a utility method for assembling a plugin path of just select 
plugins, you don't need to do the filtering yourself :)
   ```suggestion
   PluginScanResult unversionedPluginsResult = 
scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER));
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));
+assertFalse(unversionedPluginsResult.isEmpty());
+unversionedPluginsResult.forEach(pluginDesc -> 
assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+}
+
+@Test
+public void testScannedPluingsForVersion() {

Review Comment:
   ```suggestion
   public void testVersionedPluginsHasVersion() {
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));
+assertFalse(unversionedPluginsResult.isEmpty());
+unversionedPluginsResult.forEach(pluginDesc -> 
assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+}
+
+@Test
+public void testScannedPluingsForVersion() {
+PluginScanResult versionedPluginResult = 
scan(filterPluginsResourceDir(TestPlugins.pluginPath(), 
TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1.resourceDir()));

Review Comment:
   ```suggestion
   PluginScanResult versionedPluginResult = 
scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1));
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() 
throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {

Review Comment:
   ```suggestion
   public void testNonVersionedPluginHasUndefinedVersion() {
   ```






[jira] [Created] (KAFKA-15327) Async consumer should commit offsets on close

2023-08-09 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15327:
--

 Summary: Async consumer should commit offsets on close
 Key: KAFKA-15327
 URL: https://issues.apache.org/jira/browse/KAFKA-15327
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Reporter: Lianet Magrans


In the current implementation of the KafkaConsumer, the ConsumerCoordinator 
commits offsets before the consumer is closed, with a call to 
maybeAutoCommitOffsetsSync(timer);
The async consumer should provide the same behaviour to commit offsets on 
close. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-09 Thread via GitHub


gharris1727 commented on code in PR #14156:
URL: https://github.com/apache/kafka/pull/14156#discussion_r1289079347


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) {
 }
 }
 
-private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, 
OffsetSync offsetSync) {

Review Comment:
   That is already the signature of the caller, so this would amount to 
inlining this function into `updateExistingSyncs` to combine the two.
   
   I think that this function is already large enough, and doesn't also need to 
be concerned with copying the array or logging the result. I also think it 
makes batching updates simpler: The caller can manage the lifetimes of the 
arrays and their mutability, while the inner function can be concerned with 
just applying the updates.
   
   Also this code was doing that already, here and in the `clearSyncArray` 
function.






[GitHub] [kafka] omkreddy commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


omkreddy commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289096459


##
core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala:
##
@@ -145,12 +163,12 @@ class DelegationTokenEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest
   val privilegedAdminClient = 
createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, 
kafkaPassword)
   try {
 val token = 
adminClient.createDelegationToken(createDelegationTokenOptions()).delegationToken().get()
-if (assert) {
-  assertToken(token)
-}
+//if (assert) {

Review Comment:
   Can we enable this code?



##
core/src/main/scala/kafka/server/MetadataSupport.scala:
##
@@ -69,6 +69,16 @@ sealed trait MetadataSupport {
   handler(request)
 }
   }
+
+  def alsoMaybeForward(request: RequestChannel.Request,

Review Comment:
   looks unused



##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+

[jira] [Commented] (KAFKA-15042) Clarify documentation on Tagged fields

2023-08-09 Thread Adrian Preston (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752503#comment-17752503
 ] 

Adrian Preston commented on KAFKA-15042:


[~smashingquasar],

Looking at the hex dump of your APIVersions request, I think you are correctly 
encoding the empty tag field as a single 0x00 byte. It looks, however, like the 
compact string encoding (used for the 'client_software_name' and 
'client_software_version' fields) is adding an unexpected 0x01 as the first 
byte.

For comparison, here's the APIVersions request I get from Sarama:

{{00 00 00 21    (Length)}}
{{00 12  (API key)}}
{{00 03  (API version)}}
{{00 00 00 00    (Correlation ID)}}
{{00 07 77 65 62 2d 61 70 69 (Client ID)}}
{{00 (Tagged fields)}}
{{08 77 65 62 2d 61 70 69    (Client software name)}}
{{06 30 2e 30 2e 31  (Client software version)}}
{{00 (Tagged fields)}}
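
As an illustration of the compact string encoding discussed above, here is a minimal sketch, assuming the KIP-482 rule that a compact string is written as an unsigned varint of (length + 1) followed by the UTF-8 bytes. The class and method names are made up for this example and are not taken from the Kafka client.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class CompactStringSketch {

    // Unsigned varint: 7 bits per byte, low-order group first,
    // high bit set on every byte except the last.
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Compact string: varint(length + 1), then the UTF-8 bytes. No extra prefix byte.
    static byte[] compactString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(utf8.length + 1, out);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(compactString("web-api"))); // 08 77 65 62 2d 61 70 69
        System.out.println(hex(compactString("0.0.1")));   // 06 30 2e 30 2e 31
    }
}
```

Both outputs match the Sarama bytes listed above, with no leading 0x01.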

> Clarify documentation on Tagged fields
> --
>
> Key: KAFKA-15042
> URL: https://issues.apache.org/jira/browse/KAFKA-15042
> Project: Kafka
>  Issue Type: Wish
>  Components: docs
> Environment: Using the Ubuntu/Kafka Docker image for testing purposes.
>Reporter: Nicolas
>Assignee: Adrian Preston
>Priority: Major
>
> Hello,
> I am currently working on an implementation of the Kafka protocol.
> So far, all my code is working as intended through serialising requests and 
> deserialising response as long as I am not using the flex requests system.
> I am now trying to implement the flex requests system but the documentation 
> is scarce on the subject of tagged fields.
> If we take the Request Header v2:
>  
> {code:java}
> Request Header v2 => request_api_key request_api_version correlation_id 
> client_id TAG_BUFFER request_api_key => INT16 request_api_version => INT16 
> correlation_id => INT32 client_id => NULLABLE_STRING{code}
>  
> Here, the BNF seems violated. TAG_BUFFER is not a value in this situation. It 
> appears to be a type. It also does not appear within the detailed description 
> inside the BNF.
> TAG_BUFFER also does not refer to any declared type within the documentation. 
> It seems to indicate tagged fields though.
> Now when looking at tagged fields, the only mention of them within the 
> documentation is:
> {quote}Note that [KIP-482 tagged 
> fields|https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields]
>  can be added to a request without incrementing the version number. This 
> offers an additional way of evolving the message schema without breaking 
> compatibility. Tagged fields do not take up any space when the field is not 
> set. Therefore, if a field is rarely used, it is more efficient to make it a 
> tagged field than to put it in the mandatory schema. However, tagged fields 
> are ignored by recipients that don't know about them, which could pose a 
> challenge if this is not the behavior that the sender wants. In such cases, a 
> version bump may be more appropriate.
> {quote}
> This leads to the KIP-482 that does not clearly and explicitly detail the 
> process of writing and reading those tagged fields.
> I decided to look up existing clients to understand how they handle tagged 
> fields. I notably looked at kafka-js (JavaScript client) and librdkafka (C 
> client) and to my surprise, they have not implemented tagged fields. In fact, 
> they rely on a hack to skip them and ignore them completely.
> I also had a look at the [Java client bundled within Kafka within the Tagged 
> Fields 
> section|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java#L64].
> Now I am not a Java developer so I may not understand exactly this code. I 
> read the comment and implemented the logic following Google's Protobuf 
> specifications. The problem is, this leads to a request that outputs a stack 
> trace within Kafka (it would be appreciated to not just dump stack traces and 
> gracefully handle errors by the way).
>  
> As a reference, I tried to send an APIVersions (key: 18) (version: 3) request.
> My request reads as follows when converted to hexadecimal:
>  
> {code:java}
> Request header: 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00
> Request body: 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
> Full request: 00 00 00 23 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 
> 00 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
> {code}
>  
> This creates a buffer underflow error within Kafka:
> {code:java}
> [2023-05-31 14:14:31,132] ERROR Exception while processing request from 
> 172.21.0.5:9092-172.21.0.3:59228-21 (kafka.network.Processor

[GitHub] [kafka] omkreddy commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


omkreddy commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289107179


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -165,10 +112,11 @@ object DelegationTokenManager {
 
 class DelegationTokenManager(val config: KafkaConfig,
  val tokenCache: DelegationTokenCache,
- val time: Time,
- val zkClient: KafkaZkClient) extends Logging {
+ val time: Time) extends Logging {
   this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
 
+  protected val lock = new Object()
+

Review Comment:
   Can we remove the unused DescribeResponseCallback type variable at line 125?






[jira] [Commented] (KAFKA-15224) Automate version change to snapshot

2023-08-09 Thread Tanay Karmarkar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752504#comment-17752504
 ] 

Tanay Karmarkar commented on KAFKA-15224:
-

Another clarification is needed: in the example PR, there are files with a version 
bump without the `SNAPSHOT` suffix. Would this script always update files with 
`\{version}-SNAPSHOT`?

> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/_{_}init{_}_.py (note: this version name can't follow the 
> -SNAPSHOT convention due to python version naming restrictions, instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes looks like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to automatically do it. Note that 
> release.py (line 550) already does something similar where it replaces 
> SNAPSHOT with actual version. We need to do the opposite here. We can 
> repurpose that code in release.py and extract into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]





[jira] [Comment Edited] (KAFKA-15224) Automate version change to snapshot

2023-08-09 Thread Tanay Karmarkar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752504#comment-17752504
 ] 

Tanay Karmarkar edited comment on KAFKA-15224 at 8/9/23 7:46 PM:
-

[~divijvaidya] Another clarification needed, in the example PR, there are files 
with version bump without `SNAPSHOT` suffix. Would this script update files 
with `\{version}-SNAPSHOT` always?


was (Author: JIRAUSER301398):
Another clarification needed, in the example PR, there are files with version 
bump without `SNAPSHOT` suffix. Would this script update files with 
`\{version}-SNAPSHOT` always?






[GitHub] [kafka] lihaosky opened a new pull request, #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


lihaosky opened a new pull request, #14178:
URL: https://github.com/apache/kafka/pull/14178

   ## Description
   Use rack aware assignor in `StickyTaskAssignor`. This PR is on top of 
https://github.com/apache/kafka/pull/14164.
   
   ## Test
   Update existing unit test
   





[jira] [Commented] (KAFKA-15224) Automate version change to snapshot

2023-08-09 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752510#comment-17752510
 ] 

Divij Vaidya commented on KAFKA-15224:
--

> can we include new python packages?

Sure. Currently the expectation is for the release manager to manually set up the 
dependencies on the machine that they will use to build the release. You will 
find the release process here and observe manual instructions such as "Make 
sure you're running the script with Python3" or "$pip3 install jira". 
Please feel free to add a requirements.txt; in fact, I believe that is the 
preferred approach. Perhaps create a release folder and move release.py, 
requirements.txt and any other new Python scripts that we may create into that 
folder.

> in the example PR, there are files with version bump without `SNAPSHOT` 
> suffix. Would this script update files with `\{version}-SNAPSHOT` always

No, not always. As an example, you will notice that we don't add the "SNAPSHOT" 
suffix to the fullDotVersion.js file. We run the set of commands in this PR when 
the release is done for the existing version and we want to prepare the branch 
for the next version (hence calling it snapshot). Note that we do the inverse of 
this operation (removing snapshot) at 
[https://github.com/apache/kafka/blob/trunk/release.py#L559-L577]. We do the 
inverse prior to the release.

Do these answer your questions?
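
For illustration, the two suffix conventions named in the issue description (`-SNAPSHOT` for the Gradle and Maven files, `.dev0` for the Python package) and the inverse step could be sketched as below. This is only a sketch under those assumptions. The class and method names are hypothetical and it is not code from release.py.

```java
public class SnapshotVersionSketch {

    // Suffix for the Gradle/Maven-style files listed in the issue.
    static String snapshotVersion(String releaseVersion) {
        return releaseVersion + "-SNAPSHOT";
    }

    // Python version naming cannot use "-SNAPSHOT", so the issue uses ".dev0" instead.
    static String pythonDevVersion(String releaseVersion) {
        return releaseVersion + ".dev0";
    }

    // Inverse step, done prior to a release: strip whichever suffix is present.
    static String releaseVersion(String devVersion) {
        if (devVersion.endsWith("-SNAPSHOT")) {
            return devVersion.substring(0, devVersion.length() - "-SNAPSHOT".length());
        }
        if (devVersion.endsWith(".dev0")) {
            return devVersion.substring(0, devVersion.length() - ".dev0".length());
        }
        return devVersion;
    }

    public static void main(String[] args) {
        System.out.println(snapshotVersion("0.10.0.1"));          // 0.10.0.1-SNAPSHOT
        System.out.println(pythonDevVersion("0.10.0.1"));         // 0.10.0.1.dev0
        System.out.println(releaseVersion("0.10.0.1-SNAPSHOT"));  // 0.10.0.1
    }
}
```

Files that do not take a suffix at all, such as the one mentioned above, would need to be handled separately by whatever script is written.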






[GitHub] [kafka] mumrah commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


mumrah commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289141277


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1498,6 +1543,30 @@ private void cancelNextWriteNoOpRecord() {
 queue.cancelDeferred(WRITE_NO_OP_RECORD);
 }
 
+private static final String WRITE_REMOVE_DELEGATIONTOKEN_RECORD = 
"writeRemoveDelegationTokenRecord";

Review Comment:
   I think this should be named something like "maybeExpireDelegationTokens" 
to reflect what this event is doing (rather than what record ends up being used)



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -176,8 +176,11 @@ public enum MetadataVersion {
 // Support for SCRAM
 IBP_3_5_IV2(11, "3.5", "IV2", true),
 
-// Remove leader epoch bump when KRaft controller shrinks the ISR 
(KAFKA-15021)
-IBP_3_6_IV0(12, "3.6", "IV0", false);
+// Support for Remove leader epoch bump when KRaft controller shrinks the 
ISR (KAFKA-15021)
+IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+// Support for DelegationTokens

Review Comment:
   nit: "KRaft support for DelegationTokens"



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -633,4 +641,35 @@ void handleAclsDelta(AclsImage image, AclsDelta delta, 
KRaftMigrationOperationCo
 migrationClient.aclClient().writeResourceAcls(resourcePattern, 
accessControlEntries, migrationState));
 });
 }
+
+void handleDelegationTokenDelta(DelegationTokenImage image, 
DelegationTokenDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
+Set updatedResources = delta.changes().keySet();

Review Comment:
   nit: "updatedTokens" ?



##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+pri

[jira] [Resolved] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-08-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14831.
---
Resolution: Fixed

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.6.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


mjsax commented on code in PR #14178:
URL: https://github.com/apache/kafka/pull/14178#discussion_r1289147669


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##
@@ -505,4 +505,15 @@ private void assertNotAssigned(final TaskId task) {
 throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this);
 }
 }
+
+public ClientState copy() {

Review Comment:
   I think we usually use a copy-constructor for such cases.
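
The copy-constructor convention mentioned in this comment can be sketched as follows. This is only an illustration: `AssignmentState` and its fields are hypothetical stand-ins, not the real `ClientState`, and the sketch assumes the goal is an independent copy whose mutable collections are not shared with the original.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a mutable state holder; not the real ClientState.
final class AssignmentState {
    private final Set<String> activeTasks;
    private final int capacity;

    AssignmentState(final int capacity) {
        this.activeTasks = new HashSet<>();
        this.capacity = capacity;
    }

    // Copy constructor: copies the mutable set so that later changes to the
    // copy do not leak back into the original instance.
    AssignmentState(final AssignmentState other) {
        this.activeTasks = new HashSet<>(other.activeTasks);
        this.capacity = other.capacity;
    }

    void assign(final String task) {
        activeTasks.add(task);
    }

    Set<String> activeTasks() {
        return activeTasks;
    }

    int capacity() {
        return capacity;
    }
}
```

A caller would write `new AssignmentState(original)` where a `copy()` method would otherwise be used.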



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java:
##
@@ -36,11 +41,18 @@
 public class StickyTaskAssignor implements TaskAssignor {
 
 private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
+private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;
+private static final int STATELESS_TRAFFIC_COST = 1;
+private static final int STATELESS_NON_OVERLAP_COST = 0;

Review Comment:
   It seems better to make the constants in the prod code accessible instead of 
duplicating them?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -346,6 +346,7 @@ public long optimizeActiveTasks(final SortedSet activeTasks,
 log.info("Assignment before active task optimization is {}\n with cost {}", clientStates,
 activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost));
 
+final long startTime = System.currentTimeMillis();

Review Comment:
   We should avoid calling `System.currentTimeMillis()` directly, but use the 
central `Time` object instead. Not sure how complex it would be to get a handle 
on it? (also below)
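
A minimal sketch of what injecting a clock instead of calling `System.currentTimeMillis()` directly could look like. `SimpleTime`, `TimedOptimizer`, and `ManualTime` are hypothetical names introduced for illustration; `SimpleTime` is a simplified stand-in for a central time abstraction, not Kafka's own `Time` interface.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for a central time abstraction (hypothetical).
interface SimpleTime {
    long milliseconds();
}

// Hypothetical component that measures elapsed time through the injected clock
// rather than reading the system clock itself.
final class TimedOptimizer {
    private final SimpleTime time;

    TimedOptimizer(final SimpleTime time) {
        this.time = time;
    }

    // Runs the work and returns the elapsed milliseconds as seen by the clock.
    long runAndMeasure(final Runnable work) {
        final long start = time.milliseconds();
        work.run();
        return time.milliseconds() - start;
    }
}

// Manually advanced clock that makes timing deterministic in tests.
final class ManualTime implements SimpleTime {
    private final AtomicLong now = new AtomicLong();

    @Override
    public long milliseconds() {
        return now.get();
    }

    void sleep(final long ms) {
        now.addAndGet(ms);
    }
}
```

Production code would pass an implementation backed by the system clock, while tests pass `ManualTime` and advance it explicitly.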



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -617,21 +652,23 @@ static Map>> getRandomProcessRacks(final int
 }
 Collections.shuffle(racks);

Review Comment:
   Just realizing that we might want to hand in a `Random` object as second 
parameter to allow us to reproduce a test run? Can you check the code for other 
places where we use `shuffle` to allow us to improve it across the board?
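
For reference, `Collections.shuffle` has an overload that takes a `java.util.Random`, so a fixed seed reproduces the same order. The sketch below assumes a hypothetical helper named `SeededShuffle`; how the seed would be chosen and logged in the real test utilities is left open.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper; not part of AssignmentTestUtils.
final class SeededShuffle {
    // Returns a shuffled copy of the input. Because the Random is built from
    // the given seed, the same seed always produces the same order.
    static List<String> shuffled(final List<String> racks, final long seed) {
        final List<String> copy = new ArrayList<>(racks);
        Collections.shuffle(copy, new Random(seed));
        return copy;
    }
}
```

Logging the seed when a test starts would allow a failing run to be replayed with the same shuffle order.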






[GitHub] [kafka] aindriu-aiven commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-09 Thread via GitHub


aindriu-aiven commented on code in PR #14159:
URL: https://github.com/apache/kafka/pull/14159#discussion_r1289206301


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = scan(filterPluginsResourceDir(TestPlugins.pluginPath(), TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));

Review Comment:
   Thanks. I manually committed this to add the TestPlugins dependency workaround.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -145,10 +148,28 @@ public void testScanningMixOfValidAndInvalidPlugins() throws Exception {
 assertEquals(expectedClasses, classes);
 }
 
+@Test
+public void testScannedPluingsForUndefinedVersion() {
+PluginScanResult unversionedPluginsResult = scan(filterPluginsResourceDir(TestPlugins.pluginPath(), TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER.resourceDir()));
+assertFalse(unversionedPluginsResult.isEmpty());
+unversionedPluginsResult.forEach(pluginDesc -> assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+}
+
+@Test
+public void testScannedPluingsForVersion() {
+PluginScanResult versionedPluginResult = scan(filterPluginsResourceDir(TestPlugins.pluginPath(), TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1.resourceDir()));

Review Comment:
   Thanks. I manually committed this to add the TestPlugins dependency workaround.






[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752519#comment-17752519
 ] 

Matthias J. Sax commented on KAFKA-12317:
-

Just catching up on comments...
{quote}KAFKA-12845 is in status resolved so I assume this one is no longer 
relevant.
{quote}
Yes, it was closed as a duplicate of this ticket. I.e., we should make the 
change not just for left stream-table, but also for left stream-globalKTable.

Thanks [~guozhang]; I also prefer a KIP.

Thanks [~aki] for the KIP and PR.

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> i.e., we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users who want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (i.e., we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.
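
The relaxed left-join semantics described above can be illustrated with a plain-Java sketch that does not use the Kafka Streams API. `LeftJoinSketch` and `relaxedLeftJoin` are hypothetical names, the table is modelled as an in-memory `Map`, and a `BiFunction` stands in for the ValueJoiner. A `null` key is treated as a failed lookup rather than as a malformed record.

```java
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration of the proposed semantics; not Kafka Streams code.
final class LeftJoinSketch {
    // Left join of one stream record against a table. A null key is treated
    // like a key with no match: the joiner is still called, with a null
    // "other-side" value, instead of the record being dropped.
    static <K, V, T, R> R relaxedLeftJoin(final K key,
                                          final V value,
                                          final Map<K, T> table,
                                          final BiFunction<V, T, R> joiner) {
        final T other = (key == null) ? null : table.get(key);
        return joiner.apply(value, other);
    }
}
```

Under these semantics, callers who prefer the current behavior would skip records whose key is `null` before invoking the join, which corresponds to the `filter()` step mentioned in the description.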





[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Labels: kip  (was: )

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> i.e., we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users who want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (i.e., we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.





[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Description: 
Currently, for a stream-stream and stream-table/globalTable join KafkaStreams 
drops all stream records with a `null`-key (`null`-join-key for 
stream-globalTable), because for a `null`-(join)key the join is undefined: 
i.e., we don't have an attribute to do the table lookup (we consider the 
stream-record as malformed). Note that we define the semantics of _left/outer_ 
join as: keep the stream record if no matching join record was found.

We could relax the definition of _left_ stream-table/globalTable and 
_left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
records, and call the ValueJoiner with a `null` "other-side" value instead: if 
the stream record key (or join-key) is `null`, we could treat it as a "failed 
lookup" instead of treating the stream record as corrupted.

If we make this change, users who want to keep the current behavior can add a 
`filter()` before the join to drop `null`-(join)key records from the stream 
explicitly.

Note that this change also requires changing the behavior if we insert a 
repartition topic before the join: currently, we drop `null`-key records before 
writing into the repartition topic (as we know they would be dropped later 
anyway). We need to relax this behavior for a left stream-table and left/outer 
stream-stream join. Users need to be aware (i.e., we might need to put this 
into the docs and JavaDocs) that records with a `null`-key would be partitioned 
randomly.

KIP-962: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 

  was:
Currently, for a stream-stream and stream-table/globalTable join KafkaStreams 
drops all stream records with a `null`-key (`null`-join-key for 
stream-globalTable), because for a `null`-(join)key the join is undefined: 
i.e., we don't have an attribute to do the table lookup (we consider the 
stream-record as malformed). Note that we define the semantics of _left/outer_ 
join as: keep the stream record if no matching join record was found.

We could relax the definition of _left_ stream-table/globalTable and 
_left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
records, and call the ValueJoiner with a `null` "other-side" value instead: if 
the stream record key (or join-key) is `null`, we could treat it as a "failed 
lookup" instead of treating the stream record as corrupted.

If we make this change, users who want to keep the current behavior can add a 
`filter()` before the join to drop `null`-(join)key records from the stream 
explicitly.

Note that this change also requires changing the behavior if we insert a 
repartition topic before the join: currently, we drop `null`-key records before 
writing into the repartition topic (as we know they would be dropped later 
anyway). We need to relax this behavior for a left stream-table and left/outer 
stream-stream join. Users need to be aware (i.e., we might need to put this 
into the docs and JavaDocs) that records with a `null`-key would be partitioned 
randomly.


> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key 
> for stream-globalTable), because for a `null`-(join)key the join is 
> undefined: i.e., we don't have an attribute to do the table lookup (we 
> consider the stream-record as malformed). Note that we define the semantics 
> of _left/outer_ join as: keep the stream record if no matching join record 
> was found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users who want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyw

[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Labels: kip  (was: )

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.





[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Description: 
Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.

KIP-962: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 

  was:
Kafka Streams enforces a strict non-null-key policy in the DSL across all 
key-dependent operations (like aggregations and joins).

This also applies to FK-joins, in particular to the ForeignKeyExtractor. If it 
returns `null`, it's treated as invalid. For left-joins, it might make sense to 
still accept a `null`, and add the left-hand record with an empty 
right-hand-side to the result.


> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.
> KIP-962: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
>  





[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Labels: kip  (was: beginner newbie)

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.





[GitHub] [kafka] lihaosky commented on a diff in pull request #14178: KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor

2023-08-09 Thread via GitHub


lihaosky commented on code in PR #14178:
URL: https://github.com/apache/kafka/pull/14178#discussion_r1289205241


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -617,21 +652,23 @@ static Map>> getRandomProcessRacks(final int
 }
 Collections.shuffle(racks);

Review Comment:
   Looks like `shuffle` can take a `Random`. I think we can use one static 
`Random` in this class.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##
@@ -505,4 +505,15 @@ private void assertNotAssigned(final TaskId task) {
 throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this);
 }
 }
+
+public ClientState copy() {

Review Comment:
   Makes sense. Let me add a copy constructor.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -346,6 +346,7 @@ public long optimizeActiveTasks(final SortedSet activeTasks,
 log.info("Assignment before active task optimization is {}\n with cost {}", clientStates,
 activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost));
 
+final long startTime = System.currentTimeMillis();

Review Comment:
   There's a `Time` in `StreamPartitionAssignor`; we can pass it into the 
`RackAwareTaskAssignor` constructor.






[jira] [Updated] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14049:

Labels:   (was: kip)

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.





[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2023-08-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752522#comment-17752522
 ] 

Matthias J. Sax commented on KAFKA-14049:
-

I am still not sure if I understand this ticket? – Or is it a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-12317 ?

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Assignee: Florin Akermann
>Priority: Major
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.





[GitHub] [kafka] philipnee opened a new pull request, #14179: MINOR: CommitRequestManager should only poll when the coordinator node is known

2023-08-09 Thread via GitHub


philipnee opened a new pull request, #14179:
URL: https://github.com/apache/kafka/pull/14179

   As the title says: during testing we discovered a flaky bug where a 
NOT_COORDINATOR exception was occasionally thrown, which means the request was 
routed to a non-coordinator node. We found that if we don't check the 
coordinator node in the CommitRequestManager, the request manager passes an 
empty node to the NetworkClientDelegate, which implies the request can be sent 
to any node in the cluster. This behavior is incorrect because commit requests 
must be routed to the coordinator node.
   
   Because the timing of coordinator discovery during integration testing isn't 
entirely deterministic, the test became extremely flaky. With this fix, the 
coordinator node must be known before these commit requests are enqueued to 
the NetworkClient.
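
The guard described in this PR can be sketched roughly as follows. This is an assumption-laden illustration, not the actual `CommitRequestManager`: `CommitQueueSketch`, its string-based requests, and the `Optional<String>` coordinator are all hypothetical simplifications of the idea "do not hand out commit requests until the coordinator is known".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; not the real CommitRequestManager.
final class CommitQueueSketch {
    private final List<String> pendingCommits = new ArrayList<>();

    void enqueue(final String commit) {
        pendingCommits.add(commit);
    }

    // Hands out pending commits only when a coordinator is known. Otherwise
    // the commits stay queued, so they are never sent to an arbitrary node.
    List<String> poll(final Optional<String> coordinator) {
        if (!coordinator.isPresent() || pendingCommits.isEmpty()) {
            return Collections.emptyList();
        }
        final List<String> ready = new ArrayList<>();
        for (final String commit : pendingCommits) {
            ready.add(commit + " -> " + coordinator.get());
        }
        pendingCommits.clear();
        return ready;
    }
}
```

Polling with an empty coordinator returns nothing and leaves the queue untouched; the next poll after the coordinator is discovered drains it.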




