Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435445309

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         }
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
-        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
         InputStream remoteSegInputStream = null;
         try {
-            // Search forward for the position of the last offset that is greater than or equal to the target offset
-            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
-
-            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
-
+            int startPos = 0;
+            RecordBatch firstBatch = null;
+            while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
@clolov We need to use a while loop because the next segment in the iteration may also be log compacted, so we may need to keep moving forward until we find a batch. See the example I attached in the PR description, where 0.log and 6.log are both fully log compacted and the next batch exists only in 07.log. The same logic is used when fetching data from a local log segment: https://github.com/apache/kafka/blob/317f61dfd9a7f1443cf75b6a32568d1c81984d08/core/src/main/scala/kafka/log/LocalLog.scala#L425

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
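The loop shape under discussion can be illustrated with a simplified model (this is a sketch, not the actual RemoteLogManager code): each "segment" is modeled as a list of record-batch last offsets, and a fully compacted segment contributes no batch at or beyond the fetch offset, so the scan advances to the next segment.

```java
import java.util.List;

// Simplified sketch of the segment-scanning loop from the PR discussion.
// A fully compacted segment is modeled as an empty list, which forces the
// while loop to advance to the next segment, exactly like the 0.log/6.log
// example where the first live batch only appears in 07.log.
public class RemoteReadSketch {

    // Returns the last offset of the first batch whose last offset >= fetchOffset,
    // scanning segments in order, or -1 if no such batch exists.
    public static long findFirstBatch(List<List<Long>> segments, long fetchOffset) {
        Long firstBatch = null;
        int i = 0;
        // Keep moving to the next segment while no matching batch has been
        // found and segments remain -- the reason a while loop is needed.
        while (firstBatch == null && i < segments.size()) {
            for (long batchLastOffset : segments.get(i)) {
                if (batchLastOffset >= fetchOffset) {
                    firstBatch = batchLastOffset;
                    break;
                }
            }
            i++;
        }
        return firstBatch == null ? -1 : firstBatch;
    }

    public static void main(String[] args) {
        // Two fully compacted segments, then a segment with live batches.
        List<List<Long>> segments = List.of(List.of(), List.of(), List.of(7L, 9L));
        System.out.println(findFirstBatch(segments, 5L)); // prints 7
    }
}
```

A single lookup would stop after the first (empty) segment; the loop is what lets the read skip an arbitrary run of fully compacted segments.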
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435444785

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
## @@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
     }
 }

+@Test
+public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException {
+    FileInputStream fileInputStream = mock(FileInputStream.class);
+    RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class);
+    ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+    RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+    LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+    when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+    when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+        .thenAnswer(a -> fileInputStream);
+    when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+    int fetchOffset = 0;
+    int fetchMaxBytes = 10;
+    int recordBatchSizeInBytes = fetchMaxBytes + 1;
+    RecordBatch firstBatch = mock(RecordBatch.class);
+    ArgumentCaptor capture = ArgumentCaptor.forClass(ByteBuffer.class);
+
+    FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+        Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
+    );
+
+    when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream);
+    when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));

Review Comment:
Thanks for pointing it out. This may not even be required, as I have already mocked the fetchRemoteLogSegmentMetadata function.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435446137

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -1412,8 +1422,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset,
             }
         }
     }
-
-    private Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+    // visible for testing.
+    Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
                                              Option leaderEpochFileCacheOption) throws RemoteStorageException {

Review Comment:
done

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435444785

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
## @@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
     }
 }

+@Test
+public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException {
+    FileInputStream fileInputStream = mock(FileInputStream.class);
+    RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class);
+    ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+    RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+    LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+    when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+    when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+        .thenAnswer(a -> fileInputStream);
+    when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+    int fetchOffset = 0;
+    int fetchMaxBytes = 10;
+    int recordBatchSizeInBytes = fetchMaxBytes + 1;
+    RecordBatch firstBatch = mock(RecordBatch.class);
+    ArgumentCaptor capture = ArgumentCaptor.forClass(ByteBuffer.class);
+
+    FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+        Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
+    );
+
+    when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream);
+    when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));

Review Comment:
We are calling it two times in the unit test, so this value needs to be returned twice.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
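The stubbing being discussed relies on Mockito's consecutive-return form, `thenReturn(a, b)`, which yields `a` on the first call and `b` on later calls (the last stubbed value repeats once the list is exhausted). A plain-Java sketch of those semantics, with no Mockito dependency and illustrative names only:

```java
import java.util.List;

// Plain-Java sketch of Mockito's consecutive stubbing semantics:
// when(m.f()).thenReturn(a, b) answers a on the first call, then b on every
// subsequent call. The class name and shape here are illustrative, not Mockito's.
public class ConsecutiveStub<T> {
    private final List<T> answers;
    private int calls = 0;

    public ConsecutiveStub(List<T> answers) {
        if (answers.isEmpty()) {
            throw new IllegalArgumentException("need at least one answer");
        }
        this.answers = answers;
    }

    public T next() {
        // Return answers in order; once exhausted, keep returning the last one,
        // which matches Mockito's documented behavior for consecutive stubbing.
        int idx = Math.min(calls, answers.size() - 1);
        calls++;
        return answers.get(idx);
    }

    public static void main(String[] args) {
        ConsecutiveStub<String> stub = new ConsecutiveStub<>(List.of("first", "second"));
        System.out.println(stub.next()); // prints first
        System.out.println(stub.next()); // prints second
        System.out.println(stub.next()); // prints second (last value repeats)
    }
}
```

This is why stubbing a single value would also have satisfied two calls; supplying the value twice simply makes the two expected invocations explicit in the test.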
[jira] [Comment Edited] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
[ https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799209#comment-17799209 ]

Alexander Aghili edited comment on KAFKA-16027 at 12/23/23 2:09 AM:
Made a pull request for the refactoring. Didn't move it to ConsumerMetadata.java due to some additional complications. Discussion can take place on the PR.

was (Author: JIRAUSER303109):
[https://github.com/apache/kafka/pull/15055]
Made a pull request for the refactoring. Didn't move it to ConsumerMetadata.java due to some additional complications. Discussion can take place on the PR.

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
> Issue Type: Improvement
> Reporter: Philip Nee
> Assignee: Alexander Aghili
> Priority: Minor
> Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is
> pretty close to the 160-line method limit - I tried to modify it, but it
> would hit the limit when I tried to break things into separate lines.
> The test also contains two tests, so it is best to split it into two separate
> tests.
> We should also move this to ConsumerMetadata.java

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Close RemoteLogManager in RemoteLogManagerTest [kafka]
satishd commented on PR #15063: URL: https://github.com/apache/kafka/pull/15063#issuecomment-1868160079 Nice catch @dajac, thanks for resolving it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16012) Incomplete range assignment in consumer
[ https://issues.apache.org/jira/browse/KAFKA-16012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-16012. Resolution: Fixed > Incomplete range assignment in consumer > --- > > Key: KAFKA-16012 > URL: https://issues.apache.org/jira/browse/KAFKA-16012 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Philip Nee >Priority: Blocker > Fix For: 3.7.0 > > > We were looking into test failures here: > https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1702475525--jolshan--kafka-15784--7cad567675/2023-12-13--001./2023-12-13–001./report.html. > > Here is the first failure in the report: > {code:java} > > test_id: > kafkatest.tests.core.group_mode_transactions_test.GroupModeTransactionsTest.test_transactions.failure_mode=clean_bounce.bounce_target=brokers > status: FAIL > run time: 3 minutes 4.950 seconds > TimeoutError('Consumer consumed only 88223 out of 10 messages in > 90s') {code} > > We traced the failure to an apparent bug during the last rebalance before the > group became empty. The last remaining instance seems to receive an > incomplete assignment which prevents it from completing expected consumption > on some partitions. Here is the rebalance from the coordinator's perspective: > {code:java} > server.log.2023-12-13-04:[2023-12-13 04:58:56,987] INFO [GroupCoordinator 3]: > Stabilized group grouped-transactions-test-consumer-group generation 5 > (__consumer_offsets-2) with 1 members > (kafka.coordinator.group.GroupCoordinator) > server.log.2023-12-13-04:[2023-12-13 04:58:56,990] INFO [GroupCoordinator 3]: > Assignment received from leader > consumer-grouped-transactions-test-consumer-group-1-2164f472-93f3-4176-af3f-23d4ed8b37fd > for group grouped-transactions-test-consumer-group for generation 5. The > group has 1 members, 0 of which are static. 
> (kafka.coordinator.group.GroupCoordinator) {code} > The group is down to one member in generation 5. In the previous generation, > the consumer in question reported this assignment: > {code:java} > // Gen 4: we've got partitions 0-4 > [2023-12-13 04:58:52,631] DEBUG [Consumer > clientId=consumer-grouped-transactions-test-consumer-group-1, > groupId=grouped-transactions-test-consumer-group] Executing onJoinComplete > with generation 4 and memberId > consumer-grouped-transactions-test-consumer-group-1-2164f472-93f3-4176-af3f-23d4ed8b37fd > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > [2023-12-13 04:58:52,631] INFO [Consumer > clientId=consumer-grouped-transactions-test-consumer-group-1, > groupId=grouped-transactions-test-consumer-group] Notifying assignor about > the new Assignment(partitions=[input-topic-0, input-topic-1, input-topic-2, > input-topic-3, input-topic-4]) > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) {code} > However, in generation 5, we seem to be assigned only one partition: > {code:java} > // Gen 5: Now we have only partition 1? But aren't we the last member in the > group? > [2023-12-13 04:58:56,954] DEBUG [Consumer > clientId=consumer-grouped-transactions-test-consumer-group-1, > groupId=grouped-transactions-test-consumer-group] Executing onJoinComplete > with generation 5 and memberId > consumer-grouped-transactions-test-consumer-group-1-2164f472-93f3-4176-af3f-23d4ed8b37fd > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > [2023-12-13 04:58:56,955] INFO [Consumer > clientId=consumer-grouped-transactions-test-consumer-group-1, > groupId=grouped-transactions-test-consumer-group] Notifying assignor about > the new Assignment(partitions=[input-topic-1]) > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) {code} > The assignment type is range from the JoinGroup for generation 5. 
The decoded > metadata sent by the consumer is this: > {code:java} > Subscription(topics=[input-topic], ownedPartitions=[], groupInstanceId=null, > generationId=4, rackId=null) {code} > Here is the decoded assignment from the SyncGroup: > {code:java} > Assignment(partitions=[input-topic-1]) {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
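For context on why `Assignment(partitions=[input-topic-1])` is suspicious: with range assignment, a topic's partitions are split contiguously among the subscribed members, so the sole remaining member in generation 5 should have received all five partitions of input-topic. A simplified sketch of range assignment for one topic (this is illustrative, not Kafka's actual RangeAssignor implementation):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified range-assignment sketch for a single topic (not Kafka's actual
// RangeAssignor): partitions 0..n-1 are split contiguously across members
// sorted by member id; the first (n % members) members get one extra partition.
public class RangeSketch {
    public static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        sorted.sort(null);
        int per = numPartitions / sorted.size();   // base partitions per member
        int extra = numPartitions % sorted.size(); // leftovers go to the first members
        Map<String, List<Integer>> assignment = new HashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                parts.add(next++);
            }
            assignment.put(sorted.get(i), parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One remaining member, five partitions: a correct range assignment
        // gives that member all of partitions 0..4.
        System.out.println(assign(List.of("consumer-1"), 5).get("consumer-1"));
        // prints [0, 1, 2, 3, 4]
    }
}
```

Receiving only `input-topic-1` in that situation points at stale leader/metadata state rather than at the assignor arithmetic, which is what the linked PR addresses.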
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
jolshan merged PR #15023: URL: https://github.com/apache/kafka/pull/15023 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
jolshan commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1868141386 Ok -- looks good. I will merge and backport to 3.7. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Angelos Kaltsikis updated KAFKA-16047:
--------------------------------------
Description:
Source connectors with 'exactly.once.support = required' may have some of their tasks' InitProducerId requests, issued from the admin client, time out. In the case of MirrorSourceConnector, the source connector in which I found the bug, this effectively made all of the tasks become "FAILED". As soon as one of the tasks failed with 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many times I restarted the connector/tasks, I could not get the MirrorSourceConnector back into a healthy RUNNING state.

Due to the low timeout that has been [hard-coded in the code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] (1 ms), the InitProducerId requests can time out on "slower-than-expected" Kafka brokers (ones that do not process and respond to the request in <= 1 ms). (Feel free to read more about the issue in the "More context & logs" section below.)

[~ChrisEgerton] I would appreciate it if you could respond to the following questions:
- How and why was the 1 ms magic number chosen for the transaction timeout?
- Is there any specific reason to believe the InitProducerId request is guaranteed to be processed in such a small time window?
- I have tried the above in multiple Kafka clusters hosted on different underlying datacenter hosts, and I don't believe those brokers are "slow" for any particular reason. If you think the brokers are slower than expected, I would appreciate any pointers on how to find the bottleneck.

h3. Temporary Mitigation
I have increased the timeout to 1000 ms (I picked this number arbitrarily; I just wanted to give the brokers enough time to always complete this type of request). The fix can be found in my fork: https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f

h3. Final solution
The temporary mitigation is not ideal, as it still picks an arbitrary timeout for the operation, one that may be high enough but is not guaranteed to always be. Shall we introduce a client-configurable setting? At the same time, I was wondering whether it makes sense to introduce tests that simulate brokers slower than the "blazing fast" mocked brokers used in unit tests, so as to catch this kind of low timeout that can make a feature unusable.

h3. What is affected
The bug affects MirrorSourceConnector tasks running in a distributed Kafka Connect cluster, and MirrorMaker 2 jobs running in distributed mode (a prerequisite for exactly.once.support to work). I believe this should also be true for other source connectors, as the code path to blame is Connect-specific, not MirrorMaker-specific.

h3. More context & logs
*Connector Logs*
{code:java}
Caused by: java.util.concurrent.CompletionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
{code}
*Broker Logs*
{code:java}
[2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning COORDINATOR_NOT_AVAILABLE error code to client for kafka-connect-uat-mm2-msc-20th-7's InitProducerId request (kafka.coordinator.transaction.TransactionCoordinator)
[2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
{code}

h3. How to reproduce it
While the bug exists in the standalone MM2 deployment as well, it is easier to reproduce by deploying the connector to a Kafka Connect cluster (where the connector's config can be updated and the connector deleted/restarted/paused/stopped/resumed via the Kafka Connect REST API). Deploy a MirrorSourceConnector on a Kafka Connect cluster (with `exactly.once.source.support = enabled`) and, after the initial start, update its configuration or restart the connector and tasks. To test whether my fork fixed the issue once and for good, I created a script that constantly restarts the connector every few seconds (after its tasks reach the RUNNING state). I have been running the scripts for a few hours and the Mirr
[jira] [Updated] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Angelos Kaltsikis updated KAFKA-16047:
--------------------------------------
Summary: Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector (was: Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing the whole connector)
[jira] [Commented] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799953#comment-17799953 ] Angelos Kaltsikis commented on KAFKA-16047: --- cc. [~gregharris73] > Source connector with EOS enabled have some InitProducerId requests timing > out, effectively failing the whole connector > --- > > Key: KAFKA-16047 > URL: https://issues.apache.org/jira/browse/KAFKA-16047 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, > 3.6.1 >Reporter: Angelos Kaltsikis >Priority: Major > > Source Connectors with 'exactly.once.support = required' may have some of > their tasks that issue InitProducerId requests timeout. In the case of > MirrorSourceConnector, which was the source connector that i found the bug, > the bug was effectively making all the tasks (in the specific case of) become > "FAILED". As soon as one of the tasks get in the FAILED state due to the > 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many > restarts i did to the connector/tasks, i couldn't get the > MirrorSourceConnector in a healthy RUNNING state again. > Due to the low timeout that has been [hard-coded in the > code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] > (1ms), there is a chance that the `InitProducerId` requests timeout in case > of "slower-than-expected" Kafka brokers (that do not process & respond to the > above request in <= 1ms). (feel free to read more information about the issue > in the "More Context" section below) > [~ChrisEgerton] I would appreciate it if you could respond to the following > questions > - How and why was the 1ms magic number for transaction timeout has to be > chosen? 
> - Is there any specific reason to believe that the > `InitProducerId` request is guaranteed to be processed in such a small time window? > - I have tried the above in multiple Kafka clusters hosted > on different underlying datacenter hosts, and I don't believe those > brokers are "slow" for any particular reason. If you feel that the brokers are slower > than expected, I would appreciate any pointers on how I could find out what > the bottleneck is. > h3. Temporary Mitigation > I have increased the timeout to 1000ms (I picked this number at random, just > wanting to give the brokers enough time to always complete this type of > request). The fix can be found in my fork: > https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f > > h3. Final solution > The temporary mitigation is not ideal, as it still picks an arbitrary timeout > for the operation that may be high enough, but is not guaranteed to > always be high enough. Shall we introduce something client-configurable? > At the same time, I was wondering whether it makes sense to introduce some > tests that simulate brokers slower than the "blazing" fast mocked brokers that exist > in unit tests, so as to be able to catch this type of low timeout that > can render some features unusable. > h3. What is affected > The above bug exists in MirrorSourceConnector tasks running in a distributed > Kafka Connect cluster, or in MirrorMaker 2 jobs that run with distributed mode > enabled (a prerequisite for exactly.once.support to work). I believe this > should be true for other source connectors as well, as the code path > to blame is Connect-specific, not MirrorMaker-specific. > h3. More context & logs > *Connector Logs* > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment.
Call: fenceProducer(api=INIT_PRODUCER_ID) > {code} > *Broker Logs* > {code:java} > [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning > COORDINATOR_NOT_AVAILABLE error code to client for > kafka-connect-uat-mm2-msc-20th-7's InitProducerId request > (kafka.coordinator.transaction.TransactionCoordinator) > [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: > TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for > TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, > lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed > due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), > aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the > callback (kafka.coordinator.t
[jira] [Updated] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Angelos Kaltsikis updated KAFKA-16047: -- Summary: Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing the whole connector (was: Source connector with EOS enabled have some InitProducerId requests timing out) > Source connector with EOS enabled have some InitProducerId requests timing > out, effectively failing the whole connector > --- > > Key: KAFKA-16047 > URL: https://issues.apache.org/jira/browse/KAFKA-16047 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, > 3.6.1 >Reporter: Angelos Kaltsikis >Priority: Major > > Source connectors with 'exactly.once.support = required' may have some of > their tasks' InitProducerId requests time out. In the case of > MirrorSourceConnector, where I found the bug, this effectively made all the > tasks become "FAILED". As soon as one of the tasks entered the FAILED state due to the > 'COORDINATOR_NOT_AVAILABLE' messages (caused by timeouts), no matter how many > times I restarted the connector/tasks, I couldn't get the > MirrorSourceConnector back into a healthy RUNNING state. > Due to the low timeout that has been [hard-coded in the > code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] > (1ms), there is a chance that the `InitProducerId` requests time out on > "slower-than-expected" Kafka brokers (that do not process & respond to the > request in <= 1ms).
(feel free to read more about the issue > in the "More Context" section below) > [~ChrisEgerton] I would appreciate it if you could answer the following > questions: > - How and why was the 1ms magic number chosen for the > transaction timeout? > - Is there any specific reason to believe that the > `InitProducerId` request is guaranteed to be processed in such a small time window? > - I have tried the above in multiple Kafka clusters hosted > on different underlying datacenter hosts, and I don't believe those > brokers are "slow" for any particular reason. If you feel that the brokers are slower > than expected, I would appreciate any pointers on how I could find out what > the bottleneck is. > h3. Temporary Mitigation > I have increased the timeout to 1000ms (I picked this number at random, just > wanting to give the brokers enough time to always complete this type of > request). The fix can be found in my fork: > https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f > > h3. Final solution > The temporary mitigation is not ideal, as it still picks an arbitrary timeout > for the operation that may be high enough, but is not guaranteed to > always be high enough. Shall we introduce something client-configurable? > At the same time, I was wondering whether it makes sense to introduce some > tests that simulate brokers slower than the "blazing" fast mocked brokers that exist > in unit tests, so as to be able to catch this type of low timeout that > can render some features unusable. > h3. What is affected > The above bug exists in MirrorSourceConnector tasks running in a distributed > Kafka Connect cluster, or in MirrorMaker 2 jobs that run with distributed mode > enabled (a prerequisite for exactly.once.support to work). I believe this > should be true for other source connectors as well, as the code path > to blame is Connect-specific, not MirrorMaker-specific. > h3.
More context & logs > *Connector Logs* > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call: fenceProducer(api=INIT_PRODUCER_ID) > {code} > *Broker Logs* > {code:java} > [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning > COORDINATOR_NOT_AVAILABLE error code to client for > kafka-connect-uat-mm2-msc-20th-7's InitProducerId request > (kafka.coordinator.transaction.TransactionCoordinator) > [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: > TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for > TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, > lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed > due to COORDINATOR_NOT_AVAILABLE, resett
[jira] [Created] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out
Angelos Kaltsikis created KAFKA-16047: - Summary: Source connector with EOS enabled have some InitProducerId requests timing out Key: KAFKA-16047 URL: https://issues.apache.org/jira/browse/KAFKA-16047 Project: Kafka Issue Type: Bug Components: connect, mirrormaker Affects Versions: 3.6.1, 3.5.2, 3.5.1, 3.6.0, 3.4.1, 3.3.2, 3.3.1, 3.4.0, 3.3.0 Reporter: Angelos Kaltsikis Source connectors with 'exactly.once.support = required' may have some of their tasks' InitProducerId requests time out. In the case of MirrorSourceConnector, where I found the bug, this effectively made all the tasks become "FAILED". As soon as one of the tasks entered the FAILED state due to the 'COORDINATOR_NOT_AVAILABLE' messages (caused by timeouts), no matter how many times I restarted the connector/tasks, I couldn't get the MirrorSourceConnector back into a healthy RUNNING state. Due to the low timeout that has been [hard-coded in the code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] (1ms), there is a chance that the `InitProducerId` requests time out on "slower-than-expected" Kafka brokers (that do not process & respond to the request in <= 1ms). (feel free to read more about the issue in the "More Context" section below) [~ChrisEgerton] I would appreciate it if you could answer the following questions: - How and why was the 1ms magic number chosen for the transaction timeout? - Is there any specific reason to believe that the `InitProducerId` request is guaranteed to be processed in such a small time window? - I have tried the above in multiple Kafka clusters hosted on different underlying datacenter hosts, and I don't believe those brokers are "slow" for any particular reason.
If you feel that the brokers are slower than expected, I would appreciate any pointers on how I could find out what the bottleneck is. h3. Temporary Mitigation I have increased the timeout to 1000ms (I picked this number at random, just wanting to give the brokers enough time to always complete this type of request). The fix can be found in my fork: https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f h3. Final solution The temporary mitigation is not ideal, as it still picks an arbitrary timeout for the operation that may be high enough, but is not guaranteed to always be high enough. Shall we introduce something client-configurable? At the same time, I was wondering whether it makes sense to introduce some tests that simulate brokers slower than the "blazing" fast mocked brokers that exist in unit tests, so as to be able to catch this type of low timeout that can render some features unusable. h3. What is affected The above bug exists in MirrorSourceConnector tasks running in a distributed Kafka Connect cluster, or in MirrorMaker 2 jobs that run with distributed mode enabled (a prerequisite for exactly.once.support to work). I believe this should be true for other source connectors as well, as the code path to blame is Connect-specific, not MirrorMaker-specific. h3. More context & logs *Connector Logs* {code:java} Caused by: java.util.concurrent.CompletionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
Call: fenceProducer(api=INIT_PRODUCER_ID) {code} *Broker Logs* {code:java} [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning COORDINATOR_NOT_AVAILABLE error code to client for kafka-connect-uat-mm2-msc-20th-7's InitProducerId request (kafka.coordinator.transaction.TransactionCoordinator) [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager) {code} h3. How to reproduce it While the bug exists in the standalone MM2 deployment as well, it's easier to reproduce it by deploying the connector to a Kafka Connect cluster (as it is possible to update its config/delete/restart/pause/stop/resume via the Kafka Connect REST API). Thus, deploy a MirrorSourceConnector on a Kafka Connect cluster (with `exactly.once.source.support = enabled`) and, after the initial start, update its configuration or restart the conn
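The "final solution" proposed above — making the fence-producer transaction timeout client-configurable instead of hard-coding 1 ms — could look roughly like the following sketch. The config key, default value, and class name here are hypothetical stand-ins, not the actual Kafka admin-client API:

```java
import java.util.Map;

public class FenceTimeout {
    // Hypothetical config key; the actual Kafka code hard-codes 1 ms with no override.
    static final String TXN_TIMEOUT_CONFIG = "fence.producers.txn.timeout.ms";
    // A safer default than 1 ms, in the spirit of the reporter's 1000 ms mitigation.
    static final int DEFAULT_TXN_TIMEOUT_MS = 1000;

    // Resolve the transaction timeout: an explicit client config wins,
    // otherwise fall back to the safer default.
    static int resolveTxnTimeoutMs(Map<String, String> configs) {
        String v = configs.get(TXN_TIMEOUT_CONFIG);
        return v == null ? DEFAULT_TXN_TIMEOUT_MS : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        System.out.println(resolveTxnTimeoutMs(Map.of()));                          // prints 1000
        System.out.println(resolveTxnTimeoutMs(Map.of(TXN_TIMEOUT_CONFIG, "5000"))); // prints 5000
    }
}
```

With this shape, a deployment on slower brokers could raise the timeout without patching FenceProducersHandler, while existing clients keep a sane default.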
[PR] Duplicate method; The QuotaUtils one is used. [kafka]
afshing opened a new pull request, #15066: URL: https://github.com/apache/kafka/pull/15066 It seems like this PR duplicated the implementation to QuotaUtils, but didn't remove this implementation *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on code in PR #15060: URL: https://github.com/apache/kafka/pull/15060#discussion_r1435282790 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse } } +@Test +public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException { +FileInputStream fileInputStream = mock(FileInputStream.class); +RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class); +ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); +RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); +LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); +when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) +.thenAnswer(a -> fileInputStream); +when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + +int fetchOffset = 0; +int fetchMaxBytes = 10; +int recordBatchSizeInBytes = fetchMaxBytes + 1; +RecordBatch firstBatch = mock(RecordBatch.class); +ArgumentCaptor capture = ArgumentCaptor.forClass(ByteBuffer.class); + +FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( +Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty() +); + +when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream); +when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata)); Review Comment: Isn't one Optional.of(segmentMetadata) enough? ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1412,8 +1422,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset, } } } - -private Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, +// visible for testing. 
+Optional findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, Option leaderEpochFileCacheOption) throws RemoteStorageException { Review Comment: ```suggestion Option leaderEpochFileCacheOption) throws RemoteStorageException { ``` ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); -int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); InputStream remoteSegInputStream = null; try { -// Search forward for the position of the last offset that is greater than or equal to the target offset -remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); -RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); - -RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); - +int startPos = 0; +RecordBatch firstBatch = null; +while (firstBatch == null && rlsMetadataOptional.isPresent()) { Review Comment: I am not super hung-up on this, but don't you just need to look forward once i.e. you have a guarantee that if you do not find the offset in this segment then you are bound to find it in the next, no? If this is the case can you just look in the next segment and not use a while loop? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
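The question in the last review comment (a single lookahead vs. a loop) comes down to whether several consecutive segments can be fully compacted away. The loop's behaviour can be sketched standalone, using simplified stand-in types rather than the real RemoteLogSegmentMetadata/RecordBatch classes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SegmentScan {
    // Simplified stand-in for a remote log segment: the base offsets of the
    // record batches that survived compaction (empty if fully compacted away).
    record Segment(List<Long> batchBaseOffsets) {}

    // Return the base offset of the first batch at or beyond the target offset,
    // skipping over any number of fully compacted segments.
    static Optional<Long> findFirstBatch(List<Segment> segments, long target) {
        for (Segment segment : segments) {
            for (long base : segment.batchBaseOffsets()) {
                if (base >= target) {
                    return Optional.of(base); // found, possibly several segments later
                }
            }
            // No batch >= target in this segment: fall through to the next one,
            // which is what the while loop in the PR does.
        }
        return Optional.empty(); // ran out of segments
    }

    public static void main(String[] args) {
        // The scenario from the PR description: 0.log and 6.log are fully
        // compacted, and the first surviving batch lives in 07.log.
        List<Segment> segments = Arrays.asList(
                new Segment(List.of()),         // 0.log, compacted away
                new Segment(List.of()),         // 6.log, compacted away
                new Segment(List.of(7L, 9L)));  // 07.log
        System.out.println(findFirstBatch(segments, 3L).orElse(-1L)); // prints 7
    }
}
```

A single lookahead would stop after 6.log and find nothing, which is why the loop is needed when more than one consecutive segment can be compacted empty.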
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
philipnee commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1867985789 Hey @jolshan seems like the build finished without oom, what do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]
mimaison merged PR #14867: URL: https://github.com/apache/kafka/pull/14867 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]
mimaison commented on PR #14867: URL: https://github.com/apache/kafka/pull/14867#issuecomment-1867895289 I'm waiting for the CI to finish. I don't want to risk breaking trunk just before logging off for PTO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
tinaselenge commented on PR #14846: URL: https://github.com/apache/kafka/pull/14846#issuecomment-1867865063 @dengziming I have triggered the build 3 more times and DeleteTopicTest passed. I have also enabled KRaft for testIncreasePartitionCountDuringDeleteTopic (they didn't have much common code, so I simply separated them with a single `if (!kraftTest())` clause). I also added comments for 2 other tests explaining why KRaft is not enabled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]
srdo commented on PR #15024: URL: https://github.com/apache/kafka/pull/15024#issuecomment-1867860145 Sure, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Refactor to only require one stopPartitions helper [kafka]
divijvaidya merged PR #14662: URL: https://github.com/apache/kafka/pull/14662 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Refactor to only require one stopPartitions helper [kafka]
divijvaidya commented on PR #14662: URL: https://github.com/apache/kafka/pull/14662#issuecomment-1867857397 Unrelated test failures ``` [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testReplicateSourceDefault__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testReplicateSourceDefault___2/) [Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsBounceTest.testWithGroupMetadata()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupMetadata__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.coordinator.group.GroupMetadataManagerTest.testStaticMemberGetsBackAssignmentUponRejoin()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/org.apache.kafka.coordinator.group/GroupMetadataManagerTest/Build___JDK_8_and_Scala_2_12___testStaticMemberGetsBackAssignmentUponRejoin__/) [Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testConsumptionWithBrokerFailures__/) [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/) [Build / JDK 21 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupMetadata()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_21_and_Scala_2_13___testWithGroupMetadata__/) [Build / JDK 21 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14662/5/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_21_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]
divijvaidya commented on PR #15024: URL: https://github.com/apache/kafka/pull/15024#issuecomment-1867854246 A fix to make CI stable was pushed today. @srdo Can you please rebase from trunk one time? It will probably help us get a better picture of CI failures/success. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]
nizhikov commented on PR #14867: URL: https://github.com/apache/kafka/pull/14867#issuecomment-1867850331 @mimaison Are we want to merge this now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Almog Gavra resolved KAFKA-16046. - Resolution: Fixed > Stream Stream Joins fail after restoration with deserialization exceptions > -- > > Key: KAFKA-16046 > URL: https://issues.apache.org/jira/browse/KAFKA-16046 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Blocker > Labels: streams > > Before KIP-954, the `KStreamImplJoin` class would always create > non-timestamped persistent windowed stores. After that KIP, the default was > changed to create timestamped stores. This wasn't compatible because, during > restoration, timestamped stores have their changelog values transformed to > prepend the timestamp to the value. This caused serialization errors when > trying to read from the store because the deserializers did not expect the > timestamp to be prepended. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] DNM: test [kafka]
lucasbru closed pull request #15064: DNM: test URL: https://github.com/apache/kafka/pull/15064 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799859#comment-17799859 ] Hugo Abreu edited comment on KAFKA-10875 at 12/22/23 3:22 PM: -- We are seeing this behaviour with kafka-clients 3.6.1 and broker 2.8.1. We can consistently reproduce the issue with the following snippet: {code:java} try (Consumer<String, String> c = new KafkaConsumer<>(Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class ))) { final Instant instant = Instant.now(); final int times = 10; final List<Integer> minutes = List.of( 1, 5, 10, 20, 25, 30, 60, 90 ); for (final Integer minute : minutes) { final Instant minus = instant.minus(minute, TimeUnit.MINUTES.toChronoUnit()); // Retries, just in case the behaviour is transient. for (int i = 0; i < times; i++) { final List<PartitionInfo> partitionInfos = c.partitionsFor("topic"); final Map<TopicPartition, OffsetAndTimestamp> offsetTimes = c.offsetsForTimes( partitionInfos.stream() .filter(info -> info.partition() == 0) .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toMap(Function.identity(), v -> minus.toEpochMilli())) ); LOGGER.info("Going back {}min -- got: {}", minute, offsetTimes); if (offsetTimes.values().stream().allMatch(Objects::nonNull)) { // We got a response for all the partitions. break; } } LOGGER.info("=="); } } {code} Essentially, we have a topic with a ~2 day retention period whose partitions receive different loads during the day.
The behaviour that we are seeing is: * The last message received in e.g. partition 2 has the timestamp 1703250071130 – Friday, 22 December 2023 13:01:11.130 * We attempt to search using `offsetsForTimes` (with the code below) The output is: {code:java} 13:52:25.134 [main] INFO KafkaConsumerGroupManagerTest - Going back 1min -- got: {topic-2=null} 13:52:43.310 [main] INFO KafkaConsumerGroupManagerTest - == 13:52:43.490 [main] INFO KafkaConsumerGroupManagerTest - Going back 5min -- got: {topic-2=null} 13:53:01.779 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:01.959 [main] INFO KafkaConsumerGroupManagerTest - Going back 10min -- got: {topic-2=null} 13:53:20.030 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:20.212 [main] INFO KafkaConsumerGroupManagerTest - Going back 20min -- got: {topic-2=null} 13:53:38.370 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:38.551 [main] INFO KafkaConsumerGroupManagerTest - Going back 25min -- got: {topic-2=null} 13:53:57.316 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:57.495 [main] INFO KafkaConsumerGroupManagerTest - Going back 30min -- got: {topic-2=null} 13:54:15.432 [main] INFO KafkaConsumerGroupManagerTest - == 13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - Going back 60min -- got: {topic-2=(timestamp=1703250071130, leaderEpoch=12, offset=178105)} 13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - == 13:54:15.794 [main] INFO KafkaConsumerGroupManagerTest - Going back 90min -- got: {topic-2=(timestamp=1703248360581, leaderEpoch=12, offset=178095)} 13:54:15.795 [main] INFO KafkaConsumerGroupManagerTest - == {code} According to the documentation, all of these searches should return a result, since we should obtain the earliest offset with a timestamp greater than or equal to the one provided. It almost seems to be the other way around: we appear to start searching from the given timestamp towards the front offsets.
We'll try to get some time to look under the hood and see if we can help. was (Author: JIRAUSER303606): We are seeing this behaviour with kafka-clients 3.6.1 and broker 2.8.1. We can consistently reproduce the issue with the following snippet: {code:java} try (Consumer c = new KafkaConsumer<>(Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
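For context on the semantics being debated above: the consumer javadoc defines the `offsetsForTimes` result for a partition as the earliest offset whose timestamp is greater than or equal to the given target, with null returned when no such message exists. That contract can be modeled on plain data (a simplified stand-in, not the consumer API itself); under that reading, a lookup target newer than the newest record's timestamp legitimately yields no match:

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

public class OffsetsForTimesModel {
    // (timestamp -> offset) for the records of one partition, ordered by timestamp.
    // Models the documented lookup: earliest offset whose timestamp >= target,
    // else empty (the real API returns null for that partition in this case).
    static OptionalLong lookup(TreeMap<Long, Long> timestampToOffset, long targetTs) {
        Map.Entry<Long, Long> entry = timestampToOffset.ceilingEntry(targetTs);
        return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> partition = new TreeMap<>();
        partition.put(1703248360581L, 178095L); // older record
        partition.put(1703250071130L, 178105L); // newest record, ~13:01
        long now = 1703253145000L;              // ~13:52

        // 1 min back: target is newer than every record, so no match (the observed null)
        System.out.println(lookup(partition, now - 60_000));
        // 60 min back: earliest record with timestamp >= target is the newest one
        System.out.println(lookup(partition, now - 3_600_000));
    }
}
```

Under this model the nulls in the log output are consistent with the javadoc rather than a forward/backward inversion, though that does not rule out the separate partition-specific anomalies the original report describes.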
[jira] [Comment Edited] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799859#comment-17799859 ] Hugo Abreu edited comment on KAFKA-10875 at 12/22/23 3:21 PM: -- We are seeing this behaviour with kafka-clients 3.6.1 and broker 2.8.1. We can consistently reproduce the issue with the following snippet: {code:java} try (Consumer<String, String> c = new KafkaConsumer<>(Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class ))) { final Instant instant = Instant.now(); final int times = 10; final List<Integer> minutes = List.of( 1, 5, 10, 20, 25, 30, 60, 90 ); for (final Integer minute : minutes) { final Instant minus = instant.minus(minute, TimeUnit.MINUTES.toChronoUnit()); // Retries, just in case the behaviour is transient. for (int i = 0; i < times; i++) { final List<PartitionInfo> partitionInfos = c.partitionsFor("topic"); final Map<TopicPartition, OffsetAndTimestamp> offsetTimes = c.offsetsForTimes( partitionInfos.stream() .filter(info -> info.partition() == 0) .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toMap(Function.identity(), v -> minus.toEpochMilli())) ); LOGGER.info("Going back {}min -- got: {}", minute, offsetTimes); if (offsetTimes.values().stream().allMatch(Objects::nonNull)) { // We got a response for all the partitions. break; } } LOGGER.info("=="); } } {code} Essentially, we have a topic with a ~2 day retention period whose partitions receive different loads during the day.
The behaviour that we are seeing is: * The last message received in e.g., partition 2 has the timestamp of 1703250071130 – Friday, 22 December 2023 13:01:11.130 * We attempt to search using `offsetsForTimes` (with the code above). The output is: {code:java} 13:52:25.134 [main] INFO KafkaConsumerGroupManagerTest - Going back 1min -- got: {topic-2=null} 13:52:43.310 [main] INFO KafkaConsumerGroupManagerTest - == 13:52:43.490 [main] INFO KafkaConsumerGroupManagerTest - Going back 5min -- got: {topic-2=null} 13:53:01.779 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:01.959 [main] INFO KafkaConsumerGroupManagerTest - Going back 10min -- got: {topic-2=null} 13:53:20.030 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:20.212 [main] INFO KafkaConsumerGroupManagerTest - Going back 20min -- got: {topic-2=null} 13:53:38.370 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:38.551 [main] INFO KafkaConsumerGroupManagerTest - Going back 25min -- got: {topic-2=null} 13:53:57.316 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:57.495 [main] INFO KafkaConsumerGroupManagerTest - Going back 30min -- got: {topic-2=null} 13:54:15.432 [main] INFO KafkaConsumerGroupManagerTest - == 13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - Going back 60min -- got: {topic-2=(timestamp=1703250071130, leaderEpoch=12, offset=178105)} 13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - == 13:54:15.794 [main] INFO KafkaConsumerGroupManagerTest - Going back 90min -- got: {topic-2=(timestamp=1703248360581, leaderEpoch=12, offset=178095)} 13:54:15.795 [main] INFO KafkaConsumerGroupManagerTest - == {code} According to the documentation, all of these searches should return a result, since we should obtain the earliest offset with a timestamp greater than or equal to the one provided. It almost seems that it's the other way around. We'll try to get some time to look under the hood and see if we can help. 
was (Author: JIRAUSER303606): We are seeing this behaviour with kafka-clients 3.6.1 and broker 2.8.1. We can consistently reproduce the issue with the following snippet: {code:java} try (Consumer c = new KafkaConsumer<>(Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
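The contract under discussion — the earliest offset whose timestamp is greater than or equal to the target — boils down to a lower-bound search over the partition's time index. The following is a minimal, self-contained sketch of that semantic (illustrative only, not Kafka's actual index code), using the timestamps from the log output in the comment above:

```java
/**
 * Illustrative lower-bound search mirroring the documented offsetsForTimes
 * contract: return the offset of the earliest entry whose timestamp is
 * greater than or equal to the target, or -1 when no such entry exists
 * (the case where the real API returns null for the partition).
 */
public class OffsetForTimeSketch {

    // timestamps must be sorted ascending; offsets[i] belongs to timestamps[i].
    static long lookup(long[] timestamps, long[] offsets, long target) {
        int lo = 0, hi = timestamps.length; // half-open search range [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                lo = mid + 1;   // entry too old, search right half
            } else {
                hi = mid;       // candidate, search left half
            }
        }
        return lo == timestamps.length ? -1L : offsets[lo];
    }

    public static void main(String[] args) {
        // Entries taken from the log output above (partition topic-2).
        long[] timestamps = {1703248360581L, 1703250071130L};
        long[] offsets = {178095L, 178105L};
        // Target between the two timestamps: earliest entry at or after it.
        System.out.println(lookup(timestamps, offsets, 1703249000000L)); // 178105
        // Target after every timestamp: no match, i.e. the null case.
        System.out.println(lookup(timestamps, offsets, 1703250071131L)); // -1
    }
}
```

Under this contract, a target only a minute in the past should still match the last entry as long as its timestamp is at or after the target, which is why the null results reported above look like inverted semantics.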
[jira] [Commented] (KAFKA-10875) offsetsForTimes returns null for some partitions when it shouldn't?
[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799859#comment-17799859 ] Hugo Abreu commented on KAFKA-10875: We are seeing this behaviour with kafka-clients 3.6.1 and broker 2.8.1. We can consistently reproduce the issue with the following snippet: {code:java} try (Consumer<String, String> c = new KafkaConsumer<>(Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class ))) { final Instant instant = Instant.now(); final int times = 10; final List<Integer> minutes = List.of( 1, 5, 10, 20, 25, 30, 60, 90 ); for (final Integer minute : minutes) { final Instant minus = instant.minus(minute, TimeUnit.MINUTES.toChronoUnit()); // Retries, just for the sake of it being transient. for (int i = 0; i < times; i++) { final List<PartitionInfo> partitionInfos = c.partitionsFor("topic"); final Map<TopicPartition, OffsetAndTimestamp> offsetTimes = c.offsetsForTimes( partitionInfos.stream() .filter(info -> info.partition() == 0) .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toMap(Function.identity(), v -> minus.toEpochMilli())) ); LOGGER.info("Going back {}min -- got: {}", minute, offsetTimes); if (offsetTimes.values().stream().allMatch(Objects::nonNull)) { // We got a response for all the partitions. break; } } LOGGER.info("=="); } } {code} Essentially, we have a topic with a ~2-day retention period whose partitions receive different loads during the day. 
The behaviour that we are seeing is: * The last message received in e.g., partition 2 has the timestamp of 1703250071130 – Friday, 22 December 2023 13:01:11.130 * We attempt to search using `offsetsForTimes` (with the code above). The output is: {code:java} 13:52:25.134 [main] INFO KafkaConsumerGroupManagerTest - Going back 1min -- got: {topic-2=null} 13:52:43.310 [main] INFO KafkaConsumerGroupManagerTest - == 13:52:43.490 [main] INFO KafkaConsumerGroupManagerTest - Going back 5min -- got: {topic-2=null} 13:53:01.779 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:01.959 [main] INFO KafkaConsumerGroupManagerTest - Going back 10min -- got: {topic-2=null} 13:53:20.030 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:20.212 [main] INFO KafkaConsumerGroupManagerTest - Going back 20min -- got: {topic-2=null} 13:53:38.370 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:38.551 [main] INFO KafkaConsumerGroupManagerTest - Going back 25min -- got: {topic-2=null} 13:53:57.316 [main] INFO KafkaConsumerGroupManagerTest - == 13:53:57.495 [main] INFO KafkaConsumerGroupManagerTest - Going back 30min -- got: {topic-2=null} 13:54:15.432 [main] INFO KafkaConsumerGroupManagerTest - == 13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - Going back 60min -- got: {topic-2=(timestamp=1703250071130, leaderEpoch=12, offset=178105)} 13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - == 13:54:15.794 [main] INFO KafkaConsumerGroupManagerTest - Going back 90min -- got: {topic-2=(timestamp=1703248360581, leaderEpoch=12, offset=178095)} 13:54:15.795 [main] INFO KafkaConsumerGroupManagerTest - == {code} According to the documentation, all of these searches should return a result, since we should obtain the earliest offset with a timestamp greater than or equal to the one provided. It almost seems that it's the other way around. We'll try to get some time to look under the hood and see if we can help. > offsetsForTimes returns null for some partitions when it shouldn't? 
> --- > > Key: KAFKA-10875 > URL: https://issues.apache.org/jira/browse/KAFKA-10875 > Project: Kafka > Issue Type: Bug >Reporter: Yifei Gong >Priority: Minor > > I use spring-boot 2.2.11, spring-kafka 2.4.11 and apache kafka-clients 2.4.1 > I have my consume
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
jolshan commented on PR #14599: URL: https://github.com/apache/kafka/pull/14599#issuecomment-1867795357 Hey @MikeEdgar I'm seeing KafkaAdminClientTest.testDescribeTopicByIds failing due to it expecting the old error. Can we update that test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16046: fix stream-stream-join store types [kafka]
lucasbru merged PR #15061: URL: https://github.com/apache/kafka/pull/15061 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16046: fix stream-stream-join store types [kafka]
lucasbru commented on PR #15061: URL: https://github.com/apache/kafka/pull/15061#issuecomment-1867744948 Cause of OOM on trunk identified, so merging this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
lucasbru merged PR #15035: URL: https://github.com/apache/kafka/pull/15035 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Exit catcher should be reset after the cluster is shutdown [kafka]
dajac merged PR #15062: URL: https://github.com/apache/kafka/pull/15062 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Close RemoteLogManager in RemoteLogManagerTest [kafka]
dajac merged PR #15063: URL: https://github.com/apache/kafka/pull/15063 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: set test global timeout as 10 mins [kafka]
showuon commented on PR #15065: URL: https://github.com/apache/kafka/pull/15065#issuecomment-1867639044 @dajac @ijuma @jolshan @stanislavkozlovski , call for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: set test global timeout as 10 mins [kafka]
showuon opened a new pull request, #15065: URL: https://github.com/apache/kafka/pull/15065 As @stanislavkozlovski found [here](https://gist.github.com/stanislavkozlovski/8959f7ee59434f774841f4ae2f5228c2), some tests run for more than 1-2 hours, which makes our CI build hit the 8-hour timeout. We should set a 10-minute timeout for each test using the JUnit 5 declarative-timeouts feature [here](https://junit.org/junit5/docs/current/user-guide/#writing-tests-declarative-timeouts-default-timeouts). If a test exceeds 10 minutes, it should fail, and we should investigate/improve it. This way, we can resolve the CI timeout issue. If a test exceeds the timeout, it'll fail with an exception like: ``` Gradle Test Run :storage:test > Gradle Test Executor 21 > TransactionsWithTieredStoreTest > testDelayedFetchIncludesAbortedTransaction(String) > testDelayedFetchIncludesAbortedTransaction(String).quorum=zk FAILED java.util.concurrent.TimeoutException: testDelayedFetchIncludesAbortedTransaction(java.lang.String) timed out after 10 seconds at org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29) at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) ``` However, the Streams tests are still using JUnit 4, where we can only set a timeout per test suite. I added a timeout for `KTableKTableLeftJoinTest` as highlighted [here](https://gist.github.com/stanislavkozlovski/8959f7ee59434f774841f4ae2f5228c2). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
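For reference, the JUnit 5 declarative default timeout described in the PR above is typically enabled through a `junit-platform.properties` file on the test classpath (the 10-minute value mirrors the PR; the file path shown is the conventional one and an assumption here, not taken from the PR itself):

```properties
# src/test/resources/junit-platform.properties
# Applies to every test method that does not declare its own @Timeout.
junit.jupiter.execution.timeout.default = 10 m
```

Individual slow tests can then override the default with an explicit `@Timeout` annotation rather than loosening the global limit.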
Re: [PR] KAFKA-16046: fix stream-stream-join store types [kafka]
lucasbru commented on PR #15061: URL: https://github.com/apache/kafka/pull/15061#issuecomment-1867506949 Restarting the CI since we had OOM before - however, OOM affects trunk so if we don't see anything in the next run, I think we should merge this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] DNM: test [kafka]
lucasbru opened a new pull request, #15064: URL: https://github.com/apache/kafka/pull/15064 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
mfvitale commented on PR #14992: URL: https://github.com/apache/kafka/pull/14992#issuecomment-1867484294 > haha! I like your commit message "code beauty" :) 😉 > I see that this is your first change to Apache Kafka. Congratulations and welcome! Yes! Thanks! > As a next step, I will wait for CI to be stable before I merge this change in. Perfect! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [KAFKA-16015] Fix custom timeouts overwritten by defaults [kafka]
sciclon2 commented on code in PR #15030: URL: https://github.com/apache/kafka/pull/15030#discussion_r1434907926 ## tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java: ## @@ -99,8 +99,12 @@ static void run(Duration timeoutMs, String... args) throws Exception { props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig())); } props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer()); -props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis())); -props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2)); +if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) { +props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis())); +} +if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) { +props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2)); Review Comment: thanks for the review @divijvaidya I will do the change you proposed and add the test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [KAFKA-16015] Fix custom timeouts overwritten by defaults [kafka]
divijvaidya commented on code in PR #15030: URL: https://github.com/apache/kafka/pull/15030#discussion_r1434893044 ## tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java: ## @@ -99,8 +99,12 @@ static void run(Duration timeoutMs, String... args) throws Exception { props.putAll(Utils.loadProps(commandOptions.getAdminClientConfig())); } props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.getBootstrapServer()); -props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis())); -props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2)); +if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) { +props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis())); +} +if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) { +props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(timeoutMs.toMillis() / 2)); Review Comment: while you are over here in this part of the code can you also make another change please? This should be Int.toString since request.timeout is an int. See: https://kafka.apache.org/documentation.html#producerconfigs_request.timeout.ms same for default.api.timeout.ms https://kafka.apache.org/documentation.html#adminclientconfigs_default.api.timeout.ms -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
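The `containsKey` guards in the diff implement a set-only-if-absent policy: CLI defaults apply only when the user's config file did not already set the key. A standalone sketch of that pattern with `java.util.Properties` (the config key names and the `timeoutMs / 2` split come from the diff; the helper class itself is hypothetical), also using `Integer.toString` as suggested in the review:

```java
import java.util.Properties;

/** Hypothetical helper showing the default-only-if-absent pattern from the diff. */
public class TimeoutDefaults {

    static Properties withDefaults(Properties userProps, long timeoutMs) {
        Properties props = new Properties();
        props.putAll(userProps);
        // Equivalent to the containsKey + setProperty guard: the user's
        // explicit configuration always wins over the CLI default.
        // Integer.toString because both configs are int-typed, as noted above.
        props.putIfAbsent("default.api.timeout.ms", Integer.toString((int) timeoutMs));
        props.putIfAbsent("request.timeout.ms", Integer.toString((int) (timeoutMs / 2)));
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("request.timeout.ms", "5000"); // user-supplied value
        Properties merged = withDefaults(user, 30000L);
        System.out.println(merged.getProperty("request.timeout.ms"));     // 5000 (kept)
        System.out.println(merged.getProperty("default.api.timeout.ms")); // 30000 (defaulted)
    }
}
```

`Map.putIfAbsent` (inherited by `Properties`) expresses the same guard in one call, though the diff's explicit `containsKey` check reads equally well.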
[PR] MINOR: Close RemoteLogManager in RemoteLogManagerTest [kafka]
dajac opened a new pull request, #15063: URL: https://github.com/apache/kafka/pull/15063 I was inspecting heap dumps to chase the OOM errors that we see in builds and I have noticed that `remote-log-index-cleaner` threads were still running. This may be one of the sources. We should ensure that the RemoteLogManager is closed in RemoteLogManagerTest. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15301) [Tiered Storage] Historically compacted topics send request to remote for active segment during consume
[ https://issues.apache.org/jira/browse/KAFKA-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799742#comment-17799742 ] Arpit Goyal commented on KAFKA-15301: - [~mital.awachat] does this ticket address your concern? https://issues.apache.org/jira/browse/KAFKA-15388 > [Tiered Storage] Historically compacted topics send request to remote for > active segment during consume > --- > > Key: KAFKA-15301 > URL: https://issues.apache.org/jira/browse/KAFKA-15301 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.0 >Reporter: Mital Awachat >Assignee: Jimmy Wang >Priority: Major > Fix For: 3.7.0 > > > I have a use case where the tiered storage plugin received requests for active > segments. The topics for which it happened were historically compacted topics > for which compaction was disabled and tiering was enabled. > Create topic with compact cleanup policy -> Produce data with few repeat keys > and create multiple segments -> let compaction happen -> change cleanup > policy to delete -> produce some more data for segment rollover -> enable > tiering on topic -> wait for segments to be uploaded to remote storage and > cleanup from local (active segment would remain), consume from beginning -> > Observe logs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
mfvitale commented on PR #14992: URL: https://github.com/apache/kafka/pull/14992#issuecomment-1867448606 @divijvaidya Fixed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
clolov commented on PR #15060: URL: https://github.com/apache/kafka/pull/15060#issuecomment-1867443497 Thank you for the contribution! I will aim to provide a review throughout the day! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
divijvaidya commented on PR #14992: URL: https://github.com/apache/kafka/pull/14992#issuecomment-1867441468 @mfvitale one last comment about fixing indentation, otherwise we should be good to ship! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
divijvaidya commented on code in PR #14992: URL: https://github.com/apache/kafka/pull/14992#discussion_r1434876077 ## connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java: ## @@ -235,18 +235,31 @@ public Object toConnect(final Schema schema, final JsonNode value) { private final JsonDeserializer deserializer; public JsonConverter() { +this(true); +} + +/** + * Creates a JsonConvert initializing serializer and deserializer. + * + * @param enableModules permits to enable/disable the registration of additional Jackson modules. + * + * NOTE: This is visible only for testing + */ +public JsonConverter(boolean enableModules) { serializer = new JsonSerializer( mkSet(), -JSON_NODE_FACTORY +JSON_NODE_FACTORY, +enableModules ); deserializer = new JsonDeserializer( mkSet( -// this ensures that the JsonDeserializer maintains full precision on -// floating point numbers that cannot fit into float64 -DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS +// this ensures that the JsonDeserializer maintains full precision on Review Comment: please follow original indentation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]
divijvaidya commented on PR #15024: URL: https://github.com/apache/kafka/pull/15024#issuecomment-1867439333 I have restarted the CI to get a better build status since last one failed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Exit catcher should be reset after the cluster is shutdown [kafka]
dajac opened a new pull request, #15062: URL: https://github.com/apache/kafka/pull/15062 I was investigating a build which failed with "exit 1". In the logs of the broker, I saw that the first call to exit was caught. However, a second one was not. See the logs below. The issue seems to be that we must first shut down the cluster before resetting the exit catcher. Otherwise, there is still a chance for the broker to call exit. ``` [2023-12-21 13:52:59,310] ERROR Shutdown broker because all log dirs in /tmp/kafka-2594137463116889965 have failed (kafka.log.LogManager:143) [2023-12-21 13:52:59,312] ERROR test error (kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceWithIbp26Test:76) java.lang.RuntimeException: halt(1, null) called! at kafka.server.QuorumTestHarness.$anonfun$setUp$4(QuorumTestHarness.scala:273) at org.apache.kafka.common.utils.Exit.halt(Exit.java:63) at kafka.utils.Exit$.halt(Exit.scala:33) at kafka.log.LogManager.handleLogDirFailure(LogManager.scala:224) at kafka.server.ReplicaManager.handleLogDirFailure(ReplicaManager.scala:2600) at kafka.server.ReplicaManager$LogDirFailureHandler.doWork(ReplicaManager.scala:324) at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131) ``` ``` [2023-12-21 13:53:05,797] ERROR Shutdown broker because all log dirs in /tmp/kafka-7355495604650755405 have failed (kafka.log.LogManager:143) ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
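The teardown-ordering bug can be illustrated with a tiny stand-in for the exit-catcher mechanism (the `ExitCatcherSketch` class below is hypothetical; Kafka's real `Exit`/`QuorumTestHarness` code differs). Here, `halt(1)` models a still-running broker thread halting during teardown: if the catcher is reset before the cluster has stopped, that late halt escapes.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

/** Minimal stand-in (not Kafka's real Exit class) for the halt-catcher pattern. */
public class ExitCatcherSketch {

    // The default procedure models the real halt, which would kill the JVM.
    static final IntConsumer REAL_HALT =
        code -> { throw new RuntimeException("halt(" + code + ") called!"); };
    static final AtomicReference<IntConsumer> haltProcedure = new AtomicReference<>(REAL_HALT);

    static void halt(int code) { haltProcedure.get().accept(code); }

    /** Simulates teardown; halt(1) stands for a late broker-thread halt. */
    public static String runTeardown(boolean shutdownBeforeReset) {
        StringBuilder log = new StringBuilder();
        haltProcedure.set(code -> log.append("caught halt ").append(code)); // test setup
        try {
            if (shutdownBeforeReset) {
                halt(1);                        // broker thread halts during shutdown: caught
                haltProcedure.set(REAL_HALT);   // reset only after shutdown completes: safe
            } else {
                haltProcedure.set(REAL_HALT);   // reset first ...
                halt(1);                        // ... so the late broker halt escapes
            }
        } catch (RuntimeException e) {
            log.append("escaped: ").append(e.getMessage());
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runTeardown(true));  // caught halt 1
        System.out.println(runTeardown(false)); // escaped: halt(1) called!
    }
}
```

The fix in the PR follows the first branch: shut the cluster down while the catcher is still installed, then reset it.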
Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]
nizhikov commented on PR #14867: URL: https://github.com/apache/kafka/pull/14867#issuecomment-1867374109 @mimaison > This commit cannot be built The build status is scary, but it looks like some tests just failed, and they are not related to my changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org