Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2008700834

   The failed tests pass on my local machine:
   ```sh
   ./gradlew cleanTest \
     :streams:test --tests HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores \
     :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful \
     :metadata:test --tests QuorumControllerTest.testBrokerHeartbeatDuringMigration \
     :core:test --tests PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords \
       --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric \
       --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
       --tests KafkaZkClientTest.testConcurrentKRaftControllerClaim \
       --tests ZkMigrationIntegrationTest.testMigrateTopicDeletions \
       --tests ReplicationQuotasTest.shouldBootstrapTwoBrokersWithFollowerThrottle
   ```
   Also, the system test `consumer_group_command_test.py` passes on my local 
machine. Will merge it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16222: KRaft Migration: Incorrect default user-principal quota after migration [kafka]

2024-03-19 Thread via GitHub


omkreddy commented on PR #15481:
URL: https://github.com/apache/kafka/pull/15481#issuecomment-2008684718

   @showuon Can we merge the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-19 Thread via GitHub


sjhajharia commented on PR #15486:
URL: https://github.com/apache/kafka/pull/15486#issuecomment-2008655935

   On that note @jolshan, do we have any more open questions to address, or are 
we good to go with this PR? Or are we still in flux on 
[this](https://github.com/apache/kafka/pull/15486#discussion_r1523597267)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16391) Cleanup .lock file after server is down

2024-03-19 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828597#comment-17828597
 ] 

PoAn Yang commented on KAFKA-16391:
---

Yes, I think so. From the source code, it only closes the file channel.

[https://github.com/apache/kafka/blob/12a1d85362bfc200714bb2d76e6da7f5af1f82dd/core/src/main/scala/kafka/utils/FileLock.scala#L73-L81]
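
For context, a minimal sketch of the pattern in question, written against plain Java NIO (this is a hypothetical illustration, not Kafka's actual FileLock class): releasing the lock and closing the channel does not remove the file, so cleanup would need an explicit delete.

{code:java}
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

public class LockFileCleanupExample {
    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("kafka-log-dir", ".lock");
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            FileLock lock = raf.getChannel().tryLock();
            // what destroy() effectively does today: release the lock, close the channel
            lock.release();
        }
        // ...but the .lock file itself is left behind on disk
        System.out.println("exists after close: " + file.exists()); // true
        // the proposed cleanup would add an explicit delete
        System.out.println("deleted: " + file.delete());            // true
    }
}
{code}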

> Cleanup .lock file after server is down
> ---
>
> Key: KAFKA-16391
> URL: https://issues.apache.org/jira/browse/KAFKA-16391
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> Currently, the server adds a `.lock` file to each log folder. The file is 
> useless after the server is down.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16391) Cleanup .lock file after server is down

2024-03-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828595#comment-17828595
 ] 

Chia-Ping Tsai commented on KAFKA-16391:


[~yangpoan] pardon me, do those .lock files still exist even after the server 
performs a graceful shutdown?

> Cleanup .lock file after server is down
> ---
>
> Key: KAFKA-16391
> URL: https://issues.apache.org/jira/browse/KAFKA-16391
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> Currently, the server adds a `.lock` file to each log folder. The file is 
> useless after the server is down.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828594#comment-17828594
 ] 

Chia-Ping Tsai commented on KAFKA-16385:


{quote}
One potential way to improve this is to use the timestamp index to find the 
cutoff offset in the active segment and move the logStartOffset to that point. 
We need to understand if there is any additional I/O impact because of this.
{quote}

I'm not sure whether it is a worthwhile improvement. We should not encourage 
users to expect that cleanup can delete segments accurately. In particular, users 
can define their own timestamps, so expired records could still exist even after 
we move the logStartOffset. For example: a non-expired record with offset=2, 
timestamp=100 and an expired record with offset=3, timestamp=90.

{quote}
 As you observed, the current implementation is a bit weird since it depends on 
whether there are new records or not. 
{quote}

That probably makes sense: The segment is NOT expired as it has new records :)

In short, the implementation of retention.ms could roll and then delete the 
active segment. We should improve the documentation for this scenario.
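
A small illustration of the ordering problem described above (hypothetical record values, not Kafka internals): a timestamp-index lookup lands on the earlier offset, so moving logStartOffset there still leaves the expired record in place.

{code:java}
import java.util.List;

public class RetentionCutoffExample {
    record LogRecord(long offset, long timestamp) { }

    public static void main(String[] args) {
        long retentionCutoffTs = 95; // records with timestamp < 95 are considered expired
        List<LogRecord> activeSegment = List.of(
            new LogRecord(2, 100), // non-expired
            new LogRecord(3, 90)   // expired, but with a HIGHER offset
        );
        // a timestamp-index lookup finds the first offset with timestamp >= cutoff: offset 2;
        // moving logStartOffset there still leaves the expired record at offset 3 in the log
        long cutoffOffset = activeSegment.stream()
            .filter(r -> r.timestamp() >= retentionCutoffTs)
            .mapToLong(LogRecord::offset)
            .min()
            .orElse(Long.MAX_VALUE);
        System.out.println("new logStartOffset = " + cutoffOffset); // 2
    }
}
{code}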

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the config: segment.ms=7days, segment.bytes=1GB, 
> retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not, but my testing shows that it does:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the 1000 ms log retention time was breached, 
> which is unexpected.
> Tested in v3.5.1; it has the same issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16391) Cleanup .lock file after server is down

2024-03-19 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-16391:
-

 Summary: Cleanup .lock file after server is down
 Key: KAFKA-16391
 URL: https://issues.apache.org/jira/browse/KAFKA-16391
 Project: Kafka
  Issue Type: Improvement
Reporter: PoAn Yang
Assignee: PoAn Yang


Currently, the server adds a `.lock` file to each log folder. The file is 
useless after the server is down.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13949: /connectors endpoint supports listing task-configs and active topics [kafka]

2024-03-19 Thread via GitHub


github-actions[bot] commented on PR #15027:
URL: https://github.com/apache/kafka/pull/15027#issuecomment-2008611925

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16390) consumer_bench_test.py failed using AsyncKafkaConsumer

2024-03-19 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16390:
--

 Summary: consumer_bench_test.py failed using AsyncKafkaConsumer
 Key: KAFKA-16390
 URL: https://issues.apache.org/jira/browse/KAFKA-16390
 Project: Kafka
  Issue Type: Task
  Components: consumer, system tests
Reporter: Philip Nee


Ran the system tests based on KAFKA-16273.

The following tests failed when using the consumer group protocol:
{code:java}
kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer

kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer

kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
 {code}
Because of
{code:java}
 TimeoutError('consume_workload failed to finish in the expected amount of 
time.')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
    data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", line 
146, in test_single_partition
    consume_workload.wait_for_done(timeout_sec=180)
  File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 352, 
in wait_for_done
    wait_until(lambda: self.done(),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: consume_workload failed to finish in the expected 
amount of time. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14752: Kafka examples improvements - processor changes [kafka]

2024-03-19 Thread via GitHub


gaoran10 commented on code in PR #13516:
URL: https://github.com/apache/kafka/pull/13516#discussion_r1531455051


##
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java:
##
@@ -81,111 +88,115 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
 
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
-
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
-
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
-
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
-
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                 "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
+
+            consumer.subscribe(singleton(inputTopic), this);
+
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
+
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
+
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+

[jira] [Resolved] (KAFKA-12217) Apply the new features of Junit 5.8 to code base

2024-03-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12217.

Resolution: Fixed

> Apply the new features of Junit 5.8 to code base
> 
>
> Key: KAFKA-12217
> URL: https://issues.apache.org/jira/browse/KAFKA-12217
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> There are two useful new features in JUnit 5.8.
> 1. assertInstanceOf (https://github.com/junit-team/junit5/pull/2499)
> It offers a more meaningful error message than "assertTrue(obj instanceof X)".
> 2. junit.jupiter.params.displayname.default 
> (https://github.com/junit-team/junit5/pull/2532)
> It provides a default display name for all parameterized tests.
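
For reference, a minimal sketch of the two features in use (assuming junit-jupiter 5.8+ on the classpath; the test class and method names here are hypothetical, not from the Kafka code base):

{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class Junit58FeaturesTest {

    // with junit.jupiter.params.displayname.default set (e.g. in junit-platform.properties),
    // parameterized tests pick up a default display name without per-method annotations
    @ParameterizedTest
    @ValueSource(strings = {"kafka"})
    void assertInstanceOfBeatsAssertTrue(Object value) {
        // fails with a descriptive "Unexpected type" message rather than a bare boolean failure
        String s = assertInstanceOf(String.class, value);
        assertEquals("kafka", s);
    }
}
{code}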



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12187) replace assertTrue(obj instanceof X) by assertInstanceOf when we update to JUnit 5.8

2024-03-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12187.

Fix Version/s: 3.8.0
   Resolution: Fixed

> replace assertTrue(obj instanceof X) by assertInstanceOf when we update to 
> JUnit 5.8
> 
>
> Key: KAFKA-12187
> URL: https://issues.apache.org/jira/browse/KAFKA-12187
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.8.0
>
>
> see [https://github.com/apache/kafka/pull/9874#discussion_r556547909]
>  
> {quote}Yeah, for existing code improvements (versus code introduced by this 
> change), let's do it via a different PR. For this particular issue, we can 
> probably wait for JUnit 5.8 and use:
> {quote}
> * New assertInstanceOf methods as a replacement for assertTrue(obj instanceof 
> X) which provide better error messages comparable to those of assertThrows.
>  related PR: https://github.com/junit-team/junit5/pull/2499



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-19 Thread via GitHub


chia7712 merged PR #15512:
URL: https://github.com/apache/kafka/pull/15512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on PR #15512:
URL: https://github.com/apache/kafka/pull/15512#issuecomment-2008579703

   The failed tests pass on my local machine:
   ```sh
   ./gradlew cleanTest \
     :streams:test --tests HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores \
     :storage:test --tests TransactionsWithTieredStoreTest.testCommitTransactionTimeout \
     :connect:runtime:test --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId \
     :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers \
       --tests QuorumControllerTest.testBalancePartitionLeaders \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests IdentityReplicationIntegrationTest.testReplicateFromLatest \
       --tests MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs \
     :core:test --tests PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords \
       --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric \
       --tests PlaintextConsumerTest.testFetchOutOfRangeOffsetResetConfigLatest \
       --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
       --tests LogDirFailureTest.testIOExceptionDuringCheckpoint \
     :clients:test --tests SslTransportLayerTest.testServerTruststoreDynamicUpdate
   ```
   Will commit it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix javadoc warnings [kafka]

2024-03-19 Thread via GitHub


gaurav-narula closed pull request #15527: MINOR: fix javadoc warnings
URL: https://github.com/apache/kafka/pull/15527


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]

2024-03-19 Thread via GitHub


jolshan commented on PR #15541:
URL: https://github.com/apache/kafka/pull/15541#issuecomment-200839

   Looks pretty reasonable to me. I also want to get a +1 from Kirk to make 
sure he agrees it makes sense.
   I will also let the tests run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16388) add production-ready test of 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString

2024-03-19 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng reassigned KAFKA-16388:
-

Assignee: Kuan Po Tseng

> add production-ready test of 3.3 - 3.6 release to 
> MetadataVersionTest.testFromVersionString
> ---
>
> Key: KAFKA-16388
> URL: https://issues.apache.org/jira/browse/KAFKA-16388
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: newbie
>
> https://github.com/apache/kafka/blob/trunk/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java#L169
> we have already released 3.3 through 3.6, so they should be covered by 
> MetadataVersionTest.testFromVersionString:
> {code:java}
> assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
> assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4"));
> assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5"));
> assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6"));
> {code} 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16388) add production-ready test of 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString

2024-03-19 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828542#comment-17828542
 ] 

Kuan Po Tseng commented on KAFKA-16388:
---

I'm willing to take this over. Thanks!

> add production-ready test of 3.3 - 3.6 release to 
> MetadataVersionTest.testFromVersionString
> ---
>
> Key: KAFKA-16388
> URL: https://issues.apache.org/jira/browse/KAFKA-16388
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: newbie
>
> https://github.com/apache/kafka/blob/trunk/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java#L169
> we have already released 3.3 through 3.6, so they should be covered by 
> MetadataVersionTest.testFromVersionString:
> {code:java}
> assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
> assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4"));
> assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5"));
> assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6"));
> {code} 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]

2024-03-19 Thread via GitHub


CalvinConfluent commented on PR #15541:
URL: https://github.com/apache/kafka/pull/15541#issuecomment-2008340124

   @kirktrue Moved the test to the Sender test because less mocking is needed 
to reproduce the bug.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]

2024-03-19 Thread via GitHub


CalvinConfluent commented on code in PR #15541:
URL: https://github.com/apache/kafka/pull/15541#discussion_r1531252560


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -270,14 +270,7 @@ public void run() {
         while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
             if (!transactionManager.isCompleting()) {
                 log.info("Aborting incomplete transaction due to shutdown");
-
-                try {
-                    // It is possible for the transaction manager to throw errors when aborting. Catch these
-                    // so as not to interfere with the rest of the shutdown logic.
-                    transactionManager.beginAbort();
-                } catch (Exception e) {
-                    log.error("Error in kafka producer I/O thread while aborting transaction: ", e);
-                }
+                transactionManager.beginAbort();

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16389:
-

Assignee: Lianet Magrans

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828538#comment-17828538
 ] 

Kirk True edited comment on KAFKA-16389 at 3/19/24 11:21 PM:
-

I've attached [^KAFKA-16389.patch] which _appears_ to address the issue in the 
system test so that it passes.

The patch includes two changes:

1. {{verifiable_consumer.py}} now updates the test’s list of assignments 
instead of overwriting them
2. {{consumer_test.py}} now waits for the assignments to be made instead of 
expecting them to be immediately correct

I am _not_ necessarily suggesting that we change the test to make the new 
consumer work. This patch is provided to show how the new consumer behaves 
differently from the old one.


was (Author: kirktrue):
I've attached a patch which _appears_ to address the issue in the system test 
so that it passes.

The patch includes two changes:

1. {{verifiable_consumer.py}} now updates the test’s list of assignments 
instead of overwriting them
2. {{consumer_test.py}} now waits for the assignments to be made instead of 
expecting them to be immediately correct

I am _not_ necessarily suggesting that we change the test to make the new 
consumer work. This patch is provided to show how the new consumer behaves 
differently from the old one.

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828538#comment-17828538
 ] 

Kirk True commented on KAFKA-16389:
---

I've attached a patch which _appears_ to address the issue in the system test 
so that it passes.

The patch includes two changes:

1. {{verifiable_consumer.py}} now updates the test’s list of assignments 
instead of overwriting them
2. {{consumer_test.py}} now waits for the assignments to be made instead of 
expecting them to be immediately correct

I am _not_ necessarily suggesting that we change the test to make the new 
consumer work. This patch is provided to show how the new consumer behaves 
differently from the old one.
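
The second change follows the standard poll-until-condition pattern. A rough Java analogue of ducktape's wait_until, for illustration only (the actual patch is in the Python system tests, and this helper class is hypothetical):

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

public final class WaitUntil {
    // poll a condition until it holds or the timeout elapses, instead of
    // asserting the expected state immediately after startup
    public static void waitUntil(BooleanSupplier condition, Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                throw new AssertionError("condition not met within " + timeout);
            }
            Thread.sleep(100); // brief back-off between checks
        }
    }
}
{code}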

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16389:
--
Description: 
The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.

  was:
The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.

Note: the attached 


> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise 

[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16389:
--
Description: 
The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.

Note: the attached 

  was:
The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.


> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise 

[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16389:
--
Attachment: KAFKA-16389.patch

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16389:
--
Description: 
The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.

  was:
The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.


> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> 

[jira] [Created] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-03-19 Thread Kirk True (Jira)
Kirk True created KAFKA-16389:
-

 Summary: consumer_test.py’s test_valid_assignment fails with new 
consumer
 Key: KAFKA-16389
 URL: https://issues.apache.org/jira/browse/KAFKA-16389
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.7.0
Reporter: Kirk True
 Fix For: 3.8.0


The following error is reported when running the {{test_valid_assignment}} test 
from {{consumer_test.py}}:

 {code}
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
584, in test_valid_assignment
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
{code}

To reproduce, create a system test suite file named 
{{test_valid_assignment.yml}} with these contents:

{code:yaml}
failures:
  - 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
{code}

Then set the {{TC_PATHS}} environment variable to include that test suite 
file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16384) KRaft controller number recommendation

2024-03-19 Thread Peter (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter updated KAFKA-16384:
--
Component/s: (was: controller)

> KRaft controller number recommendation
> --
>
> Key: KAFKA-16384
> URL: https://issues.apache.org/jira/browse/KAFKA-16384
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.7.0
>Reporter: Peter
>Priority: Minor
>
> There seems to be some conflicting information about how many controllers 
> should be used for a KRaft cluster. The first section listed mentions 3 or 5 
> controllers may be used, but the second section mentions no more than 3 
> should be used at the moment.
> https://kafka.apache.org/documentation/#kraft_voter
> > A Kafka admin will typically select 3 or 5 servers for this role, depending 
> > on factors like cost and the number of concurrent failures your system 
> > should withstand without availability impact. A majority of the controllers 
> > must be alive in order to maintain availability. With 3 controllers, the 
> > cluster can tolerate 1 controller failure; with 5 controllers, the cluster 
> > can tolerate 2 controller failures.
> https://kafka.apache.org/documentation/#kraft_deployment
> > For redundancy, a Kafka cluster should use 3 controllers. More than 3 
> > controllers is not recommended in critical environments. In the rare case 
> > of a partial network failure it is possible for the cluster metadata quorum 
> > to become unavailable. This limitation will be addressed in a future 
> > release of Kafka.
>  
> Is 3 still the recommended number, and is there more information on the 
> network issues that could cause problems when using 5 controllers?
>  
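
For reference, the tolerance figures in both quoted passages come from simple quorum-majority arithmetic; a quick sketch (the class here is purely illustrative):

{code:java}
public class QuorumMath {
    public static void main(String[] args) {
        // a KRaft (Raft) quorum needs a strict majority of voters alive:
        // majority = n / 2 + 1, tolerated failures = (n - 1) / 2
        for (int n : new int[]{1, 3, 5}) {
            System.out.printf("controllers=%d, majority=%d, tolerated failures=%d%n",
                n, n / 2 + 1, (n - 1) / 2);
        }
    }
}
{code}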



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16384) KRaft controller number recommendation

2024-03-19 Thread Peter (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter updated KAFKA-16384:
--
Affects Version/s: 3.7.0

> KRaft controller number recommendation
> --
>
> Key: KAFKA-16384
> URL: https://issues.apache.org/jira/browse/KAFKA-16384
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, kraft
>Affects Versions: 3.7.0
>Reporter: Peter
>Priority: Minor
>
> There seems to be some conflicting information about how many controllers 
> should be used for a KRaft cluster. The first section listed mentions 3 or 5 
> controllers may be used, but the second section mentions no more than 3 
> should be used at the moment.
> https://kafka.apache.org/documentation/#kraft_voter
> > A Kafka admin will typically select 3 or 5 servers for this role, depending 
> > on factors like cost and the number of concurrent failures your system 
> > should withstand without availability impact. A majority of the controllers 
> > must be alive in order to maintain availability. With 3 controllers, the 
> > cluster can tolerate 1 controller failure; with 5 controllers, the cluster 
> > can tolerate 2 controller failures.
> https://kafka.apache.org/documentation/#kraft_deployment
> > For redundancy, a Kafka cluster should use 3 controllers. More than 3 
> > controllers is not recommended in critical environments. In the rare case 
> > of a partial network failure it is possible for the cluster metadata quorum 
> > to become unavailable. This limitation will be addressed in a future 
> > release of Kafka.
>  
> Is 3 still the recommended number, and is there more information on the 
> network failures that could cause problems when using 5 controllers?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16384) KRaft controller number recommendation

2024-03-19 Thread Peter (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter updated KAFKA-16384:
--
Issue Type: Improvement  (was: Bug)

> KRaft controller number recommendation
> --
>
> Key: KAFKA-16384
> URL: https://issues.apache.org/jira/browse/KAFKA-16384
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, kraft
>Reporter: Peter
>Priority: Minor
>
> There seems to be some conflicting information about how many controllers 
> should be used for a KRaft cluster. The first section below mentions that 3 
> or 5 controllers may be used, but the second says that no more than 3 should 
> be used at the moment.
> https://kafka.apache.org/documentation/#kraft_voter
> > A Kafka admin will typically select 3 or 5 servers for this role, depending 
> > on factors like cost and the number of concurrent failures your system 
> > should withstand without availability impact. A majority of the controllers 
> > must be alive in order to maintain availability. With 3 controllers, the 
> > cluster can tolerate 1 controller failure; with 5 controllers, the cluster 
> > can tolerate 2 controller failures.
> https://kafka.apache.org/documentation/#kraft_deployment
> > For redundancy, a Kafka cluster should use 3 controllers. More than 3 
> > controllers is not recommended in critical environments. In the rare case 
> > of a partial network failure it is possible for the cluster metadata quorum 
> > to become unavailable. This limitation will be addressed in a future 
> > release of Kafka.
>  
> Is 3 still the recommended number, and is there more information on the 
> network failures that could cause problems when using 5 controllers?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]

2024-03-19 Thread via GitHub


gaurav-narula commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2008277511

   Thanks @pasharik! I was able to reproduce it using the steps you mentioned 
above.
   
   I have a potential fix. Can you please try updating the zinc version in 
`gradle/dependencies.gradle:166` to `1.9.6` and run the steps once again? It 
seems to fix things on my end.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14482) Move LogLoader to storage module

2024-03-19 Thread Dmitry Werner (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828518#comment-17828518
 ] 

Dmitry Werner commented on KAFKA-14482:
---

[~ijuma] Thank you for the explanation.

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14482) Move LogLoader to storage module

2024-03-19 Thread Dmitry Werner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Werner reassigned KAFKA-14482:
-

Assignee: Dmitry Werner  (was: Ismael Juma)

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Dmitry Werner
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14482) Move LogLoader to storage module

2024-03-19 Thread Dmitry Werner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Werner reassigned KAFKA-14482:
-

Assignee: Ismael Juma  (was: Dmitry Werner)

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14482) Move LogLoader to storage module

2024-03-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828514#comment-17828514
 ] 

Ismael Juma commented on KAFKA-14482:
-

[~javakillah] This ticket can't be done until the dependency listed above is 
resolved. I have something in progress, but haven't submitted it for that reason.

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Dmitry Werner
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16367) Full ConsumerGroupHeartbeat response must be sent when full request is received

2024-03-19 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16367.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Full ConsumerGroupHeartbeat response must be sent when full request is 
> received
> ---
>
> Key: KAFKA-16367
> URL: https://issues.apache.org/jira/browse/KAFKA-16367
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac merged PR #15533:
URL: https://github.com/apache/kafka/pull/15533


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14482) Move LogLoader to storage module

2024-03-19 Thread Dmitry Werner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Werner reassigned KAFKA-14482:
-

Assignee: Dmitry Werner  (was: Ismael Juma)

> Move LogLoader to storage module
> 
>
> Key: KAFKA-14482
> URL: https://issues.apache.org/jira/browse/KAFKA-14482
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Dmitry Werner
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828506#comment-17828506
 ] 

Jun Rao commented on KAFKA-16385:
-

[~showuon] : Yes, the retention by time is supposed to cover the active segment 
too. As you observed, the current implementation is a bit weird since it 
depends on whether there are new records or not. One potential way to improve 
this is to use the timestamp index to find the cutoff offset in the active 
segment and move the logStartOffset to that point. We need to understand if 
there is any additional I/O impact because of this.
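
To make the suggestion concrete, here is a minimal sketch of the cutoff lookup
(illustrative only; a plain TreeMap stands in for the segment's timestamp
index, and this is not the actual Kafka internals):
{code:java}
import java.util.Map;
import java.util.TreeMap;

public class ActiveSegmentCutoffSketch {
    public static void main(String[] args) {
        // Stand-in timestamp index: record timestamp -> first offset with a
        // timestamp at or above it.
        TreeMap<Long, Long> timestampIndex = new TreeMap<>();
        timestampIndex.put(1_710_832_992_125L, 0L);
        timestampIndex.put(1_710_832_993_131L, 1L);

        long retentionMs = 1_000L;
        long now = 1_710_832_994_500L;
        long cutoffTs = now - retentionMs;

        // The first entry with timestamp >= cutoff marks the new logStartOffset;
        // records before it have breached retention and can be skipped without
        // rolling the active segment.
        Map.Entry<Long, Long> entry = timestampIndex.ceilingEntry(cutoffTs);
        if (entry == null) {
            System.out.println("whole segment breached retention");
        } else {
            System.out.println("advance logStartOffset to " + entry.getValue());
        }
    }
}
{code}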

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Startup a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Creating a topic with the config: segment.ms=7days , segment.bytes=1GB, 
> retention.ms=1sec .
> 2. Send a record "aaa" to the topic
> 3. Wait for 1 second
> Will this segment be rolled? I thought not.
> But my test shows that it will roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment was rolled due to the log retention time of 1000ms being 
> breached, which is unexpected.
> Tested in v3.5.1; it has the same issue.
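
For anyone reproducing this, a minimal sketch of step 1 using the Java Admin
client (assumes a broker at localhost:9092; config values mirror the
description above):
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReproTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Map<String, String> configs = new HashMap<>();
        configs.put("segment.ms", "604800000");      // 7 days
        configs.put("segment.bytes", "1073741824");  // 1 GB
        configs.put("retention.ms", "1000");         // 1 second

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("t2", 2, (short) 1).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
{code}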



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15756: [2/3] Migrate existing integration tests to run old protocol in new coordinator [kafka]

2024-03-19 Thread via GitHub


dongnuo123 commented on code in PR #14675:
URL: https://github.com/apache/kafka/pull/14675#discussion_r1531051374


##
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala:
##
@@ -89,6 +96,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
 val producer = createProducer()
 producerSend(producer, numRecords)
 
+
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
"3")
+

Review Comment:
   Ah, thanks for the catch. We shouldn't need it. Let me change it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]

2024-03-19 Thread via GitHub


OmniaGM commented on PR #15501:
URL: https://github.com/apache/kafka/pull/15501#issuecomment-2008054490

   > Hello @OmniaGM 
   > 
   > 
   > 
   > Are you able to split this PR into several to simplify review and merging?
   > 
   > My jira https://issues.apache.org/jira/browse/KAFKA-14588 is blocked by 
the migration of `KafkaConfig` out of `core`, so I want to help you with these 
changes somehow. 
   > 
   > 
   > 
   > If you have no time to split it, I can offer my help with this.
   
   Started doing this, but I will continue after Kafka Summit London. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2024-03-19 Thread keith.paulson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828501#comment-17828501
 ] 

keith.paulson commented on KAFKA-8154:
--

[~tmancill] What is involved with getting this fix implemented?  We've been 
running it since last June on releases up to 3.6 without issue.

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on PR #15501:
URL: https://github.com/apache/kafka/pull/15501#issuecomment-2007922919

   Hello @OmniaGM 
   
   Are you able to split this PR into several to simplify review and merging?
   My jira https://issues.apache.org/jira/browse/KAFKA-14588 is blocked by the 
migration of `KafkaConfig` out of `core`, so I want to help you with these 
changes somehow. 
   
   If you have no time to split it, I can offer my help with this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15950: Serialize broker heartbeat requests [kafka]

2024-03-19 Thread via GitHub


junrao commented on code in PR #14903:
URL: https://github.com/apache/kafka/pull/14903#discussion_r1530918656


##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -254,33 +261,38 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
-val context = new RegistrationTestContext(configProperties)
-val manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+val ctx = new RegistrationTestContext(configProperties)
+val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", isZkBroker = false, 
Set(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
 val controllerNode = new Node(3000, "localhost", 8021)
-context.controllerNodeProvider.node.set(controllerNode)
-manager.start(() => context.highestMetadataOffset.get(),
-  context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
+ctx.controllerNodeProvider.node.set(controllerNode)
+
+manager.start(() => ctx.highestMetadataOffset.get(),
+  ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
   Collections.emptyMap(), OptionalLong.of(10L))
-TestUtils.retry(6) {
-  assertEquals(1, context.mockChannelManager.unsentQueue.size)
-  assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
-}
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1000)), 
controllerNode)
-TestUtils.retry(1) {
-  context.poll()
-  assertEquals(1000L, manager.brokerEpoch)
-}
 
+def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
+def nextRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+def nextRegistrationRequest(epoch: Long) =
+  doPoll[BrokerRegistrationRequest](new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(epoch)))
+
+// Broker registers and response sets epoch to 1000L
+assertEquals(10L, 
nextRegistrationRequest(1000L).data().previousBrokerEpoch())
+
+nextRequest() // poll for next request as way to synchronize with the new 
value into brokerEpoch
+assertEquals(1000L, manager.brokerEpoch)
+
+// Trigger JBOD MV update
 manager.handleKraftJBODMetadataVersionUpdate()
-context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
-  new BrokerRegistrationResponseData().setBrokerEpoch(1200)), 
controllerNode)
-TestUtils.retry(6) {
-  context.time.sleep(100)
-  context.poll()
-  manager.eventQueue.wakeup()
-  assertEquals(1200, manager.brokerEpoch)
-}
+
+// We may have to accept some heartbeats before the new registration is 
sent
+while (nextRequest().isInstanceOf[BrokerHeartbeatRequest])()

Review Comment:
   `prepareResponse` knows the request type. Could we generate the response 
corresponding to the request type there?



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -197,11 +197,17 @@ class BrokerLifecycleManagerTest {
 result
   }
 
-  def poll[T](context: RegistrationTestContext, manager: 
BrokerLifecycleManager, future: Future[T]): T = {
-while (!future.isDone || context.mockClient.hasInFlightRequests) {
-  context.poll()
+  def poll[T](ctx: RegistrationTestContext, manager: BrokerLifecycleManager, 
future: Future[T]): T = {
+while (ctx.mockChannelManager.unsentQueue.isEmpty) {
+  // If the manager is idling until scheduled events we need to advance 
the clock
+  if (manager.eventQueue.scheduledAfterIdling()
+.filter(!_.getClass.getSimpleName.endsWith("TimeoutEvent")) // avoid 
triggering timeout events

Review Comment:
   I see. Perhaps we could add a step after `poll(ctx, manager, registration)` 
to explicitly drain `RegistrationTimeoutEvent` first? This makes the test safer 
since it won't be sensitive to the timeout config values.



##
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##
@@ -219,15 +225,16 @@ class BrokerLifecycleManagerTest {
   Collections.emptyMap(), OptionalLong.empty())
 poll(ctx, manager, registration)
 
+def nextHeartbeatDirs(): Set[String] =
+  poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
+.data().offlineLogDirs().asScala.map(_.toString).toSet
+assertEquals(Set.empty, nextHeartbeatDirs())
 
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA"), nextHeartbeatDirs())
 

Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on PR #14471:
URL: https://github.com/apache/kafka/pull/14471#issuecomment-2007792800

   @chia7712 I addressed all your comments. Please take a look one more time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530832758


##
checkstyle/import-control.xml:
##
@@ -330,6 +330,7 @@
 
 
 
+

Review Comment:
   This is not required.
   
   Sorry for that, I should have checked this before the review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530815007


##
tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvUtils {
+private final static CsvMapper MAPPER = new CsvMapper();
+
+static ObjectReader readerFor(Class<?> clazz) {
+return MAPPER.readerFor(clazz).with(getSchema(clazz));
+}
+
+static ObjectWriter writerFor(Class<?> clazz) {
+return MAPPER.writerFor(clazz).with(getSchema(clazz));
+}
+
+private static CsvSchema getSchema(Class<?> clazz) {
+String[] fields;
+if (CsvRecordWithGroup.class == clazz)
+fields = CsvRecordWithGroup.FIELDS;
+else if (CsvRecordNoGroup.class == clazz)
+fields = CsvRecordNoGroup.FIELDS;
+else
+throw new IllegalStateException("Unhandled class " + clazz);
+
+return MAPPER.schemaFor(clazz).sortedBy(fields);
+}
+
+public static class CsvRecordWithGroup {
+public static final String[] FIELDS = new String[] {"group", "topic", 
"partition", "offset"};
+
+@JsonProperty
+private String group;
+
+@JsonProperty
+private String topic;
+
+@JsonProperty
+private int partition;
+
+@JsonProperty
+private long offset;
+
+public CsvRecordWithGroup() {
+}
+
+public CsvRecordWithGroup(String group, String topic, int partition, 
long offset) {
+this.group = group;
+this.topic = topic;
+this.partition = partition;
+this.offset = offset;
+}
+
+public void setGroup(String group) {
+this.group = group;
+}
+
+public String getGroup() {
+return group;
+}
+
+public String getTopic() {
+return topic;
+}
+
+public void setTopic(String topic) {
+this.topic = topic;
+}
+
+public int getPartition() {
+return partition;
+}
+
+public void setPartition(int partition) {
+this.partition = partition;
+}
+
+public long getOffset() {
+return offset;
+}
+
+public void setOffset(long offset) {
+this.offset = offset;
+}
+}
+
+public static class CsvRecordNoGroup {
+public static final String[] FIELDS = new String[]{"topic", 
"partition", "offset"};
+
+@JsonProperty
+private String topic;
+
+@JsonProperty
+private int partition;
+
+@JsonProperty
+private long offset;
+
+public CsvRecordNoGroup() {

Review Comment:
   Done



##
tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectReader;
+import 

Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530813020


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -144,18 +145,18 @@ private void 
testWithConsumerGroup(java.util.function.Consumer withCon
 withConsumerGroup.accept(() -> {
 String topic = inputPartition >= 0 ? inputTopic + ":" + 
inputPartition : inputTopic;
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(getArgs(GROUP, topic));
-scala.Tuple2<Errors, scala.collection.Map<TopicPartition, Throwable>> res = 
service.deleteOffsets(GROUP, seq(Collections.singletonList(topic)).toList());
-Errors topLevelError = res._1;
-scala.collection.Map<TopicPartition, Throwable> partitions = res._2;
+Tuple2<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets(GROUP, Collections.singletonList(topic));

Review Comment:
   I think using Entry is a great idea. 
   I will implement it after this PR.
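   
   A quick sketch of the Entry-based variant discussed here (names are 
illustrative, not the PR's final API):
   ```java
   import java.util.AbstractMap;
   import java.util.Collections;
   import java.util.Map;
   
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.protocol.Errors;
   
   public class EntrySketch {
       // Returns the same (error, per-partition results) pair as the Tuple2
       // version, using only the JDK's Map.Entry.
       static Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsetsResult() {
           Map<TopicPartition, Throwable> partitions =
                   Collections.singletonMap(new TopicPartition("foo", 0), null);
           return new AbstractMap.SimpleImmutableEntry<>(Errors.NONE, partitions);
       }
   
       public static void main(String[] args) {
           Map.Entry<Errors, Map<TopicPartition, Throwable>> res = deleteOffsetsResult();
           Errors topLevelError = res.getKey();
           Map<TopicPartition, Throwable> partitions = res.getValue();
           System.out.println(topLevelError + " " + partitions.keySet());
       }
   }
   ```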



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530807872


##
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##
@@ -0,0 +1,1240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import joptsimple.OptionException;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AbstractOptions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.tools.ToolsUtils;
+import org.apache.kafka.tools.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConsumerGroupCommand {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommand.class);
+
+static final String MISSING_COLUMN_VALUE = "-";
+
+public static void main(String[] args) {
+ConsumerGroupCommandOptions opts = 
ConsumerGroupCommandOptions.fromArgs(args);
+try {
+CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
list all consumer groups, describe a consumer group, delete consumer group 
info, or reset consumer group offsets.");
+
+// should have exactly one action
+long actions = Stream.of(opts.listOpt, opts.describeOpt, 
opts.deleteOpt, opts.resetOffsetsOpt, 
opts.deleteOffsetsOpt).filter(opts.options::has).count();
+if (actions != 1)
+CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one 

[jira] [Created] (KAFKA-16388) add production-ready test of 3.3 - 3.6 release to MetadataVersionTest.testFromVersionString

2024-03-19 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16388:
--

 Summary: add production-ready test of 3.3 - 3.6 release to 
MetadataVersionTest.testFromVersionString
 Key: KAFKA-16388
 URL: https://issues.apache.org/jira/browse/KAFKA-16388
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


https://github.com/apache/kafka/blob/trunk/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java#L169

We have already released 3.3 through 3.6, so they should be covered by 
MetadataVersionTest.testFromVersionString:

{code:java}
assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4"));
assertEquals(IBP_3_5_IV2, MetadataVersion.fromVersionString("3.5"));
assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6"));
{code} 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16153) kraft_upgrade_test system test is broken

2024-03-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828444#comment-17828444
 ] 

Chia-Ping Tsai commented on KAFKA-16153:


IIRC, only production-ready versions are acceptable. We haven't released 3.8.0 
yet, so this error should be expected.

> kraft_upgrade_test system test is broken
> 
>
> Key: KAFKA-16153
> URL: https://issues.apache.org/jira/browse/KAFKA-16153
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Mickael Maison
>Priority: Major
>
> I get the following failure from all `from_kafka_version` versions:
> Command '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
> ducker05:9092,ducker06:9092,ducker07:9092 upgrade --metadata 3.8' returned 
> non-zero exit status 1. Remote error message: b'SLF4J: Class path contains 
> multiple SLF4J bindings.\nSLF4J: Found binding in 
> [jar:file:/opt/kafka-dev/tools/build/dependant-libs-2.13.12/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  Found binding in 
> [jar:file:/opt/kafka-dev/trogdor/build/dependant-libs-2.13.12/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.\nSLF4J: Actual binding is of type 
> [org.slf4j.impl.Reload4jLoggerFactory]\nUnsupported metadata version 3.8. 
> Supported metadata versions are 3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 
> 3.5-IV0, 3.5-IV1, 3.5-IV2, 3.6-IV0, 3.6-IV1, 3.6-IV2, 3.7-IV0, 3.7-IV1, 
> 3.7-IV2, 3.7-IV3, 3.7-IV4, 3.8-IV0\n'



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16314: Introducing the AbortableTransactionException [kafka]

2024-03-19 Thread via GitHub


jolshan commented on code in PR #15486:
URL: https://github.com/apache/kafka/pull/15486#discussion_r1530741929


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3040,6 +3048,7 @@ class ReplicaManagerTest {
   transactionalId = transactionalId,
   entriesPerPartition = entriesToAppend,
   responseCallback = appendCallback,
+  apiVersionErrorMapper = genericError

Review Comment:
   That works for now. Long term, we may want another solution, but we can do 
that in a follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530718724


##
tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java:
##
@@ -0,0 +1,1240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import joptsimple.OptionException;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AbstractOptions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.tools.ToolsUtils;
+import org.apache.kafka.tools.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConsumerGroupCommand {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommand.class);
+
+static final String MISSING_COLUMN_VALUE = "-";
+
+public static void main(String[] args) {
+ConsumerGroupCommandOptions opts = 
ConsumerGroupCommandOptions.fromArgs(args);
+try {
+CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
list all consumer groups, describe a consumer group, delete consumer group 
info, or reset consumer group offsets.");
+
+// should have exactly one action
+long actions = Stream.of(opts.listOpt, opts.describeOpt, 
opts.deleteOpt, opts.resetOffsetsOpt, 
opts.deleteOffsetsOpt).filter(opts.options::has).count();
+if (actions != 1)
+CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one 

Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


nizhikov commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530708691


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -664,7 +664,7 @@ object TestUtils extends Logging {
* until the leader is elected and metadata is propagated to all brokers. If 
it does, the method verifies that it has
* the expected number of partition and replication factor however it does 
not guarantee that the topic is empty.
*/
-  def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): 
Unit = {
+  def createOffsetsTopic[B <: KafkaBroker](zkClient: KafkaZkClient, servers: 
Seq[B]): Unit = {

Review Comment:
   There was a compilation error earlier.
   Looks like it's gone now.
   Change reverted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Removed the deprecated information about Zk to KRaft migration. [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on code in PR #15552:
URL: https://github.com/apache/kafka/pull/15552#discussion_r1530699013


##
docs/ops.html:
##
@@ -3797,14 +3797,6 @@ Modifying certain dynamic configurations on the standalone KRaft 
controller
   
 
-  ZooKeeper to KRaft 
Migration

Review Comment:
   There is a link to it (see 
https://github.com/apache/kafka/blob/trunk/docs/ops.html#L3667), so perhaps we 
should keep this anchor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on code in PR #14471:
URL: https://github.com/apache/kafka/pull/14471#discussion_r1530643112


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -664,7 +664,7 @@ object TestUtils extends Logging {
* until the leader is elected and metadata is propagated to all brokers. If 
it does, the method verifies that it has
* the expected number of partition and replication factor however it does 
not guarantee that the topic is empty.
*/
-  def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): 
Unit = {
+  def createOffsetsTopic[B <: KafkaBroker](zkClient: KafkaZkClient, servers: 
Seq[B]): Unit = {

Review Comment:
   Why do we need this change?



##
tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+public class CsvUtils {
+private final static CsvMapper MAPPER = new CsvMapper();
+
+static ObjectReader readerFor(Class<?> clazz) {
+return MAPPER.readerFor(clazz).with(getSchema(clazz));
+}
+
+static ObjectWriter writerFor(Class<?> clazz) {
+return MAPPER.writerFor(clazz).with(getSchema(clazz));
+}
+
+private static CsvSchema getSchema(Class<?> clazz) {
+String[] fields;
+if (CsvRecordWithGroup.class == clazz)
+fields = CsvRecordWithGroup.FIELDS;
+else if (CsvRecordNoGroup.class == clazz)
+fields = CsvRecordNoGroup.FIELDS;
+else
+throw new IllegalStateException("Unhandled class " + clazz);
+
+return MAPPER.schemaFor(clazz).sortedBy(fields);
+}
+
+public static class CsvRecordWithGroup {
+public static final String[] FIELDS = new String[] {"group", "topic", 
"partition", "offset"};
+
+@JsonProperty
+private String group;
+
+@JsonProperty
+private String topic;
+
+@JsonProperty
+private int partition;
+
+@JsonProperty
+private long offset;
+
+public CsvRecordWithGroup() {
+}
+
+public CsvRecordWithGroup(String group, String topic, int partition, 
long offset) {
+this.group = group;
+this.topic = topic;
+this.partition = partition;
+this.offset = offset;
+}
+
+public void setGroup(String group) {
+this.group = group;
+}
+
+public String getGroup() {
+return group;
+}
+
+public String getTopic() {
+return topic;
+}
+
+public void setTopic(String topic) {
+this.topic = topic;
+}
+
+public int getPartition() {
+return partition;
+}
+
+public void setPartition(int partition) {
+this.partition = partition;
+}
+
+public long getOffset() {
+return offset;
+}
+
+public void setOffset(long offset) {
+this.offset = offset;
+}
+}
+
+public static class CsvRecordNoGroup {
+public static final String[] FIELDS = new String[]{"topic", 
"partition", "offset"};
+
+@JsonProperty
+private String topic;
+
+@JsonProperty
+private int partition;
+
+@JsonProperty
+private long offset;
+
+public CsvRecordNoGroup() {

Review Comment:
   Could you add a comment to explain that the public constructor is necessary 
for Jackson? Otherwise, it looks like a useless constructor :)
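   
   For context, a minimal self-contained sketch of why Jackson needs the no-arg 
constructor, using jackson-dataformat-csv directly (class and field names here 
are illustrative, not the PR's):
   ```java
   import com.fasterxml.jackson.dataformat.csv.CsvMapper;
   import com.fasterxml.jackson.dataformat.csv.CsvSchema;
   
   public class CsvRecordSketch {
       public String topic;
       public int partition;
       public long offset;
   
       // Jackson instantiates the record reflectively through this no-arg
       // constructor before populating the fields, so it is not dead code.
       public CsvRecordSketch() { }
   
       public static void main(String[] args) throws Exception {
           CsvMapper mapper = new CsvMapper();
           CsvSchema schema = mapper.schemaFor(CsvRecordSketch.class)
                   .sortedBy("topic", "partition", "offset");
           CsvRecordSketch rec = mapper.readerFor(CsvRecordSketch.class)
                   .with(schema)
                   .readValue("my-topic,0,42");
           System.out.println(rec.topic + "/" + rec.partition + "@" + rec.offset);
       }
   }
   ```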



##
tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to 

Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-19 Thread via GitHub


brandboat commented on code in PR #15512:
URL: https://github.com/apache/kafka/pull/15512#discussion_r1530599572


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##
@@ -1037,7 +1038,7 @@ public void 
testHeartbeatRequestWithFencedInstanceIdException() throws Interrupt
 }
 fail("Expected pollHeartbeat to raise fenced instance id exception 
in 1 second");
 } catch (RuntimeException exception) {
-assertTrue(exception instanceof FencedInstanceIdException);
+assertInstanceOf(FencedInstanceIdException.class, exception);

Review Comment:
   Huge thanks for the detailed review again! :smiley: 
   
   > I have pushed a commit to rebase the code and fix the conflict. Will merge 
it if there are no related failures.
   
   You are so kind! Next time I'll try to fix the conflict faster...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16341) Fix un-compressed records

2024-03-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828407#comment-17828407
 ] 

Chia-Ping Tsai commented on KAFKA-16341:


Pending backport to 3.6.

> Fix un-compressed records
> -
>
> Key: KAFKA-16341
> URL: https://issues.apache.org/jira/browse/KAFKA-16341
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16341) Fix un-compressed records

2024-03-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16341:
---
Fix Version/s: 3.8.0
   3.7.1

> Fix un-compressed records
> -
>
> Key: KAFKA-16341
> URL: https://issues.apache.org/jira/browse/KAFKA-16341
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on PR #15476:
URL: https://github.com/apache/kafka/pull/15476#issuecomment-2007459578

   @johnnychhsu thanks for this contribution! Please complete the follow-ups:
   
   1. The 3.6 branch has many conflicts, so please file a separate PR for branch 3.6.
   2. Update KIP-734 for the offset of the max timestamp.
   
   I have backported to 3.7 manually since there were fewer conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-19 Thread via GitHub


chia7712 merged PR #15476:
URL: https://github.com/apache/kafka/pull/15476


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on PR #15476:
URL: https://github.com/apache/kafka/pull/15476#issuecomment-2007409274

   The JDK 11 build got cancelled. It should be fine, as it was tested in a 
previous QA run.
   
   I ran the failed tests locally:
   ```sh
   ./gradlew cleanTest :tools:test --tests 
MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful --tests 
MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful :storage:test 
--tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout 
--tests TransactionsWithTieredStoreTest.testAbortTransactionTimeout --tests 
TransactionsWithTieredStoreTest.testCommitTransactionTimeout 
:connect:runtime:test --tests 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
 --tests 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
 :metadata:test --tests QuorumControllerTest.testBootstrapZkMigrationRecord 
--tests QuorumControllerTest.testFenceMultipleBrokers --tests 
QuorumControllerTest.testBalancePartitionLeaders :trogdor:test --tests 
CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test 
--tests IdentityReplicationIntegrationTest.testReplicateSourceDefault --tests 
MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault --tests 
MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs --tests 
MirrorConnectorsIntegrationExactlyOnceTest.testReplication --tests 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs 
--tests MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault 
--tests MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs :core:test 
--tests 
DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaSubscribe
 --tests 
DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses
 --tests ConsumerBounceTest.testClose --tests 
ConsumerBounceTest.testConsumptionWithBrokerFailures --tests 
PlaintextAdminIntegrationTest.testElectPreferredLeaders --tests 
LogDirFailureTest.testIOExceptionDuringLogRoll --tests 
LogDirFailureTest.testIOExceptionDuringCheckpoint --tests 
ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric --tests 
LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments
   ```
   all pass, so I'm about to merge it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


lianetm commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2007369164

   LGTM, thanks!  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13906) Invalid replica state transition

2024-03-19 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828365#comment-17828365
 ] 

Igor Soarez commented on KAFKA-13906:
-

That is correct [~showuon] 

> Invalid replica state transition
> 
>
> Key: KAFKA-13906
> URL: https://issues.apache.org/jira/browse/KAFKA-13906
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, replication
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1
>Reporter: Igor Soarez
>Priority: Major
>  Labels: BUG, controller, replication, reproducible-bug
>
> The controller runs into an IllegalStateException when reacting to changes in 
> broker membership status if there are topics that are pending deletion.
>  
> How to reproduce:
>  # Setup cluster with 3 brokers
>  # Create a topic with a partition being led by each broker and produce some 
> data
>  # Kill one of the brokers that is not the controller, and keep that broker 
> down
>  # Delete the topic
>  # Restart the other broker that is not the controller
>  
> Logs and stacktrace:
> {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
> (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
> state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
> {{        at 
> kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
> {{        at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
> {{        at 
> kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
> {{        at 
> kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
> {{        at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
> {{        at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
> {{--}}
> {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states 
> before moving to OnlineReplica state. Instead it is in 
> ReplicaDeletionSuccessful state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> 

Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-200748

   @jeffkbkim @jolshan @lianetm Thanks for your comments. I addressed them.





Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1530489017


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1904,16 +2000,7 @@ public void testReconciliationProcess() {
 new ConsumerGroupHeartbeatResponseData()
 .setMemberId(memberId1)
 .setMemberEpoch(11)
-.setHeartbeatIntervalMs(5000)

Review Comment:
   Yep. This is correct.






Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on PR #15533:
URL: https://github.com/apache/kafka/pull/15533#issuecomment-2007332396

   > They are the only three optional fields, right?
   
   They are actually mandatory in full requests.
   
   > Also, my understanding is that there are cases where we don't set 
subscribedTopicNames and rebalanceTimeoutMs but do set ownedTopicPartitions. Do 
you have examples of this case?
   
   rebalanceTimeoutMs is set once when joining because it never changes 
afterwards. subscribedTopicNames is set when joining or when it is updated 
on the client. ownedTopicPartitions is set when joining or when the 
ownership changes on the client. The client "acks" its owned partitions. 
Overall, in non-joining requests, they are each set independently.
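   
   To make the resulting rule concrete, a minimal standalone sketch (the 
field names mirror the ConsumerGroupHeartbeat request and the check in the 
PR diff; the wrapper class is hypothetical, not part of the PR):
   
   ```java
   import java.util.List;
   
   final class HeartbeatRequestSketch {
       // A heartbeat is "full" when the member joins (epoch 0) or when it
       // re-sends all three fields together, which clients do when joining
       // or when retrying after an error such as a timeout.
       static boolean isFullRequest(int memberEpoch,
                                    int rebalanceTimeoutMs,
                                    List<String> subscribedTopicNames,
                                    List<?> ownedTopicPartitions) {
           return memberEpoch == 0
               || (rebalanceTimeoutMs != -1
                   && subscribedTopicNames != null
                   && ownedTopicPartitions != null);
       }
   }
   ```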
   





Re: [PR] KAFKA-16367; Full ConsumerGroupHeartbeat response must be sent when full request is received [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15533:
URL: https://github.com/apache/kafka/pull/15533#discussion_r1530481757


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1190,10 +1190,11 @@ private 
CoordinatorResult consumerGr
 .setHeartbeatIntervalMs(consumerGroupHeartbeatIntervalMs);
 
 // The assignment is only provided in the following cases:
-// 1. The member reported its owned partitions;
-// 2. The member just joined or rejoined to group (epoch equals to 
zero);
-// 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
+// 1. The member sent a full request. It does so when joining or 
rejoining the group; or
+//on any errors (e.g. timeout).
+// 2. The member's assignment has been updated.
+boolean isFullRequest = memberEpoch == 0 || (rebalanceTimeoutMs != -1 
&& subscribedTopicNames != null && ownedTopicPartitions != null);

Review Comment:
   @jolshan Yes. Your understanding is correct. I updated the comment to 
explain it as it was explained in the description.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1650,6 +1650,102 @@ public void 
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen
 .setTopicPartitions(Collections.emptyList(;
 }
 
+@Test
+public void testConsumerGroupHeartbeatFullResponse() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+// Create a context with one consumer group containing two members.

Review Comment:
   Updated the comment.






Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-19 Thread via GitHub


dajac commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1530443218


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -619,6 +622,22 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
 }
 }
 
+/**
+ * Gets a consumer group.
+ *
+ * @param groupId   The group id.
+ *
+ * @return A ConsumerGroup.
+ * @throws GroupIdNotFoundException if the group does not exist or the 
group is not a consumer group.
+ *
+ * Package private for testing.
+ */
+ConsumerGroup getConsumerGroup(

Review Comment:
   I just noticed that we also have `consumerGroup(String, long)` as a method. 
I wonder whether we should name this one `consumerGroup` too and have it call 
the one that I mentioned. It would also make sense to group them together. 
What do you think?






Re: [PR] KAFKA-16313: Offline group protocol migration (reopened) [kafka]

2024-03-19 Thread via GitHub


dongnuo123 commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1530392822


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -598,15 +600,16 @@ public List 
describeGroups(
  */
 ConsumerGroup getOrMaybeCreateConsumerGroup(
 String groupId,
-boolean createIfNotExists
+boolean createIfNotExists,
+List records
 ) throws GroupIdNotFoundException {
 Group group = groups.get(groupId);
 
 if (group == null && !createIfNotExists) {
 throw new GroupIdNotFoundException(String.format("Consumer group 
%s not found.", groupId));
 }
 
-if (group == null) {
+if (group == null || maybeDeleteEmptyClassicGroup(group, records)) {

Review Comment:
   Oh thanks for the catch. Yeah it's necessary when the group is an empty 
classic group









Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-19 Thread via GitHub


wernerdv commented on PR #15558:
URL: https://github.com/apache/kafka/pull/15558#issuecomment-2007211486

   Hello, @mimaison
   Please take a look.





[jira] [Created] (KAFKA-16387) Allow kafka-metadata-shell to read a running server metadata

2024-03-19 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-16387:
-

 Summary: Allow kafka-metadata-shell to read a running server 
metadata
 Key: KAFKA-16387
 URL: https://issues.apache.org/jira/browse/KAFKA-16387
 Project: Kafka
  Issue Type: Improvement
Reporter: PoAn Yang
Assignee: PoAn Yang


Currently, kafka-metadata-shell tries to acquire the file lock before reading 
the data, so it can't read the metadata of a running server.


If users don't need the very latest data, kafka-metadata-shell could provide 
an option to copy the data to another location and read only the copy. In that 
case, kafka-metadata-shell could work without shutting down the server.
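
As a rough sketch of the copy-then-read idea (the class and paths here are 
hypothetical; the real option would live in the metadata shell itself):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

final class MetadataDirCopy {
    // Copy the metadata log directory to a temp location so the shell can
    // inspect the copy without contending for the running server's file lock.
    static Path copyForInspection(Path metadataLogDir) throws IOException {
        Path target = Files.createTempDirectory("metadata-shell-copy");
        try (Stream<Path> paths = Files.walk(metadataLogDir)) {
            for (Path source : (Iterable<Path>) paths::iterator) {
                Path dest = target.resolve(metadataLogDir.relativize(source).toString());
                if (Files.isDirectory(source)) {
                    Files.createDirectories(dest);
                } else {
                    Files.copy(source, dest, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
        return target;
    }
}
{code}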





[jira] [Updated] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16385:
--
Description: 
Steps to reproduce:
0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
the test.
1. Create a topic with the config: segment.ms=7days, segment.bytes=1GB, 
retention.ms=1sec.
2. Send a record "aaa" to the topic.
3. Wait for 1 second.

Will this segment be rolled? I thought not.
But what I have tested is that it will roll:
{code:java}
[2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
(kafka.log.LocalLog)
[2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
producer snapshot at offset 1 with 1 producer ids in 1 ms. 
(org.apache.kafka.storage.internals.log.ProducerStateManager)
[2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
log retention time 1000ms breach based on the largest record timestamp in the 
segment (kafka.log.UnifiedLog)
{code}
The segment is rolled due to the log retention time 1000ms breach, which is 
unexpected.

Tested in v3.5.1, it has the same issue.
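
For reference, the topic in step 1 can be created programmatically; a minimal 
sketch with the Java Admin client (the broker address and topic name are 
assumptions for illustration):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // segment.ms=7days, segment.bytes=1GB, retention.ms=1sec
            NewTopic topic = new NewTopic("t2", 2, (short) 1).configs(Map.of(
                "segment.ms", "604800000",
                "segment.bytes", "1073741824",
                "retention.ms", "1000"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
{code}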

  was:
Steps to reproduce:
0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
the test.
1. Create a topic with the config: segment.ms=7days, retention.ms=1sec.
2. Send a record "aaa" to the topic.
3. Wait for 1 second.

Will this segment be rolled? I thought not.
But what I have tested is that it will roll:

{code:java}
[2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
(kafka.log.LocalLog)
[2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
producer snapshot at offset 1 with 1 producer ids in 1 ms. 
(org.apache.kafka.storage.internals.log.ProducerStateManager)
[2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
log retention time 1000ms breach based on the largest record timestamp in the 
segment (kafka.log.UnifiedLog)
{code}

The segment is rolled due to the log retention time 1000ms breach, which is 
unexpected.

Tested in v3.5.1, it has the same issue.


> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the config: segment.ms=7days, segment.bytes=1GB, 
> retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not.
> But what I have tested is that it will roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled due to the log retention time 1000ms breach, which is 
> unexpected.
> Tested in v3.5.1, it has the same issue.





[PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-03-19 Thread via GitHub


squah-confluent opened a new pull request, #15559:
URL: https://github.com/apache/kafka/pull/15559

   KIP-890 Part 1 introduced verification of transactions with the transaction 
coordinator on the `Produce` and `TxnOffsetCommit` paths. This introduced the 
possibility of new errors when responding to those requests. For backwards 
compatibility with older clients, a choice was made to convert some of the new 
retriable errors to existing errors that are expected and retried correctly by 
older clients.
   
   `NETWORK_EXCEPTION` was forgotten about and not converted, but can occur if, 
for example, the transaction coordinator is temporarily refusing connections. 
Now, we convert it as follows (see the sketch after this list):
* `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other 
retriable errors that can arise from transaction verification.
* `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This error 
does not force coordinator lookup on clients, unlike 
`COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890, which says 
that retriable errors should be converted to `COORDINATOR_NOT_AVAILABLE`.
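   
   As an illustration only (this helper is hypothetical; the real change lives 
in the Produce and TxnOffsetCommit handling paths), the translation amounts to:
   
   ```java
   import org.apache.kafka.common.protocol.Errors;
   
   final class VerificationErrorTranslation {
       // Map a NETWORK_EXCEPTION from transaction verification to an error
       // that older clients already retry correctly on the given path.
       static Errors translate(Errors error, boolean txnOffsetCommitPath) {
           if (error == Errors.NETWORK_EXCEPTION) {
               return txnOffsetCommitPath
                   ? Errors.COORDINATOR_LOAD_IN_PROGRESS // no coordinator lookup forced
                   : Errors.NOT_ENOUGH_REPLICAS;         // matches other Produce-path conversions
           }
           return error;
       }
   }
   ```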
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated

2024-03-19 Thread Sean Quah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Quah updated KAFKA-16386:
--
Description: 
KAFKA-14402 
([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
 adds verification with the transaction coordinator on Produce and 
TxnOffsetCommit paths as a defense against hanging transactions. For 
compatibility with older clients, retriable errors from the verification step 
are translated to ones already expected and handled by existing clients. When 
verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.

[~dajac] noticed this manifesting as a test failure when 
tests/kafkatest/tests/core/transactions_test.py was run with an older client 
(prior to the fix for KAFKA-16122):
{quote}
{{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
{{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error so 
it transitions to the fatal state.
It seems that there are two cases where the server could return it: (1) When 
the verification request times out or its connection is cut; or (2) in 
{{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because we 
want a retriable error.
{quote}

The first case was triggered as part of the test. The second case happens when 
there is already a verification request ({{AddPartitionsToTxn}}) in flight with 
the same epoch and we want clients to try again when we're not busy.

  was:
KAFKA-14402 
([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
 adds verification with the transaction coordinator on Produce and 
TxnOffsetCommit paths as a defense against hanging transactions. For 
compatibility with older clients, retriable errors from the verification step 
are translated to ones already expected and handled by existing clients. When 
verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.

[~dajac] noticed this manifesting as a test failure when 
tests/kafkatest/tests/core/transactions_test.py was run with an older client 
(pre KAFKA-16122):
{quote}
{{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
{{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error so 
it transitions to the fatal state.
It seems that there are two cases where the server could return it: (1) When 
the verification request times out or its connection is cut; or (2) in 
{{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because we 
want a retriable error.
{quote}

The first case was triggered as part of the test. The second case happens when 
there is already a verification request ({{AddPartitionsToTxn}}) in flight with 
the same epoch and we want clients to try again when we're not busy.


> NETWORK_EXCEPTIONs from transaction verification are not translated
> ---
>
> Key: KAFKA-16386
> URL: https://issues.apache.org/jira/browse/KAFKA-16386
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Sean Quah
>Priority: Minor
>
> KAFKA-14402 
> ([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
>  adds verification with the transaction coordinator on Produce and 
> TxnOffsetCommit paths as a defense against hanging transactions. For 
> compatibility with older clients, retriable errors from the verification step 
> are translated to ones already expected and handled by existing clients. When 
> verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.
> [~dajac] noticed this manifesting as a test failure when 
> tests/kafkatest/tests/core/transactions_test.py was run with an older client 
> (prior to the fix for KAFKA-16122):
> {quote}
> {{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
> {{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error 
> so it transitions to the fatal state.
> It seems that there are two cases where the server could return it: (1) When 
> the verification request times out or its connection is cut; or (2) in 
> {{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because 
> we want a retriable error.
> {quote}
> The first case was triggered as part of the test. The second case happens 
> when there is already a verification request ({{AddPartitionsToTxn}}) in 
> flight with the same epoch and we want clients to try again when we're not 
> busy.





[jira] [Comment Edited] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828319#comment-17828319
 ] 

Luke Chen edited comment on KAFKA-16385 at 3/19/24 12:58 PM:
-

[~divijvaidya], I was thinking about the use case you mentioned:

_I have set max segment size to be 1 GB and I have a topic with low ingress 
traffic. I want to expire data in my log every 1 day due to compliance 
requirement. But the partition doesn't receive 1GB of data in one day and 
hence, my active segment will never become eligible for expiration._

OK, so even if we adopt option 2, we still cannot guarantee that all the data 
past the 1-day limit will get deleted. Say that right before the retention 
thread starts its check, a new record arrives. Then, in this case, this 
segment won't be eligible for expiration even though it contains data more 
than 1 day old. And it still breaks the contract of retention.ms.

Again, I don't know which is the expected behavior we want. So I'd like to hear 
more comments from the community/experts.


was (Author: showuon):
[~divijvaidya], I was thinking the use case you mentioned:

_I have set max segment size to be 1 GB and I have a topic with low ingress 
traffic. I want to expire data in my log every 1 day due to compliance 
requirement. But the partition doesn't receive 1GB of data in one day and 
hence, my active segment will never become eligible for expiration._

OK, so, even if we adopt the option 2, we still cannot guarantee all the data 
expire the 1 day limit will get deleted. Let's say, Right before the retention 
thread starting to check, a new record arrived. Then, in this case, this 
segment won't be eligible for expiration even though it contains data over 1 
day. And it breaks the contract of the retention.ms.

Again, I don't know which is the expected behavior we want. So I'd like to hear 
more comments from the community/experts.

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the config: segment.ms=7days, retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not.
> But what I have tested is that it will roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled due to the log retention time 1000ms breach, which is 
> unexpected.
> Tested in v3.5.1, it has the same issue.





[jira] [Comment Edited] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828319#comment-17828319
 ] 

Luke Chen edited comment on KAFKA-16385 at 3/19/24 12:57 PM:
-

[~divijvaidya], I was thinking the use case you mentioned:

_I have set max segment size to be 1 GB and I have a topic with low ingress 
traffic. I want to expire data in my log every 1 day due to compliance 
requirement. But the partition doesn't receive 1GB of data in one day and 
hence, my active segment will never become eligible for expiration._

OK, so, even if we adopt the option 2, we still cannot guarantee all the data 
expire the 1 day limit will get deleted. Let's say, Right before the retention 
thread starting to check, a new record arrived. Then, in this case, this 
segment won't be eligible for expiration even though it contains data over 1 
day. And it breaks the contract of the retention.ms.

Again, I don't know which is the expected behavior we want. So I'd like to hear 
more comments from the community/experts.


was (Author: showuon):
[~divijvaidya], I was thinking the use case you mentioned:

_I have set max segment size to be 1 GB and I have a topic with low ingress 
traffic. I want to expire data in my log every 1 day due to compliance 
requirement. But the partition doesn't receive 1GB of data in one day and 
hence, my active segment will never become eligible for expiration. _

OK, so, even if we adopt the option 2, we still cannot guarantee all the data 
expire the 1 day limit will get deleted. Let's say, when right before the 
retention thread starting to check, a new record arrived. In this case, this 
segment won't be eligible for expiration even though it contains data over 1 
day. And it breaks the contract of the retention.ms.

Again, I don't know which is the expected behavior we want. So I'd like to hear 
more comments from the community/experts. 

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the config: segment.ms=7days, retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not.
> But what I have tested is that it will roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled due to the log retention time 1000ms breach, which is 
> unexpected.
> Tested in v3.5.1, it has the same issue.





[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828319#comment-17828319
 ] 

Luke Chen commented on KAFKA-16385:
---

[~divijvaidya], I was thinking the use case you mentioned:

_I have set max segment size to be 1 GB and I have a topic with low ingress 
traffic. I want to expire data in my log every 1 day due to compliance 
requirement. But the partition doesn't receive 1GB of data in one day and 
hence, my active segment will never become eligible for expiration. _

OK, so, even if we adopt the option 2, we still cannot guarantee all the data 
expire the 1 day limit will get deleted. Let's say, when right before the 
retention thread starting to check, a new record arrived. In this case, this 
segment won't be eligible for expiration even though it contains data over 1 
day. And it breaks the contract of the retention.ms.

Again, I don't know which is the expected behavior we want. So I'd like to hear 
more comments from the community/experts. 

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Start up a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Create a topic with the config: segment.ms=7days, retention.ms=1sec.
> 2. Send a record "aaa" to the topic.
> 3. Wait for 1 second.
> Will this segment be rolled? I thought not.
> But what I have tested is that it will roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled due to the log retention time 1000ms breach, which is 
> unexpected.
> Tested in v3.5.1, it has the same issue.





[jira] [Created] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated

2024-03-19 Thread Sean Quah (Jira)
Sean Quah created KAFKA-16386:
-

 Summary: NETWORK_EXCEPTIONs from transaction verification are not 
translated
 Key: KAFKA-16386
 URL: https://issues.apache.org/jira/browse/KAFKA-16386
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.0
Reporter: Sean Quah


KAFKA-14402 
([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
 adds verification with the transaction coordinator on Produce and 
TxnOffsetCommit paths as a defense against hanging transactions. For 
compatibility with older clients, retriable errors from the verification step 
are translated to ones already expected and handled by existing clients. When 
verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.

[~dajac] noticed this manifesting as a test failure when 
tests/kafkatest/tests/core/transactions_test.py was run with an older client 
(pre KAFKA-16122):
{quote}
{{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
{{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error so 
it transitions to the fatal state.
It seems that there are two cases where the server could return it: (1) When 
the verification request times out or its connection is cut; or (2) in 
{{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because we 
want a retriable error.
{quote}

The first case was triggered as part of the test. The second case happens when 
there is already a verification request ({{AddPartitionsToTxn}}) in flight with 
the same epoch and we want clients to try again when we're not busy.





Re: [PR] MINOR: Revert to Gradle 8.5 [DO NOT MERGE YET] [kafka]

2024-03-19 Thread via GitHub


pasharik commented on PR #15553:
URL: https://github.com/apache/kafka/pull/15553#issuecomment-2007063040

   Hi All,
   
   Update on my original comment: the issue with incremental compilation seems 
to happen only when changes are made from IntelliJ.
   
   Would anybody mind trying to reproduce it on your local environment? Steps 
to reproduce:
   
   * Open `kafka.admin.AclCommandTest` in IntelliJ Idea. I'm using `2023.3.5 
Ultimate`
   * Add `println("a")` at the beginning of `testAclCliWithAuthorizer` method
   * Run in terminal: `./gradlew core:test --tests 
"kafka.admin.AclCommandTest.testAclCliWithAuthorizer"`
   * Result: all tests in `core` module are re-compiled
   
   If the test is modified with `vim`, only a single test class is re-compiled





Re: [PR] MINOR: Update dependencies [kafka]

2024-03-19 Thread via GitHub


jlprat commented on PR #15404:
URL: https://github.com/apache/kafka/pull/15404#issuecomment-2007053721

   I generated the wrapper the way `gradle` recommends (`./gradlew wrapper 
--gradle-version latest`). I would leave it the way gradle generates it.





Re: [PR] MINOR: Update dependencies [kafka]

2024-03-19 Thread via GitHub


pasharik commented on PR #15404:
URL: https://github.com/apache/kafka/pull/15404#issuecomment-2007048822

   There is a version of gradle wrapper in our 
[gradlew](https://github.com/apache/kafka/blob/trunk/gradlew#L119) file:
   
   ```
   
https://raw.githubusercontent.com/gradle/gradle/v8.5.0/gradle/wrapper/gradle-wrapper.jar
   ```
   
   should we update it to `8.6.0` as well?





Re: [PR] MINOR: Update dependencies [kafka]

2024-03-19 Thread via GitHub


pasharik commented on code in PR #15404:
URL: https://github.com/apache/kafka/pull/15404#discussion_r1530262197


##
gradle/dependencies.gradle:
##
@@ -100,10 +100,10 @@ versions += [
   commonsCli: "1.4",
   commonsValidator: "1.7",
   dropwizardMetrics: "4.1.12.1",
-  gradle: "8.5",
+  gradle: "8.6",

Review Comment:
   @jlprat my `trunk` branch was almost 2 weeks old; now I've updated it and I 
can't see any difference between Gradle 8.5 and 8.6. So I think we can leave 
this PR as it is, sorry for the confusion.
   
   I haven't reported the issue to Gradle yet; I'm still trying to figure out 
the exact steps to reproduce it, and whether it's related to Gradle or to my 
local environment setup.
   
   Not related to this particular PR, but I can still see issues with 
incremental compilation, though only when making changes from IntelliJ IDEA. 
If changes are made from vim or another console text editor, incremental 
compilation works as expected.
   
   Steps to reproduce:
   
   * Open `kafka.admin.AclCommandTest` in IntelliJ Idea. I'm using `2023.3.5 
Ultimate`
   * Add `println("a")` at the beginning of `testAclCliWithAuthorizer` method
   * Run in terminal: `./gradlew core:test --tests 
"kafka.admin.AclCommandTest.testAclCliWithAuthorizer"`
   * Result: all tests in `core` module are re-compiled
   
   If the test is modified with `vim`, only a single test class is re-compiled






[jira] [Commented] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric

2024-03-19 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828304#comment-17828304
 ] 

Johnny Hsu commented on KAFKA-16323:


I have added logs to observe and tried to verify some potential causes:
 # check whether it really enters the delayed remote fetch
 # check whether it really enters the onExpire() section
 # check whether it succeeds in marking the metrics

All were verified and work as expected.

 

Thanks to [~showuon], I also tried adding @BeforeAll to the teardown function, 
which removes all metrics before the test. However, it still failed.

I need some more tries to find the root cause...
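
For reference, a rough sketch of the metric-reset idea in JUnit terms (the 
registry calls are from Kafka's Yammer metrics wrapper; this is an 
illustration, not the exact test change):

{code:java}
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.junit.jupiter.api.BeforeEach;

public class MetricsResetExample {
    @BeforeEach
    void clearAllMetrics() {
        // Drop every previously registered Yammer metric so each test starts
        // from a clean registry and cannot observe counts from earlier tests.
        KafkaYammerMetrics.defaultRegistry().allMetrics().keySet()
            .forEach(KafkaYammerMetrics.defaultRegistry()::removeMetric);
    }
}
{code}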

> Failing test: fix testRemoteFetchExpiresPerSecMetric 
> -
>
> Key: KAFKA-16323
> URL: https://issues.apache.org/jira/browse/KAFKA-16323
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: test-failure
>
> Refer to 
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/]
> This test is failing, and this ticket aims to address this 





[jira] [Commented] (KAFKA-13906) Invalid replica state transition

2024-03-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828305#comment-17828305
 ] 

Luke Chen commented on KAFKA-13906:
---

Also, we should verify whether this happens in KRaft mode.

> Invalid replica state transition
> 
>
> Key: KAFKA-13906
> URL: https://issues.apache.org/jira/browse/KAFKA-13906
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, replication
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1
>Reporter: Igor Soarez
>Priority: Major
>  Labels: BUG, controller, replication, reproducible-bug
>
> The controller runs into an IllegalStateException when reacting to changes in 
> broker membership status if there are topics that are pending deletion.
>  
> How to reproduce:
>  # Setup cluster with 3 brokers
>  # Create a topic with a partition being led by each broker and produce some 
> data
>  # Kill one of the brokers that is not the controller, and keep that broker 
> down
>  # Delete the topic
>  # Restart the other broker that is not the controller
>  
> Logs and stacktrace:
> {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
> (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
> state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
> {{        at 
> kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
> {{        at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
> {{        at 
> kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
> {{        at 
> kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
> {{        at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
> {{        at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
> {{--}}
> {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states 
> before moving to OnlineReplica state. Instead it is in 
> ReplicaDeletionSuccessful state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> 

[jira] [Commented] (KAFKA-13906) Invalid replica state transition

2024-03-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828303#comment-17828303
 ] 

Luke Chen commented on KAFKA-13906:
---

We ran into a similar situation recently. [~soarez], I'd like to confirm that 
my understanding is correct. It looks like the issue has no impact on the 
topic because the topic is already deleted. The only issue is that the error 
log is confusing users. Is my understanding correct?

> Invalid replica state transition
> 
>
> Key: KAFKA-13906
> URL: https://issues.apache.org/jira/browse/KAFKA-13906
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, replication
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1
>Reporter: Igor Soarez
>Priority: Major
>  Labels: BUG, controller, replication, reproducible-bug
>
> The controller runs into an IllegalStateException when reacting to changes in 
> broker membership status if there are topics that are pending deletion.
>  
> How to reproduce:
>  # Setup cluster with 3 brokers
>  # Create a topic with a partition being led by each broker and produce some 
> data
>  # Kill one of the brokers that is not the controller, and keep that broker 
> down
>  # Delete the topic
>  # Restart the other broker that is not the controller
>  
> Logs and stacktrace:
> {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
> (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
> state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
> {{        at 
> kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
> {{        at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
> {{        at 
> kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
> {{        at 
> kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
> {{        at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
> {{        at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
> {{--}}
> {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states 
> before moving to OnlineReplica state. Instead it is in 
> ReplicaDeletionSuccessful state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> 

[jira] [Updated] (KAFKA-16380) Rename the shallowOffsetOfMaxTimestamp in LogSegment

2024-03-19 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu updated KAFKA-16380:
---
Description: 
When working on #KAFKA-16341, we found that `shallowOffsetOfMaxTimestamp` also 
appears in LogSegment, which is a confusing name since it actually represents 
the record-level offset, instead of a record-batch-level offset. 

Thus, this variable name should be renamed as well. 

More details can be found in 
[https://github.com/apache/kafka/pull/15476.|https://github.com/apache/kafka/pull/15476]

  was:
When working on #KAFKA-16341, we found that `shallowOffsetOfMaxTimestamp` also 
appears in LogSegment, which is a confusing name since it actually represents 
the record-level offset, instead of a record-batch-level offset. 

Thus, this variable name should be renamed as well. 

More details can be found in 


> Rename the shallowOffsetOfMaxTimestamp in LogSegment
> 
>
> Key: KAFKA-16380
> URL: https://issues.apache.org/jira/browse/KAFKA-16380
> Project: Kafka
>  Issue Type: Bug
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Minor
>
> When working on #KAFKA-16341, we found that `shallowOffsetOfMaxTimestamp` also 
> appears in LogSegment, which is a confusing name since it actually represents 
> the record-level offset, instead of a record-batch-level offset. 
> Thus, this variable name should be renamed as well. 
> More details can be found in 
> [https://github.com/apache/kafka/pull/15476.|https://github.com/apache/kafka/pull/15476]





[jira] [Updated] (KAFKA-16380) Rename the shallowOffsetOfMaxTimestamp in LogSegment

2024-03-19 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu updated KAFKA-16380:
---
Description: 
When working on #KAFKA-16341, we found that `shallowOffsetOfMaxTimestamp` also 
appears in LogSegment, which is a confusing name since it actually represents 
the record-level offset, instead of a record-batch-level offset. 

Thus, this variable name should be renamed as well. 

More details can be found in 

  was:When working on #KAFKA-16341, we found 


> Rename the shallowOffsetOfMaxTimestamp in LogSegment
> 
>
> Key: KAFKA-16380
> URL: https://issues.apache.org/jira/browse/KAFKA-16380
> Project: Kafka
>  Issue Type: Bug
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Minor
>
> When working on #KAFKA-16341, we found that `shallowOffsetOfMaxTimestamp` also 
> appears in LogSegment, which is a confusing name since it actually represents 
> the record-level offset, instead of a record-batch-level offset. 
> Thus, this variable name should be renamed as well. 
> More details can be found in 





[jira] [Updated] (KAFKA-16380) Rename the shallowOffsetOfMaxTimestamp in LogSegment

2024-03-19 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu updated KAFKA-16380:
---
Description: When working on #KAFKA-16341, we found 

> Rename the shallowOffsetOfMaxTimestamp in LogSegment
> 
>
> Key: KAFKA-16380
> URL: https://issues.apache.org/jira/browse/KAFKA-16380
> Project: Kafka
>  Issue Type: Bug
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Minor
>
> When working on #KAFKA-16341, we found 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16380) Rename the shallowOffsetOfMaxTimestamp in LogSegment

2024-03-19 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828302#comment-17828302
 ] 

Johnny Hsu commented on KAFKA-16380:


[~chia7712] sure, let me update that.

BTW this is already addressed in 
[https://github.com/apache/kafka/pull/15476.|https://github.com/apache/kafka/pull/15476]

Will link it here. 

> Rename the shallowOffsetOfMaxTimestamp in LogSegment
> 
>
> Key: KAFKA-16380
> URL: https://issues.apache.org/jira/browse/KAFKA-16380
> Project: Kafka
>  Issue Type: Bug
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Minor
>






Re: [PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on code in PR #15512:
URL: https://github.com/apache/kafka/pull/15512#discussion_r1530236303


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java:
##
@@ -177,7 +177,7 @@ private void assertWakeupExceptionIsThrown(final 
CompletableFuture future) {
 try {
 future.get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   ditto



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2172,7 +2173,7 @@ public void testCreatePartitions() throws Exception {
 otherTopicResult.get();
 fail("get() should throw ExecutionException");

Review Comment:
   ditto



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##
@@ -1037,7 +1038,7 @@ public void 
testHeartbeatRequestWithFencedInstanceIdException() throws Interrupt
 }
 fail("Expected pollHeartbeat to raise fenced instance id exception 
in 1 second");
 } catch (RuntimeException exception) {
-assertTrue(exception instanceof FencedInstanceIdException);
+assertInstanceOf(FencedInstanceIdException.class, exception);

Review Comment:
   this should be rewritten using `assertThrows`
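   
   For illustration, the rewrite would look roughly like this (assuming 
JUnit 5's `assertThrows` and the names used in this test):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   import org.apache.kafka.common.errors.FencedInstanceIdException;
   
   // Let JUnit assert the exception type instead of catching RuntimeException
   // and checking the type by hand.
   assertThrows(FencedInstanceIdException.class,
       () -> coordinator.pollHeartbeat(mockTime.milliseconds()));
   ```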






Re: [PR] KAFKA-16318 : add javafoc for kafka metric [kafka]

2024-03-19 Thread via GitHub


johnnychhsu commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1530243082


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -78,6 +111,10 @@ public Measurable measurable() {
 }
 }
 
+/**
+ * Set the metric config.

Review Comment:
   I see, thanks for the explanation. 
   Just updated it, too.






Re: [PR] KAFKA-16381: add lock for KafkaMetric config getter [kafka]

2024-03-19 Thread via GitHub


johnnychhsu commented on PR #15550:
URL: https://github.com/apache/kafka/pull/15550#issuecomment-2007003985

   Thanks for the review @vamossagar12.
   Let me rephrase your comment to make sure I understand you correctly.
   > Now, if some other thread is trying to update the config by using 
config(MetricConfig config), then unless that operation 
   > doesn't go through, we can keep returning the unchanged config object.
   So you mean only if the `config(MetricConfig config)` works, then we need to 
return the updated config, right?
   
   I feel it's better to have it here, because if someone update the config, 
then immediately use the config as a criteria to do something else, but the 
config has not updated yet or somehow fails, then this could lead to a 
potential problem. 
   wdyt?
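
   To make the concern concrete, here is a minimal standalone sketch of the locking 
approach under discussion (a simplified stand-in class, not the actual patch):
   ```java
   import org.apache.kafka.common.metrics.MetricConfig;

   // Simplified stand-in for KafkaMetric: reads and writes of `config`
   // synchronize on the same lock, so a caller that reads the config right
   // after an update never observes a stale or half-published value.
   class LockedMetric {
       private final Object lock = new Object();
       private MetricConfig config;

       LockedMetric(MetricConfig config) {
           this.config = config;
       }

       MetricConfig config() {
           synchronized (lock) {   // the getter takes the lock too
               return this.config;
           }
       }

       void config(MetricConfig config) {
           synchronized (lock) {
               this.config = config;
           }
       }
   }
   ```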


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828296#comment-17828296
 ] 

Luke Chen commented on KAFKA-16385:
---

Thanks Chia-Ping.

I admit my original understanding of `retention.ms` was that it only takes effect 
on inactive segments. So, in my example of `segment.ms=7days` and 
`retention.ms=1sec`, my expectation was that the segment would be rolled after 7 
days (or once its size exceeded segment.bytes), and only then become eligible for 
deletion. But from Divij's explanation, I agree the definition of `retention.ms` 
is more like "the oldest record allowed to appear in the log". If that's the 
case, then I could be wrong and we should improve the doc. Otherwise, this is a 
bug: the active (idle) segment should not be rolled even though its maxTimestamp 
has passed `retention.ms`. 
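
To reproduce the setup from the steps quoted below programmatically, here is a 
sketch using the Java Admin client (topic name, partition count, replication 
factor, and bootstrap address are assumptions for illustration):
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class SegmentRollRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // segment.ms = 7 days, retention.ms = 1 second, as in the repro steps
            Map<String, String> configs = new HashMap<>();
            configs.put(TopicConfig.SEGMENT_MS_CONFIG,
                String.valueOf(TimeUnit.DAYS.toMillis(7)));
            configs.put(TopicConfig.RETENTION_MS_CONFIG, "1000");
            admin.createTopics(Collections.singleton(
                new NewTopic("t2", 2, (short) 1).configs(configs))).all().get();
        }
    }
}
{code}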

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Startup a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Creating a topic with the config: segment.ms=7days , retention.ms=1sec .
> 2. Send a record "aaa" to the topic
> 3. Wait for 1 second
> Will this segment be rolled? I thought not.
> But my test shows it does roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the log retention time of 1000ms was breached, 
> which is unexpected.
> Tested in v3.5.1, it has the same issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16318 : add javadoc for kafka metric [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1530225439


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -78,6 +109,11 @@ public Measurable measurable() {
 }
 }
 
+/**
+ * Set the metric config.
+ * This is supposed to be used by server only, DO NOT use this for your 
metrics.

Review Comment:
   `This is supposed to be used by server only`: this description is good enough 
for me
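
   For reference, a sketch of the javadoc shape being converged on here (the method 
body shown is an assumption for illustration, not the merged text):
   ```java
   /**
    * Set the metric config.
    * This is supposed to be used by server only.
    *
    * @param config the new config for this metric
    */
   public synchronized void config(MetricConfig config) {
       this.config = config;
   }
   ```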



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16318 : add javadoc for kafka metric [kafka]

2024-03-19 Thread via GitHub


chia7712 commented on code in PR #15483:
URL: https://github.com/apache/kafka/pull/15483#discussion_r1530224467


##
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java:
##
@@ -78,6 +111,10 @@ public Measurable measurable() {
 }
 }
 
+/**
+ * Set the metric config.

Review Comment:
   > However, why are users not allowed to fetch the config?
   
   `MetricConfig` also has methods used by the server, so I feel that encouraging 
users to access `MetricConfig` could lead to trouble.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828281#comment-17828281
 ] 

Chia-Ping Tsai commented on KAFKA-16385:


It seems to me the scenario is about "should we roll the idle segment even 
though it is the active one?"

The expire-based cleanup normally skips the active segment, since the max 
timestamp of the active segment keeps updating. However, if we stop pushing data 
to the active segment, it has a chance of being viewed as an expired segment and 
being cleaned up by the "kafka-log-retention" thread. I feel that is option 2 
mentioned by [~divijvaidya].

Also, there is a similar scenario if we set `retention.bytes` to zero.
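
For that variant, a sketch of setting `retention.bytes=0` on an existing topic 
via the Java Admin client (topic name and bootstrap address are assumptions):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class SetRetentionBytesZero {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // retention.bytes=0 makes every byte in the log eligible for
            // deletion, mirroring the retention.ms=1sec repro above.
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "t2");
            AlterConfigOp setZero = new AlterConfigOp(
                new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "0"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Collections.singletonMap(
                topic, Collections.singletonList(setZero))).all().get();
        }
    }
}
{code}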

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Startup a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Creating a topic with the config: segment.ms=7days , retention.ms=1sec .
> 2. Send a record "aaa" to the topic
> 3. Wait for 1 second
> Will this segment be rolled? I thought not.
> But my test shows it does roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the log retention time of 1000ms was breached, 
> which is unexpected.
> Tested in v3.5.1, it has the same issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >