[GitHub] [kafka] lucasbru commented on pull request #14403: KAFKA-10199: Add missing catch for lock exception
lucasbru commented on PR #14403:
URL: https://github.com/apache/kafka/pull/14403#issuecomment-1727032413

No unit test a la `shouldRetryInitializationWhenLockExceptionInStateUpdater` for this?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
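For context, a retry-on-lock-exception test of the kind suggested above typically stubs an initialization step to fail with a lock error a fixed number of times and then succeed, asserting that the retry loop recovers. The sketch below is illustrative only: `LockException`, `StateInitializer`, and `initializeWithRetry` are hypothetical stand-ins, not Kafka's actual classes.

```java
// Illustrative retry-on-lock-exception pattern; all names here are hypothetical
// stand-ins, not Kafka's StateUpdater API.
public class RetrySketch {
    static class LockException extends RuntimeException {}

    // Throws LockException for the first `failures` calls, then succeeds.
    static class StateInitializer {
        private final int failures;
        private int attempts = 0;
        StateInitializer(int failures) { this.failures = failures; }
        boolean tryInitialize() {
            attempts++;
            if (attempts <= failures) throw new LockException();
            return true;
        }
        int attempts() { return attempts; }
    }

    // Behavior under test: swallow LockException and retry up to maxAttempts times.
    static boolean initializeWithRetry(StateInitializer init, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return init.tryInitialize();
            } catch (LockException e) {
                // lock still held by the previous owner; retry on the next iteration
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StateInitializer init = new StateInitializer(2); // fails twice, then succeeds
        boolean ok = initializeWithRetry(init, 5);
        if (!ok || init.attempts() != 3) throw new AssertionError("retry logic broken");
        System.out.println("initialization succeeded after " + init.attempts() + " attempts");
    }
}
```

A unit test would assert both that initialization eventually succeeds and that the expected number of attempts was made.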
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nelson B. updated KAFKA-14133:
------------------------------
Description:

{color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color}

Unless stated in brackets, the tests are in the streams module. A list of tests still to be moved from EasyMock to Mockito as of 2 August 2022 that have no Jira issue and no open pull request I am aware of:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}

# {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya])
# {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya])
# {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya])
# {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya])
# {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
# {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
# {color:#00875a}KStreamPrintTest{color} (owner: Christo)
# {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
# {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
# {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
# {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
# {color:#00875a}ClientUtilsTest{color} (owner: Christo)
# {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo)
# {color:#00875a}TopologyTest{color} (owner: Christo)
# {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
# {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
# {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
# {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo)
# {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
# {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
# {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
# {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
# {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo)
# {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
# {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
# {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
# {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
# {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
# {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
# {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
# {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
# {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
# {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
# {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
# {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: Christo)
# {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
# {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
# {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
# {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
# {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947)
# {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak])
# {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777]
# {color:#00875a}AbstractStreamTest{color} (owner: Christo)
# {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo)
# {color:#00875a}KTableImplTest{color} (owner: Christo)
# {color:#00875a}KTableTransformValuesTest{color} (owner: Christo)
# {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo)
# {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo)
# {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo)
# {color:#00875a}ActiveTaskCreatorTest{color} (owner: Christo)
# {color:#00875a}ChangelogTopicsTest{color} (owner: Christo)
# {color:#00875a}GlobalProcessorContextImplTest{color} (owner: Christo)
# {color:#00875a}RecordCollectorTest{color} (owner: Christo)
# {color:#00875a}StateRestoreCallbackAdapterTest{color} (owner: Christo)
# {color:#0087
[GitHub] [kafka] bachmanity1 commented on pull request #14410: KAFKA-14133: Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & GlobalStateStoreProviderTest
bachmanity1 commented on PR #14410:
URL: https://github.com/apache/kafka/pull/14410#issuecomment-1727005007

@clolov @yashmayya can you please have a look?
[GitHub] [kafka] bachmanity1 opened a new pull request, #14410: KAFKA-14133: Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & GlobalStateStoreProviderTest
bachmanity1 opened a new pull request, #14410:
URL: https://github.com/apache/kafka/pull/14410

Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & GlobalStateStoreProviderTest

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-8391:
----------------------------------
Fix Version/s: 2.4.0

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.3.0
> Reporter: Matthias J. Sax
> Assignee: Sagar Rao
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.4.0
> Attachments: 100-gradle-builds.tar
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. Connector tasks did not stop in time.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352)
> at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
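The `waitForCondition` failure in the stack trace above comes from a poll-until-deadline helper: the test repeatedly evaluates a condition and fails with an AssertionError if it never becomes true within the timeout. A simplified, self-contained sketch of how such a helper behaves (an illustrative rewrite, not Kafka's actual `TestUtils` implementation):

```java
import java.util.function.BooleanSupplier;

// Simplified poll-until-deadline helper in the spirit of Kafka's
// TestUtils.waitForCondition; illustrative only, not the real implementation.
public class WaitSketch {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String detail)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) return;
            if (System.currentTimeMillis() >= deadline) {
                // Mirrors the AssertionError shape seen in the flaky-test report above.
                throw new AssertionError(
                    "Condition not met within timeout " + timeoutMs + ". " + detail);
            }
            Thread.sleep(10); // poll interval between condition checks
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~50 ms, well inside the 1 s timeout.
        waitForCondition(() -> System.currentTimeMillis() - start >= 50, 1000,
                "timer never elapsed");
        System.out.println("condition met");
    }
}
```

Flakiness arises when the condition (here, connector tasks stopping) is sometimes slower than the configured timeout on loaded CI machines.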
[jira] [Updated] (KAFKA-13875) update docs to include topoicId for kafka-topics.sh --describe output
[ https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-13875:
-----------------------------------
Fix Version/s: 3.6.0

> update docs to include topoicId for kafka-topics.sh --describe output
>
> Key: KAFKA-13875
> URL: https://issues.apache.org/jira/browse/KAFKA-13875
> Project: Kafka
> Issue Type: Improvement
> Components: admin
> Affects Versions: 3.2.0
> Reporter: Luke Chen
> Assignee: Richard Joerger
> Priority: Major
> Labels: newbie
> Fix For: 3.6.0
>
> The topic describe output in the quickstart doc here: [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
> Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
> Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0{code}
> After the Topic Id implementation, we now include the topic id info in the output. Also, the configs are no longer empty. The doc should be updated so that new users are not confused.
[jira] [Updated] (KAFKA-15286) Migrate ApiVersion related code to kraft
[ https://issues.apache.org/jira/browse/KAFKA-15286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15286:
-----------------------------------
Fix Version/s: 3.6.0

> Migrate ApiVersion related code to kraft
>
> Key: KAFKA-15286
> URL: https://issues.apache.org/jira/browse/KAFKA-15286
> Project: Kafka
> Issue Type: Task
> Reporter: Deng Ziming
> Assignee: Deng Ziming
> Priority: Major
> Fix For: 3.6.0
>
> In many places involving ApiVersion we only support zk; we should move it forward to kraft.
[jira] [Updated] (KAFKA-15424) Make verification a dynamic configuration
[ https://issues.apache.org/jira/browse/KAFKA-15424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15424:
-----------------------------------
Fix Version/s: 3.5.0

> Make verification a dynamic configuration
>
> Key: KAFKA-15424
> URL: https://issues.apache.org/jira/browse/KAFKA-15424
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Major
> Fix For: 3.5.0
>
> It would be nice if we could dynamically disable the verification. This can prevent disruptive actions, such as a roll, if the feature is causing issues.
[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-10339:
-----------------------------------
Fix Version/s: 3.5.0

> MirrorMaker2 Exactly-once Semantics
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect, mirrormaker
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Major
> Labels: needs-kip
> Fix For: 3.5.0
>
> MirrorMaker2 is currently implemented on the Kafka Connect Framework, more specifically the Source Connector / Task, which do not provide exactly-once semantics (EOS) out-of-the-box, as discussed in https://github.com/confluentinc/kafka-connect-jdbc/issues/461, https://github.com/apache/kafka/pull/5553, https://issues.apache.org/jira/browse/KAFKA-6080 and https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 currently does not provide EOS.
[jira] [Updated] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot
[ https://issues.apache.org/jira/browse/KAFKA-15017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15017:
-----------------------------------
Fix Version/s: 3.5.0

> New ClientQuotas are not written to ZK from snapshot
>
> Key: KAFKA-15017
> URL: https://issues.apache.org/jira/browse/KAFKA-15017
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.5.0
> Reporter: David Arthur
> Assignee: Proven Provenzano
> Priority: Critical
> Fix For: 3.5.0
>
> Similar issue to KAFKA-15009
[jira] [Updated] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14291:
-----------------------------------
Fix Version/s: 3.5.0

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: Akhilesh Chaganti
> Assignee: Deng Ziming
> Priority: Critical
> Fix For: 3.5.0
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>
>   def this(listenerType: ListenerType) = {
>     this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
>   }
>
>   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
>
>   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
>     ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
>   }
> }
> ```
> The ApiVersionManager for KRaft doesn't add the finalizedFeatures and finalizedFeatureEpoch to the ApiVersionsResponse.
[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-13295:
-----------------------------------
Fix Version/s: 3.4.0

> Long restoration times for new tasks can lead to transaction timeouts
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: Sagar Rao
> Priority: Critical
> Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.4.0
>
> In some EOS applications with relatively long restoration times we've noticed a series of ProducerFencedExceptions occurring during/immediately after restoration. The broker logs were able to confirm these were due to transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling {{send}} (if there isn't already one in flight). A {{send}} occurs often outside a commit during active processing (eg writing to the changelog), leaving the txn open until the next commit. And if a StreamThread has been actively processing when a rebalance results in a new stateful task without revoking any existing tasks, the thread won't actually commit this open txn before it goes back into the restoration phase while it builds up state for the new task. So the in-flight transaction is left open during restoration, during which the StreamThread only consumes from the changelog without committing, leaving it vulnerable to timing out when restoration times exceed the configured transaction.timeout.ms for the producer client.
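The failure mode described in that ticket reduces to simple arithmetic: a transaction opened implicitly by a send() stays uncommitted for the whole restoration phase, so it times out whenever restoration outlasts transaction.timeout.ms. A back-of-the-envelope sketch, with purely illustrative numbers and names:

```java
// Toy model of the KAFKA-13295 timeline: the txn is opened by a send() during
// active processing, and the next commit only happens after restoration finishes.
// All numbers and names are illustrative, not taken from a real deployment.
public class TxnTimeoutSketch {
    // True when the first commit would land after the broker-enforced txn timeout.
    static boolean exceedsTimeout(long txnOpenedAtMs, long firstCommitAtMs, long txnTimeoutMs) {
        return firstCommitAtMs - txnOpenedAtMs > txnTimeoutMs;
    }

    public static void main(String[] args) {
        long txnTimeoutMs = 60_000;    // producer's transaction.timeout.ms
        long txnOpenedAtMs = 0;        // send() during active processing opened the txn
        long firstCommitAtMs = 90_000; // commit deferred behind ~90 s of restoration
        System.out.println(exceedsTimeout(txnOpenedAtMs, firstCommitAtMs, txnTimeoutMs)
            ? "broker aborts the transaction; the next commit fails (ProducerFencedException)"
            : "commit lands before the timeout");
    }
}
```

With restoration longer than the timeout, the broker's transaction coordinator aborts the open transaction before the StreamThread ever gets a chance to commit it.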
[jira] [Updated] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-12283:
-----------------------------------
Fix Version/s: 3.5.0

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
> Issue Type: Test
> Components: KafkaConnect, unit tests
> Reporter: Matthias J. Sax
> Assignee: Luke Chen
> Priority: Critical
> Labels: flaky-test
> Fix For: 3.5.0
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote}java.lang.AssertionError: Tasks are imbalanced:
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, seq-source10-3]
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.assertTrue(Assert.java:42)
> at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
> at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
> at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313){quote}
[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-4327:
----------------------------------
Fix Version/s: 3.5.0

> Move Reset Tool from core to streams
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Blocker
> Labels: kip
> Fix For: 3.5.0
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module due to its ZK dependency. After KIP-4 got merged, this dependency can be dropped and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which refers to the Reset Tool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker backward compatibility – not all brokers support "topic delete request" and thus, the reset tool will not be backward compatible with all broker versions.
> KIP-756: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
[jira] [Updated] (KAFKA-14931) Revert KAFKA-14561 in 3.5
[ https://issues.apache.org/jira/browse/KAFKA-14931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14931:
-----------------------------------
Fix Version/s: 3.5.0

> Revert KAFKA-14561 in 3.5
>
> Key: KAFKA-14931
> URL: https://issues.apache.org/jira/browse/KAFKA-14931
> Project: Kafka
> Issue Type: Task
> Affects Versions: 3.5.0
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.5.0
>
> We have too many blockers for this commit to work well, so in the interest of code quality, we should revert https://issues.apache.org/jira/browse/KAFKA-14561 in 3.5 and fix the issues for 3.6
[jira] [Updated] (KAFKA-14904) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
[ https://issues.apache.org/jira/browse/KAFKA-14904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14904:
-----------------------------------
Fix Version/s: 3.5.0

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()
>
> Key: KAFKA-14904
> URL: https://issues.apache.org/jira/browse/KAFKA-14904
> Project: Kafka
> Issue Type: Test
> Affects Versions: 3.5.0
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.5.0
>
> After merging KAFKA-14561 I noticed this test still occasionally failed via
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 6ms while awaiting EndTxn(true)
> I will investigate the cause.
> Note: This error occurs when we are waiting for the transaction to be committed.
[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional
[ https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14916:
-----------------------------------
Fix Version/s: 3.5.0

> Fix code that assumes transactional ID implies all records are transactional
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
> Issue Type: Sub-task
> Affects Versions: 3.6.0
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.5.0
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID.
> This work will improve validation and fix the code that assumes all batches are transactional.
> Further, KAFKA-14561 will not assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but that can be done in a followup, KAFKA-14958, as we still need to assess if we can enforce this without breaking workloads.
[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences
[ https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14920:
-----------------------------------
Fix Version/s: 3.6.0

> Address timeouts and out of order sequences
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
> Issue Type: Sub-task
> Affects Versions: 3.6.0
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.6.0
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce request for a topic partition (ie one that has no state in psm)
> Since we currently don't validate the first sequence (we will in part 2 of kip-890), any transient error on the first produce can lead to out of order sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these transient issues, but until that is fixed, we may need to retry from in the AddPartitionsManager instead. We addressed the concurrent transactions, but there are other errors, like coordinator loading, that we could run into and see increased out of order issues.
[jira] [Updated] (KAFKA-14970) Dual write mode testing for SCRAM and Quota
[ https://issues.apache.org/jira/browse/KAFKA-14970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14970:
-----------------------------------
Fix Version/s: 3.5.0

> Dual write mode testing for SCRAM and Quota
>
> Key: KAFKA-14970
> URL: https://issues.apache.org/jira/browse/KAFKA-14970
> Project: Kafka
> Issue Type: Test
> Components: kraft
> Reporter: Proven Provenzano
> Assignee: Proven Provenzano
> Priority: Blocker
> Labels: 3.5
> Fix For: 3.5.0
>
> SCRAM and Quota are stored together in ZK and we need better testing to validate the dual write mode support for them.
> I will add some additional tests for this.
[jira] [Updated] (KAFKA-14884) Include check transaction is still ongoing right before append
[ https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-14884:
-----------------------------------
Fix Version/s: 3.6.0

> Include check transaction is still ongoing right before append
>
> Key: KAFKA-14884
> URL: https://issues.apache.org/jira/browse/KAFKA-14884
> Project: Kafka
> Issue Type: Sub-task
> Affects Versions: 3.6.0
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.6.0
>
> Even after checking via AddPartitionsToTxn, the transaction could be aborted after the response. We can add one more check before appending.
[jira] [Updated] (KAFKA-15380) Try complete actions after callback
[ https://issues.apache.org/jira/browse/KAFKA-15380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15380:
-----------------------------------
Fix Version/s: 3.6.0

> Try complete actions after callback
>
> Key: KAFKA-15380
> URL: https://issues.apache.org/jira/browse/KAFKA-15380
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Justine Olshan
> Assignee: Justine Olshan
> Priority: Blocker
> Fix For: 3.6.0
>
> KIP-890 part 1 introduced the callback request type. It is used to execute a callback after KafkaApis.handle has returned. We did not account for tryCompleteActions at the end of handle when making this change.
> In tests, we saw produce p99 increase dramatically (likely because we have to wait for another request before we can complete DelayedProduce). As a result, we should add the tryCompleteActions after the callback as well. In testing, this improved the produce performance.
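The ordering issue in that ticket can be pictured with a toy purgatory: a delayed operation is unblocked by the callback, but it only actually completes when something calls tryCompleteActions afterwards. The sketch below is schematic; the class and field names are invented for illustration, not KafkaApis code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the ordering fix: a delayed operation (think DelayedProduce) is
// unblocked by a post-handle callback but completes only when tryCompleteActions()
// runs. All names here are schematic, not the real KafkaApis implementation.
public class CallbackSketch {
    static boolean txnVerified = false;                 // precondition set by the callback
    static final Queue<Runnable> delayedOps = new ArrayDeque<>();

    // Attempt to complete delayed operations whose precondition now holds.
    static void tryCompleteActions() {
        if (!txnVerified) return; // precondition not met; operations stay parked
        Runnable op;
        while ((op = delayedOps.poll()) != null) op.run();
    }

    public static void main(String[] args) {
        delayedOps.add(() -> System.out.println("DelayedProduce completed"));
        Runnable verificationCallback = () -> txnVerified = true; // runs after handle() returns
        verificationCallback.run();
        // Without this call, the delayed operation would sit until the next
        // request happened to trigger completion -- the p99 regression described above.
        tryCompleteActions();
    }
}
```

Running tryCompleteActions immediately after the callback completes the unblocked operation on the same request path instead of waiting for an unrelated future request.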
[jira] [Updated] (KAFKA-15315) Use getOrDefault rather than get
[ https://issues.apache.org/jira/browse/KAFKA-15315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15315:
-----------------------------------
Fix Version/s: 3.7.0

> Use getOrDefault rather than get
>
> Key: KAFKA-15315
> URL: https://issues.apache.org/jira/browse/KAFKA-15315
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: roon
> Priority: Trivial
> Fix For: 3.7.0
[jira] [Updated] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE
[ https://issues.apache.org/jira/browse/KAFKA-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14900: --- Fix Version/s: 3.5.0 > Flaky test AuthorizerTest failing with NPE > -- > > Key: KAFKA-14900 > URL: https://issues.apache.org/jira/browse/KAFKA-14900 > Project: Kafka > Issue Type: Test > Components: kraft >Reporter: Greg Harris >Assignee: Colin McCabe >Priority: Minor > Labels: flaky-test > Fix For: 3.5.0 > > > The AuthorizerTest has multiple tests that appear to have the same flaky > failure: > {noformat} > org.apache.kafka.server.fault.FaultHandlerException: > quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: > Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value > of "kafka.server.SharedServer.raftManager()" is null > at > app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256) > at > app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229) > at > app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270) > at > app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.NullPointerException: Cannot invoke > "kafka.raft.KafkaRaftManager.client()" because the return value of > "kafka.server.SharedServer.raftManager()" is null > ... 8 more{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14790) Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests
[ https://issues.apache.org/jira/browse/KAFKA-14790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14790: --- Fix Version/s: 3.5.0 > Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests > --- > > Key: KAFKA-14790 > URL: https://issues.apache.org/jira/browse/KAFKA-14790 > Project: Kafka > Issue Type: Test >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Minor > Fix For: 3.5.0 > > > Followup from [https://github.com/apache/kafka/pull/13231] > We should add authorizer tests for the new version. > We should add some more tests to KafkaApis to cover auth and validation > failures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14791) Create a builder class for PartitionRegistration
[ https://issues.apache.org/jira/browse/KAFKA-14791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14791: --- Fix Version/s: 3.6.0 > Create a builder class for PartitionRegistration > > > Key: KAFKA-14791 > URL: https://issues.apache.org/jira/browse/KAFKA-14791 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Minor > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly
[ https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14913: --- Fix Version/s: 3.5.0 > Migrate DistributedHerder Executor shutdown to use > ThreadUtils#shutdownExecutorServiceQuietly > - > > Key: KAFKA-14913 > URL: https://issues.apache.org/jira/browse/KAFKA-14913 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Minor > Fix For: 3.5.0 > > > Some context here: > https://github.com/apache/kafka/pull/13557#issuecomment-1509738740 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15148) Some integration tests are running as unit tests
[ https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15148: --- Fix Version/s: 3.6.0 > Some integration tests are running as unit tests > > > Key: KAFKA-15148 > URL: https://issues.apache.org/jira/browse/KAFKA-15148 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Divij Vaidya >Assignee: Ezio Xie >Priority: Minor > Labels: newbie > Fix For: 3.6.0 > > > *This is a good item for a newcomer into Kafka code base to pick up* > > When we run `./gradlew unitTest`, it is supposed to run all unit tests. > However, we are running some integration tests as part of which makes the > overall process of running unitTest take longer than expected. > Example of such tests: > > :streams:unitTest > Executing test > > org.apache...integration.NamedTopologyIntegrationTest > > :streams:unitTest > Executing test > > org.apache...integration.QueryableStateIntegrationTest > After this task, we should not run the these tests as part of `./gradlew > unitTest`, instead they should be run as part of `./gradlew integrationTest`. > As part of acceptance criteria, please add the snapshot of html summary > generated to verify that these tests are indeed running as part of > integrationTest. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14898: --- Fix Version/s: 3.5.0 > [ MirrorMaker ] sync.topic.configs.enabled not working as expected > -- > > Key: KAFKA-14898 > URL: https://issues.apache.org/jira/browse/KAFKA-14898 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0 >Reporter: Srinivas Boga >Priority: Major > Labels: mirrormaker > Fix For: 3.5.0 > > > Hello, > In my replication set up , i do not want to sync the topic configs, the use > case is to have different retention time for the topic on the target cluster, > I am passing the config > {code:java} > sync.topic.configs.enabled = false{code} > but this is not working as expected the topic retention time is being set to > whatever is being set in the source cluster, looking at the mirrormaker logs > i can see that MirrorSourceConnector is still setting the above config as true > {code:java} > [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig > values: > allow.auto.create.topics = true > auto.commit.interval.ms = 5000 > auto.include.jmx.reporter = true > auto.offset.reset = earliest > bootstrap.servers = [sourcecluster.com:9092] > check.crcs = true > client.dns.lookup = use_all_dns_ips > client.id = consumer-null-2 > client.rack = > connections.max.idle.ms = 54 > default.api.timeout.ms = 6 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = null > group.instance.id = null > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > internal.throw.on.fetch.stable.offset.unsupported = false > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > 
metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor, class > org.apache.kafka.clients.consumer.CooperativeStickyAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.connect.timeout.ms = null > sasl.login.read.timeout.ms = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.login.retry.backoff.max.ms = 1 > sasl.login.retry.backoff.ms = 100 > sasl.mechanism = GSSAPI > sasl.oauthbearer.clock.skew.seconds = 30 > sasl.oauthbearer.expected.audience = null > sasl.oauthbearer.expected.issuer = null > sasl.oauthbearer.jwks.endpoint.refresh.ms = 360 > sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 1 > sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 > sasl.oauthbearer.jwks.endpoint.url = null > sasl.oauthbearer.scope.claim.name = scope > sasl.oauthbearer.sub.claim.name = sub > sasl.oauthbearer.token.endpoint.url = null > security.protocol = PLAINTEXT > security.providers = null > send.buffer.bytes = 131072 > session.timeout.ms = 45000 > socket.connection.setup.timeout.max.ms = 3 > socket.connection.setup.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2] > ssl.endpoint.identification.algorithm = https > ssl.engine.factory.class = null > ssl.key.password = null > 
ssl.keymanager.algorithm = SunX509 > ssl.keystore.certificate.chain = null > ssl.keystore.k
[jira] [Updated] (KAFKA-14925) The website shouldn't load external resources
[ https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14925: --- Fix Version/s: 3.5.0 > The website shouldn't load external resources > - > > Key: KAFKA-14925 > URL: https://issues.apache.org/jira/browse/KAFKA-14925 > Project: Kafka > Issue Type: Improvement > Components: website >Reporter: Mickael Maison >Assignee: Atul Sharma >Priority: Major > Fix For: 3.5.0 > > > In includes/_header.htm, we load a resource from fontawesome.com -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14859) Support SCRAM ZK to KRaft Migration
[ https://issues.apache.org/jira/browse/KAFKA-14859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14859: --- Fix Version/s: 3.6.0 > Support SCRAM ZK to KRaft Migration > --- > > Key: KAFKA-14859 > URL: https://issues.apache.org/jira/browse/KAFKA-14859 > Project: Kafka > Issue Type: Improvement >Reporter: Proven Provenzano >Assignee: Proven Provenzano >Priority: Major > Fix For: 3.6.0 > > > I want to allow existing ZK installations to be able to migrate to KRaft and > support their existing SCRAM credentials. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14859) Support SCRAM ZK to KRaft Migration
[ https://issues.apache.org/jira/browse/KAFKA-14859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14859: --- Fix Version/s: 3.5.0 (was: 3.6.0) > Support SCRAM ZK to KRaft Migration > --- > > Key: KAFKA-14859 > URL: https://issues.apache.org/jira/browse/KAFKA-14859 > Project: Kafka > Issue Type: Improvement >Reporter: Proven Provenzano >Assignee: Proven Provenzano >Priority: Major > Fix For: 3.5.0 > > > I want to allow existing ZK installations to be able to migrate to KRaft and > support their existing SCRAM credentials. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14857) Fix some MetadataLoader bugs
[ https://issues.apache.org/jira/browse/KAFKA-14857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14857: --- Fix Version/s: 3.5.0 > Fix some MetadataLoader bugs > > > Key: KAFKA-14857 > URL: https://issues.apache.org/jira/browse/KAFKA-14857 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-7497: -- Fix Version/s: 3.4.0 > Kafka Streams should support self-join on streams > - > > Key: KAFKA-7497 > URL: https://issues.apache.org/jira/browse/KAFKA-7497 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Robin Moffatt >Priority: Major > Labels: needs-kip > Fix For: 3.4.0 > > > There are valid reasons to want to join a stream to itself, but Kafka Streams > does not currently support this ({{Invalid topology: Topic foo has already > been registered by another source.}}). To perform the join requires creating > a second stream as a clone of the first, and then doing a join between the > two. This is a clunky workaround and results in unnecessary duplication of > data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
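The workaround KAFKA-7497 calls clunky looks roughly like the following in the Streams DSL. This is an illustrative sketch only, not runnable on its own: topic names are hypothetical, serde configuration is omitted, and the second topic must be populated with a copy of the first topic's data, which is the duplication the ticket objects to.

```java
StreamsBuilder builder = new StreamsBuilder();

// The same topic cannot be registered twice as a source
// ("Topic foo has already been registered by another source"),
// so the data has to be duplicated into a second topic first...
KStream<String, String> left  = builder.stream("clicks");
KStream<String, String> right = builder.stream("clicks-copy");

// ...and the stream is then joined against its own copy.
KStream<String, String> selfJoined = left.join(
    right,
    (l, r) -> l + "|" + r,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
```

Native self-join support would let the topology read `clicks` once and join it with itself, avoiding both the extra topic and the doubled storage and network traffic.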
[jira] [Updated] (KAFKA-14835) Create ControllerServerMetricsPublisher
[ https://issues.apache.org/jira/browse/KAFKA-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14835: --- Fix Version/s: 3.5.0 > Create ControllerServerMetricsPublisher > --- > > Key: KAFKA-14835 > URL: https://issues.apache.org/jira/browse/KAFKA-14835 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14961) DefaultBackgroundThreadTest.testStartupAndTearDown test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-14961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14961: --- Fix Version/s: 3.5.0 > DefaultBackgroundThreadTest.testStartupAndTearDown test is flaky > - > > Key: KAFKA-14961 > URL: https://issues.apache.org/jira/browse/KAFKA-14961 > Project: Kafka > Issue Type: Test >Reporter: Manyanda Chitimbo >Assignee: Manyanda Chitimbo >Priority: Major > Fix For: 3.5.0 > > > When running the test suite locally I noticed the following error > {code:java} > org.opentest4j.AssertionFailedError: expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180) > at > app//org.apache.kafka.clients.consumer.internals.DefaultBackgroundThreadTest.testStartupAndTearDown(DefaultBackgroundThreadTest.java:95) > {code} > which happened only once and I couldn't reproduce it again. 
> I further noticed some NPE in debug logs in the form of > {code:java} > ERROR The background thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread:166) > java.lang.NullPointerException > at > org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.handlePollResult(DefaultBackgroundThread.java:200) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) > at > java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1675) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:553) > at > org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.runOnce(DefaultBackgroundThread.java:187) > at > org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.run(DefaultBackgroundThread.java:159) > {code} > which is due to missing stubs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14658) When listening on fixed ports, defer port opening until we're ready
[ https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14658: --- Fix Version/s: 3.5.0 > When listening on fixed ports, defer port opening until we're ready > --- > > Key: KAFKA-14658 > URL: https://issues.apache.org/jira/browse/KAFKA-14658 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.5.0 > > > When we are listening on fixed ports, we should defer opening ports until > we're ready to accept traffic. If we open the broker port too early, it can > confuse monitoring and deployment systems. This is a particular concern when > in KRaft mode, since in that mode, we create the SocketServer object earlier > in the startup process than when in ZK mode. > The approach taken in this PR is to defer opening the acceptor port until > Acceptor.start is called. Note that when we are listening on a random port, > we continue to open the port "early," in the SocketServer constructor. The > reason for doing this is that there is no other way to find the random port > number the kernel has selected. Since random port assignment is not used in > production deployments, this should be reasonable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
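The two-sided pattern described above — defer binding a fixed port until startup is complete, but bind port 0 immediately because binding is the only way to learn which port the kernel picked — can be sketched with plain `java.net`. This is a simplified stand-in for what SocketServer/Acceptor do, not Kafka code; the example binds port 0 in both branches so it runs anywhere.

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class DeferredBindExample {
    public static void main(String[] args) throws Exception {
        // Fixed-port case: construct the socket unbound. Nothing is
        // listening yet, so monitoring/deployment systems cannot
        // mistake the still-starting process for a ready one.
        ServerSocket fixed = new ServerSocket();
        // ... remaining startup work happens here ...
        fixed.bind(new InetSocketAddress(0)); // in real use: the configured port
        fixed.close();

        // Random-port case: bind eagerly, since the kernel-assigned
        // port is only knowable after the bind.
        try (ServerSocket random = new ServerSocket(0)) {
            System.out.println("kernel assigned port " + random.getLocalPort());
        }
    }
}
```

The ticket accepts the early bind in the random-port case precisely because random ports are a test-only convenience, not a production configuration.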
[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation
philipnee commented on code in PR #14357: URL: https://github.com/apache/kafka/pull/14357#discussion_r1330957545 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent; +import org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +/** + * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to a) determine, and b) modify the + * current set of assigned {@link TopicPartition partitions} via {@link SubscriptionState}. Reconciliation is a + * two-part process, the first being a revocation of partitions, followed by assignment of partitions. Each of the two + * steps may result in one of the following: + * + * + * + * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the set of partitions. + * + * + * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment have started. 
In practice this means + * that the appropriate {@link ConsumerRebalanceListener} callback method is being invoked. + * + * + * {@link ReconciliationResult#COMPLETED}: the {@link ConsumerRebalanceListener} callback method was made and + * the changes were applied locally. + * + * + * {@link ReconciliationResult#EXPIRED}: something happened to cause the operation to modify the assigned set + * of partitions. This could be caused by a {@link ConsumerRebalanceListener} callback method that takes too + * long to execute, interruption with the consumer group coordinator, or other reasons. + * + * + * + * The comparison against the {@link SubscriptionState#assignedPartitions() current set of assigned partitions} and + * the {@link Assignment#assignedTopicPartitions() target set of assigned partitions} is performed by essentially + * flattening the respective entries into two sets of {@link org.apache.kafka.common.TopicPartition partitons} + * which are then compared using basic {@link Set} comparisons. + */ +public class MemberAssignmentReconciler { + +/** + * The result of the {@link #revoke(Optional, Timer)} or {@link #assign(Optional, Timer)} methods being invoked. + */ +enum ReconciliationResult { +NO_CHANGE, +IN_PROGRESS, +COMPLETED, +EXPIRED +} + +// Ugly little handler enum for making logging less verbose. +private enum Operation { + +REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned"); + +private final String verbPastTense; + +private final String methodName; + +Operation(String verbPresentTense, String verbPastTense) { +this.verbPastTense = verbPastTense; +this.methodName = String.format("%s.onPartitions%s()", ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 1).toUpperCase(Locale.ROOT) + verbPrese
[jira] [Updated] (KAFKA-15036) Kraft leader change fails when invoking getFinalizedFeatures
[ https://issues.apache.org/jira/browse/KAFKA-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15036: --- Fix Version/s: 3.6.0 > Kraft leader change fails when invoking getFinalizedFeatures > > > Key: KAFKA-15036 > URL: https://issues.apache.org/jira/browse/KAFKA-15036 > Project: Kafka > Issue Type: Improvement >Reporter: Deng Ziming >Assignee: Deng Ziming >Priority: Major > Fix For: 3.6.0 > > > When kraft leader changes, we can receiving a error as follows: > > {{[2023-05-24 18:00:02,898] WARN [QuorumController id=3002] > getFinalizedFeatures: failed with unknown server exception RuntimeException > in 271 us. The controller is already in standby mode. > (org.apache.kafka.controller.QuorumController) > java.lang.RuntimeException: No in-memory snapshot for epoch 9. Snapshot > epochs are: > at > org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173) > at > org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131) > at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69) > at > org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303) > at > org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016) > at > org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:829)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14956: --- Fix Version/s: 3.5.0 > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > Labels: flaky-test > Fix For: 3.5.0 > > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at 
app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.ja
[jira] [Updated] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
[ https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14278: --- Fix Version/s: 3.6.0 > Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit > --- > > Key: KAFKA-14278 > URL: https://issues.apache.org/jira/browse/KAFKA-14278 > Project: Kafka > Issue Type: Sub-task > Components: producer , streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15028) AddPartitionsToTxnManager metrics
[ https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15028: --- Fix Version/s: 3.6.0 > AddPartitionsToTxnManager metrics > - > > Key: KAFKA-15028 > URL: https://issues.apache.org/jira/browse/KAFKA-15028 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.6.0 > > Attachments: latency-cpu.html > > > KIP-890 added metrics for the AddPartitionsToTxnManager > VerificationTimeMs – number of milliseconds from adding partition info to the > manager to the time the response is sent. This will include the round trip to > the transaction coordinator if it is called. This will also account for > verifications that fail before the coordinator is called. > VerificationFailureRate – rate of verifications that returned in failure > either from the AddPartitionsToTxn response or through errors in the manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14518) Rebalance on topic/partition metadata changes
[ https://issues.apache.org/jira/browse/KAFKA-14518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14518: --- Fix Version/s: 3.6.0 > Rebalance on topic/partition metadata changes > - > > Key: KAFKA-14518 > URL: https://issues.apache.org/jira/browse/KAFKA-14518 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15131) Improve RemoteStorageManager exception handling documentation
[ https://issues.apache.org/jira/browse/KAFKA-15131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15131: --- Fix Version/s: 3.6.0 > Improve RemoteStorageManager exception handling documentation > - > > Key: KAFKA-15131 > URL: https://issues.apache.org/jira/browse/KAFKA-15131 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: tiered-storage > Fix For: 3.6.0 > > > As discussed here[1], RemoteStorageManager javadocs requires clarification > regarding error handling: > * Remove ambiguity on `RemoteResourceNotFoundException` description > * Describe when `RemoteResourceNotFoundException` can/should be thrown > * Describe expectations for idempotent operations when copying/deleting > > [1] > https://issues.apache.org/jira/browse/KAFKA-7739?focusedCommentId=17720936&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17720936 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14848) KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig
[ https://issues.apache.org/jira/browse/KAFKA-14848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14848: --- Fix Version/s: 3.5.0 > KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig > > > Key: KAFKA-14848 > URL: https://issues.apache.org/jira/browse/KAFKA-14848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.5.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > [~rayokota] found some {{{}NullPointerException{}}}s that originate because > of a recently introduced error in the {{KafkaConsumer}} constructor. The code > was changed to pass the deserializer variables into the {{FetchConfig}} > constructor. However, this code change incorrectly used the locally-scoped > variables, not the instance-scoped variables. Since the locally-scoped > variables could be {{{}null{}}}, this results in the {{FetchConfig}} storing > {{null}} references, leading to downstream breakage. > Suggested change: > {noformat} > - FetchConfig fetchConfig = new FetchConfig<>(config, keyDeserializer, > valueDeserializer, isolationLevel); > + FetchConfig fetchConfig = new FetchConfig<>(config, > this.keyDeserializer, this.valueDeserializer, isolationLevel); > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
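The bug above is a classic constructor-parameter shadowing pitfall. A minimal sketch of the same pattern (class and field names are illustrative, not KafkaConsumer's actual internals):

```java
public class ShadowingPitfall {
    private final String keyDeserializer;
    final String fromLocal;    // what the buggy constructor captured
    final String fromInstance; // what the suggested fix captures

    ShadowingPitfall(String keyDeserializer) {
        // The instance field gets a non-null fallback...
        this.keyDeserializer = (keyDeserializer == null) ? "default" : keyDeserializer;
        // ...but referencing the bare parameter name skips that fallback.
        this.fromLocal = keyDeserializer;         // may still be null
        this.fromInstance = this.keyDeserializer; // never null

    }

    public static void main(String[] args) {
        ShadowingPitfall p = new ShadowingPitfall(null);
        System.out.println("local: " + p.fromLocal);       // local: null
        System.out.println("instance: " + p.fromInstance); // instance: default
    }
}
```

This is why the suggested one-character-class fix (`keyDeserializer` → `this.keyDeserializer`) is sufficient: only the instance field has been run through the null-handling logic.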
[jira] [Updated] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect
[ https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-12525: --- Fix Version/s: 3.6.0 > Inaccurate task status due to status record interleaving in fast rebalances > in Connect > -- > > Key: KAFKA-12525 > URL: https://issues.apache.org/jira/browse/KAFKA-12525 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1 >Reporter: Konstantine Karantasis >Assignee: Sagar Rao >Priority: Major > Fix For: 3.6.0 > > > When a task is stopped in Connect it produces an {{UNASSIGNED}} status > record. > Equivalently, when a task is started or restarted in Connect it produces a > {{RUNNING}} status record in the Connect status topic. > At the same time, rebalances are decoupled from task start and stop. These > operations happen in a separate executor outside of the main worker thread that > performs the rebalance. > Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by > the worker that is sending them. This worker uses the > {{StatusBackingStore#putSafe}} method, which will reject any stale status > messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker > is aware of the newer status record that declares a task as {{RUNNING}}. > In cases of fast consecutive rebalances where a task is revoked from one > worker and assigned to another one, it has been observed that there is a > small time window and thus a race condition during which a {{RUNNING}} status > record in the new generation is produced and is immediately followed by a > delayed {{UNASSIGNED}} status record belonging to the same or a previous > generation, before the worker that sends this message reads the {{RUNNING}} > status record that corresponds to the latest generation. > A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} > status message in the topic if it reads a stale {{UNASSIGNED}} message from a > previous generation (that should have been fenced). > Another option is to ignore stale {{UNASSIGNED}} messages (messages from an > earlier generation than the one in which the task had {{RUNNING}} status). > Worth noting that when this race condition takes place, besides the > inaccurate status representation, the actual execution of the tasks remains > unaffected (e.g. the tasks are running correctly even though they appear as > {{UNASSIGNED}}). -- This message was sent by Atlassian Jira (v8.20.10#820010)
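The second remediation option above (ignore stale {{UNASSIGNED}} messages from an earlier generation) can be sketched as a generation check on the latest known status. This is a simplified illustration; the `Status` record fields and the store itself are stand-ins, not Connect's actual StatusBackingStore API.

```java
import java.util.HashMap;
import java.util.Map;

public class StatusStore {
    static final class Status {
        final String state;   // e.g. "RUNNING", "UNASSIGNED"
        final int generation; // rebalance generation that produced the record
        Status(String state, int generation) { this.state = state; this.generation = generation; }
    }

    private final Map<String, Status> latest = new HashMap<>();

    /** Apply a status record, fencing UNASSIGNED records from an older generation. */
    boolean put(String taskId, Status incoming) {
        Status current = latest.get(taskId);
        boolean stale = current != null
                && incoming.state.equals("UNASSIGNED")
                && incoming.generation < current.generation;
        if (stale) return false; // reject the delayed record
        latest.put(taskId, incoming);
        return true;
    }

    String stateOf(String taskId) { return latest.get(taskId).state; }

    public static void main(String[] args) {
        StatusStore store = new StatusStore();
        store.put("task-0", new Status("RUNNING", 5));    // new generation wins
        store.put("task-0", new Status("UNASSIGNED", 4)); // delayed record from gen 4: fenced
        System.out.println(store.stateOf("task-0"));      // RUNNING
    }
}
```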
[jira] [Updated] (KAFKA-15272) Fix the logic which finds candidate log segments to upload it to tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-15272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15272: --- Fix Version/s: 3.6.0 > Fix the logic which finds candidate log segments to upload it to tiered > storage > --- > > Key: KAFKA-15272 > URL: https://issues.apache.org/jira/browse/KAFKA-15272 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.6.0 > > > In tiered storage, a segment is eligible for deletion from local disk when it > gets uploaded to the remote storage. > If the topic's active segment contains some messages and there are no new > incoming messages, then the active segment gets rotated to a passive segment > after the configured {{log.roll.ms}} timeout. > > The > [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L553] > to find the candidate segment in RemoteLogManager does not include the > recently rotated passive segment as eligible for upload to remote storage, > so the passive segment won't be removed even after it breaches the > retention time/size limits (i.e., the topic won't be empty after it becomes stale). -- This message was sent by Atlassian Jira (v8.20.10#820010)
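The intended selection logic can be illustrated as follows: every segment up to (but excluding) the active one should be an upload candidate, including a segment that was rotated to passive only moments ago. The `Segment` type and offsets here are simplified stand-ins, not RemoteLogManager's actual types.

```java
import java.util.ArrayList;
import java.util.List;

public class CandidateSegments {
    static final class Segment {
        final long baseOffset;
        final long endOffset;
        Segment(long baseOffset, long endOffset) { this.baseOffset = baseOffset; this.endOffset = endOffset; }
    }

    /**
     * All segments that end at or before the upload boundary are candidates.
     * Only the active segment (the last one) is excluded, so a just-rotated
     * passive segment still qualifies -- the bug described above dropped it.
     */
    static List<Segment> candidates(List<Segment> segments, long uploadableEndOffset) {
        List<Segment> result = new ArrayList<>();
        for (int i = 0; i < segments.size() - 1; i++) { // skip only the active segment
            Segment s = segments.get(i);
            if (s.endOffset <= uploadableEndOffset) result.add(s);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Segment> log = List.of(
            new Segment(0, 99),     // old passive segment
            new Segment(100, 149),  // recently rotated passive segment
            new Segment(150, 150)); // active segment after the roll
        System.out.println(candidates(log, 150).size()); // 2
    }
}
```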
[jira] [Updated] (KAFKA-15290) Add support to onboard existing topics to tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-15290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15290: --- Fix Version/s: 3.6.0 > Add support to onboard existing topics to tiered storage > > > Key: KAFKA-15290 > URL: https://issues.apache.org/jira/browse/KAFKA-15290 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.6.0 > > > This task is about adding support to enable tiered storage for existing > topics in the cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation
[ https://issues.apache.org/jira/browse/KAFKA-14888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14888: --- Fix Version/s: 3.6.0 > RemoteLogManager - deleting expired/size breached log segments to remote > storage implementation > > > Key: KAFKA-14888 > URL: https://issues.apache.org/jira/browse/KAFKA-14888 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Satish Duggana >Priority: Major > Fix For: 3.6.0 > > > Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA > covers deleting time/size breached log segments in remote storage. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15167) Tiered Storage Test Harness Framework
[ https://issues.apache.org/jira/browse/KAFKA-15167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15167: --- Fix Version/s: 3.6.0 > Tiered Storage Test Harness Framework > - > > Key: KAFKA-15167 > URL: https://issues.apache.org/jira/browse/KAFKA-15167 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.6.0 > > > Base class for integration tests exercising the tiered storage functionality > in Kafka. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14821) Better handle timeouts in ListOffsets API
[ https://issues.apache.org/jira/browse/KAFKA-14821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-14821: --- Fix Version/s: 3.5.0 > Better handle timeouts in ListOffsets API > - > > Key: KAFKA-14821 > URL: https://issues.apache.org/jira/browse/KAFKA-14821 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Dimitar Dimitrov >Assignee: Dimitar Dimitrov >Priority: Major > Fix For: 3.5.0 > > > The ListOffsets Admin API doesn't retry requests for partitions that failed due to > timeouts or other types of retriable exceptions. This is a step back > compared to the Consumer offset APIs implemented in the fetcher, as the latter > can do partial retries in such cases. > * The comparison is notable as some Kafka tools (e.g. > {{{}kafka-get-offsets{}}}) have moved from using the Consumer offset APIs to > using the ListOffsets Admin API. > One nice way to address that seems to be to migrate the ListOffsets API to > use the more modern AdminApiDriver mechanism. That should automatically > provide the capability to retry requests whose responses are deemed retriable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15181) Race condition on partition assigned to TopicBasedRemoteLogMetadataManager
[ https://issues.apache.org/jira/browse/KAFKA-15181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15181: --- Fix Version/s: 3.6.0 > Race condition on partition assigned to TopicBasedRemoteLogMetadataManager > --- > > Key: KAFKA-15181 > URL: https://issues.apache.org/jira/browse/KAFKA-15181 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Abhijeet Kumar >Priority: Major > Labels: tiered-storage > Fix For: 3.6.0 > > > TopicBasedRemoteLogMetadataManager (TBRLMM) uses a cache to be prepared > whenever partitions are assigned. > When partitions are assigned to the TBRLMM instance, a consumer is started to > keep the cache up to date. > If the cache hasn't finished building, TBRLMM fails to return remote > metadata about partitions that are stored on the backing topic. TBRLMM may not > recover from this failing state. > A proposal to fix this issue would be to wait, after a partition is assigned, for > the consumer to catch up. Similar logic is used at the moment when TBRLMM > writes to the topic, using the send callback to wait for the consumer to catch up. > This logic can be reused whenever a partition is assigned, so that when TBRLMM is > marked as initialized, the cache is ready to serve requests. > Reference: https://github.com/aiven/kafka/issues/33 -- This message was sent by Atlassian Jira (v8.20.10#820010)
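The proposed fix above (block after assignment until the metadata consumer reaches the end of the backing topic, and only then mark the manager initialized) can be sketched like this. The interface and method names are illustrative, not TBRLMM's actual API.

```java
public class CatchUpWait {
    interface MetadataConsumer {
        long position();  // last consumed offset of the metadata partition
        long endOffset(); // log end offset of the metadata partition
        void poll();      // consume the next batch of metadata records
    }

    /** Block until the consumer has caught up to the end offset, or time out. */
    static void awaitCatchUp(MetadataConsumer consumer, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (consumer.position() < consumer.endOffset()) {
            if (System.currentTimeMillis() > deadline)
                throw new IllegalStateException("timed out waiting for metadata catch-up");
            consumer.poll();
        }
        // Only now is the cache complete and safe to serve remote metadata reads.
    }

    public static void main(String[] args) {
        final long[] pos = {0};
        MetadataConsumer fake = new MetadataConsumer() {
            public long position() { return pos[0]; }
            public long endOffset() { return 3; }
            public void poll() { pos[0]++; }
        };
        awaitCatchUp(fake, 1000);
        System.out.println(fake.position()); // 3
    }
}
```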
[jira] [Updated] (KAFKA-9564) Integration Test framework for Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-9564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-9564: -- Fix Version/s: 3.6.0 > Integration Test framework for Tiered Storage > - > > Key: KAFKA-9564 > URL: https://issues.apache.org/jira/browse/KAFKA-9564 > Project: Kafka > Issue Type: Sub-task >Reporter: Alexandre Dupriez >Assignee: Alexandre Dupriez >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest
[ https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15289: --- Fix Version/s: 3.6.0 > Support KRaft mode in RequestQuotaTest > -- > > Key: KAFKA-15289 > URL: https://issues.apache.org/jira/browse/KAFKA-15289 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Major > Labels: newbee > Fix For: 3.6.0 > > > We are calling `zkBrokerApis` in RequestQuotaTest; we should ensure KRaft > broker APIs are also supported, so use clientApis as far as possible. Use > zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15288) Change BrokerApiVersionsCommandTest to support kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-15288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15288: --- Fix Version/s: 3.6.0 > Change BrokerApiVersionsCommandTest to support kraft mode > - > > Key: KAFKA-15288 > URL: https://issues.apache.org/jira/browse/KAFKA-15288 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Minor > Fix For: 3.6.0 > > > Currently we only test zk mode for BrokerApiVersionsCommand -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15287) Change NodeApiVersions.create() to contains both apis of zk and kraft broker
[ https://issues.apache.org/jira/browse/KAFKA-15287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15287: --- Fix Version/s: 3.6.0 > Change NodeApiVersions.create() to contains both apis of zk and kraft broker > - > > Key: KAFKA-15287 > URL: https://issues.apache.org/jira/browse/KAFKA-15287 > Project: Kafka > Issue Type: Sub-task >Reporter: Deng Ziming >Priority: Major > Labels: newbee > Fix For: 3.6.0 > > > We are using ApiKeys.zkBrokerApis() when calling NodeApiVersions.create(), > this means we only support zk broker apis. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15439) Add transaction tests enabled with tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-15439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15439: --- Fix Version/s: 3.6.0 3.7.0 > Add transaction tests enabled with tiered storage > - > > Key: KAFKA-15439 > URL: https://issues.apache.org/jira/browse/KAFKA-15439 > Project: Kafka > Issue Type: Test > Components: core >Reporter: Satish Duggana >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.6.0, 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on PR #14364: URL: https://github.com/apache/kafka/pull/14364#issuecomment-1726880993 @dajac @lianetm - Made some changes based on the comments, but obviously broke some existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330940223 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; + +public class HeartbeatRequestManager implements RequestManager { +private final Time time; +private final Logger logger; + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.time = time; +this.logger = logContext.logger(HeartbeatRequestManager.class); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + +long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final HeartbeatRequestState heartbeatRequestState, +final ErrorEventHandler nonRetriableErrorHandler) { +this.time = time; +this.logger = logContext.logger(this.getClass()); +this.subscriptions = subscriptions; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +this.coordinatorRequestManager = coordinatorRequestManager; +this.heartbeatRequestState = heartbeatRequestState; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +} + +@Override +public NetworkClientDelegate.PollResult poll(long currentTimeMs) { +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) { +return new NetworkClientDelegate.PollResult( +Long.MAX_VALUE, Collections.emptyList()); +} + +if (!heartbeatRequestState.canSendRequest(currentTimeMs)) { +return new NetworkClientDelegate.PollResult( +
[GitHub] [kafka] github-actions[bot] commented on pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer
github-actions[bot] commented on PR #13881: URL: https://github.com/apache/kafka/pull/13881#issuecomment-1726830843 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14370: KAFKA-15449: Verify transactional offset commits (KIP-890 part 1)
jolshan commented on code in PR #14370: URL: https://github.com/apache/kafka/pull/14370#discussion_r1330915798 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1048,7 +1048,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } +} +if (origin == AppendOrigin.CLIENT || origin == AppendOrigin.COORDINATOR) { // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. Review Comment: Good point. I will also update when David merges his change since it subtly affects this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] artemlivshits commented on a diff in pull request #14370: KAFKA-15449: Verify transactional offset commits (KIP-890 part 1)
artemlivshits commented on code in PR #14370: URL: https://github.com/apache/kafka/pull/14370#discussion_r1330859068 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1048,7 +1048,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } +} +if (origin == AppendOrigin.CLIENT || origin == AppendOrigin.COORDINATOR) { // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. Review Comment: Comment should probably be updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726690138 @gkousouris Thanks for sharing your use-case. I think you are right to look towards MM2 for this sort of translation, and I think it's unfortunate that it isn't straightforward. The current offset translation doesn't "converge" for consumer groups which are inactive due to memory limitations, but for a single-shot migration use-case, that's not good enough. Are you able to stop the producers to the upstream topic, and let the consumers commit offsets at the end of the topic before performing the migration? If you set offset.lag.max very low, MM2 should be able to translate offsets at the end of the topic. > Otherwise, the only solution I can think of is the hacky approach of reading the offsets and trying to decipher what message to read on the application-side, which seems brittle. Yeah, if you want to get a 100% precise translation away from the end of the topic and don't want to modify MM2, you're going to need to "synchronize" the two topics and figure out which messages line up. Between offset.lag.max, the syncs topic throttle semaphore, and the OffsetSyncStore, a lot of intermediate offset syncs get discarded and the precision of the translation decreases significantly. If you let MirrorMaker2 perform a rough translation that you later refine with a custom script, you probably only need to compare a few hundred record checksums for each topic-partition-consumer-group. This would also allow you to compensate for the skipped offsets that EOS mode produces. I think you could make such a script reliable enough for a one-off migration, with some manual spot-checking to make sure it doesn't do anything too incorrect. If you're willing to hack on the MirrorMaker connectors, you could disable the throttling semaphore, the offset.lag.max parameter, and implement the full-depth OffsetSyncStore to get perfect translation. 
I don't think we could add those to mainline MM2 without a configuration, but you are certainly welcome to temporarily fork MM2 to get the job done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gkousouris commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gkousouris commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726667976 Thanks a lot for your reply! I will look into creating a Jira account and creating a ticket for this. I should have mentioned that we were planning on using Mirror Maker to migrate the topic a service reads from, from cluster A to cluster B. So we would not be limited by the asynchronous offset translation that MirrorMaker uses, since we would be: 1. Mirroring data from the old topic in cluster A to the new topic in cluster B. 2. Stopping the service and waiting for the consumer offset of the last message on cluster A to be replicated on the new topic on cluster B. 3. Restarting the service to read from the new topic. We would have hoped that the offset would be translated exactly at some point, letting us seamlessly start consuming from the same point where the service was last stopped. MirrorMaker seems like a great fit for us, but this might be a bit of a blocker. Using the old offset translation version before this PR could perhaps work if we were to disable EOS (to get rid of the transactional messages). Otherwise, the only solution I can think of is the hacky approach of reading the offsets and trying to decipher what message to read on the application side, which seems brittle. Would you perhaps recommend a different approach to not re-process a message twice? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726637742 Hi @gkousouris Thanks for asking! > Thus, when we restart the service (and make it consume from the new topic), it will re-process all messages from last_checkpoint_before_upstream_offset + 1 until downstream_offset. Isn't this a problem considering that Mirror Maker is providing Exactly Once Semantics for committing messages ? Your understanding of the offset translation (post-KAFKA-12468) is correct, and I would expect re-processing of messages downstream after a fail-over. I also understand that this doesn't satisfy "exactly once semantics" for some definition, because it allows for re-delivery of the same message to the same "application", when that application uses multiple Kafka clusters. MirrorMaker2 currently provides "exactly once semantics" for replicating data, but not for offsets. I believe this is captured by the "MirrorSourceConnector" declaring that it supports EOS, but the"MirrorCheckpointConnector" does not. This means that when you replicate a topic with EOS mode, and use read_committed on the downstream topic from the beginning, EOS would mean that you read each record present in the upstream topic exactly once. When you instead start reading at the committed downstream offset, you may have records delivered downstream that have already been committed upstream. This is not just caused by the offset translation that this PR implements, it's a limitation of the asynchronous offset translation that MirrorMaker2 uses. Consider this sequence: 1. MirrorCheckpointTask syncs offsets 2. MirrorCheckpointTask sleeps 3. MirrorSourceTask replicates some records 4. The upstream consumer group consumes upstream records and commits offsets to the upstream group 5. 
The downstream consumer group starts reading the topic with the stale offsets in the downstream group Thanks for doing your due diligence on the claims of "exactly once semantics", and I hope that you can still make MirrorMaker2 work for your use-case. I suspect that EOS semantics across multiple Kafka clusters is a much larger effort than just changing the offset translation logic :) If you have a Jira account, please consider opening a ticket about this shortcoming. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gkousouris commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gkousouris commented on PR #13178: URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726599437 Hi @gharris1727, if I understand this PR correctly, this will (almost always) cause duplication of data. Our problem is this: 1. We have a service reading from a Kafka topic and committing offsets for its consumer group. 2. We use MirrorMaker to replicate the topic to a different cluster. 3. We pause the service and check the current offset for the 2 streams (the one in the old cluster and the one in the new cluster). In step 3, these offsets will be different, specifically, the offset from the old cluster will be the last message the service managed to commit an offset for. And the new topic will have as an offset the value: `last_checkpoint_before_upstream_offset + 1`. Thus, when we restart the service (and make it consume from the new topic), it will re-process all messages from `last_checkpoint_before_upstream_offset + 1` until `downstream_offset`. Isn't this a problem considering that Mirror Maker is providing Exactly Once Semantics for committing messages ? This behaviour was verified by looking at the output of the `kafka-consumer-groups` script. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request, #14409: Fix migration system tests for 3.6
mumrah opened a new pull request, #14409: URL: https://github.com/apache/kafka/pull/14409 As part of validating 3.6.0 RC0, I ran the ZK migration system tests at the RC tag. Pretty much all of them failed due to recent changes (particularly, disallowing migrations with JBOD). All of the changes here are test fixes, so not a release blocker. ``` SESSION REPORT (ALL TESTS) ducktape version: 0.11.3 session_id: 2023-09-19--007 run time: 8 minutes 51.147 seconds tests run:5 passed: 5 flaky:0 failed: 0 ignored: 0 test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False status: PASS run time: 2 minutes 56.029 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True status: PASS run time: 3 minutes 1.037 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_pre_migration_mode_3_4.metadata_quorum=ISOLATED_KRAFT status: PASS run time: 44.101 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_reconcile_kraft_to_zk status: PASS run time: 1 minute 25.509 seconds test_id: kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_upgrade_after_3_4_migration status: PASS run time: 43.993 seconds ```
[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
lianetm commented on code in PR #14386: URL: https://github.com/apache/kafka/pull/14386#discussion_r1330551023 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -395,27 +386,28 @@ class PendingRequests { List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); -public boolean hasUnsentRequests() { +// Visible for teseting +boolean hasUnsentRequests() { return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty(); } -public CompletableFuture addOffsetCommitRequest(final Map offsets) { +CompletableFuture addOffsetCommitRequest(final Map offsets) { // TODO: Dedupe committing the same offsets to the same partitions OffsetCommitRequestState request = new OffsetCommitRequestState( offsets, groupState.groupId, groupState.groupInstanceId.orElse(null), groupState.generation); unsentOffsetCommits.add(request); -return request.future(); +return request.future; } /** - * Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future - * to the existing one. + * Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future + * to the existing one. * - * If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} - * upon completion. + * If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} - * upon completion. Review Comment: nit: wrong tag and other unclosed ones above
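The javadoc under review describes a request-deduplication pattern: identical in-flight fetches share one future, and a new request removes itself from the in-flight collection when it completes. Stripped of Kafka specifics, the pattern can be sketched like this (a hypothetical class, not the actual `CommitRequestManager` code):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of "chain the future to the existing one" deduplication:
// callers asking for the same key while a request is in flight get the
// same CompletableFuture; completion removes the entry so later calls
// start a fresh request.
class DedupedFetcher {
    private final Map<String, CompletableFuture<Long>> inflight = new ConcurrentHashMap<>();

    CompletableFuture<Long> fetch(String key) {
        return inflight.computeIfAbsent(key, k -> {
            CompletableFuture<Long> f = new CompletableFuture<>();
            // remove ourselves once done so later requests are not chained
            f.whenComplete((v, e) -> inflight.remove(k, f));
            return f;
        });
    }

    void complete(String key, long value) {
        CompletableFuture<Long> f = inflight.get(key);
        if (f != null) f.complete(value);
    }
}
```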
[GitHub] [kafka] kamalcph commented on pull request #14407: KAFKA-15479: Remote log segments should be considered once for retention breach
kamalcph commented on PR #14407: URL: https://github.com/apache/kafka/pull/14407#issuecomment-1726278190 > I had a comment to fix this/similar problem in original PR - [#13561 (comment)](https://github.com/apache/kafka/pull/13561#discussion_r1293305304) > > I am curious, why didn't the test which was added to resolve the comment fail? Is it because the test only checked for eligibility of a segment for calculation of size and didn't actually check if same segment is being counted twice? We don't have a test for the scenario mentioned in the [comment](https://github.com/apache/kafka/pull/13561#discussion_r1293305304). And there was a regression after #14349. I have added a couple of tests to verify the deleted segment count and log-start-offset.
[GitHub] [kafka] jolshan commented on pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on PR #14402: URL: https://github.com/apache/kafka/pull/14402#issuecomment-1726276699 There also seems to be a problem with the build. All the versions seem to be hanging.
[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330542903 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -69,7 +69,6 @@ import com.yammer.metrics.core.Gauge import kafka.log.remote.RemoteLogManager import org.apache.kafka.common.config.AbstractConfig import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction} Review Comment: For our `setUpReplicaManagerWithMockedAddPartitionsToTxnManager` we can remove the lines of code that mock the flow for getting transaction state partitions. val metadataResponseTopic = Seq(new MetadataResponseTopic() .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) .setPartitions(Seq( new MetadataResponsePartition() .setPartitionIndex(0) .setLeaderId(0)).asJava)) transactionalTopicPartitions.foreach(tp => when(metadataCache.contains(tp)).thenReturn(true)) when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic) when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node)) when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None) I think the same code exists for `setupReplicaManagerWithMockedPurgatories`
[GitHub] [kafka] divijvaidya commented on pull request #14407: KAFKA-15479: Remote log segments should be considered once for retention breach
divijvaidya commented on PR #14407: URL: https://github.com/apache/kafka/pull/14407#issuecomment-1726259952 I had a comment to fix this/similar problem in original PR - https://github.com/apache/kafka/pull/13561#discussion_r1293305304 I am curious, why didn't the test which was added to resolve the comment fail? Is it because the test only checked for eligibility of a segment for calculation of size and didn't actually check if same segment is being counted twice?
[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330524344 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## @@ -157,29 +216,114 @@ class AddPartitionsToTxnManagerTest { // The request for node1 should not be added because one request is already inflight. assertEquals(1, requestsAndHandlers2.size) requestsAndHandlers2.foreach { requestAndHandler => - verifyRequest(node2, transactionalId3, producerId3, requestAndHandler) + verifyRequest(node2, transactionalId3, producerId3, requestAndHandler, verifyOnly = false) } // Complete the request for node1 so the new one can go through. requestsAndHandlers.filter(_.destination == node1).head.handler.onComplete(authenticationErrorResponse) val requestsAndHandlers3 = addPartitionsToTxnManager.generateRequests().asScala assertEquals(1, requestsAndHandlers3.size) requestsAndHandlers3.foreach { requestAndHandler => - verifyRequest(node1, transactionalId2, producerId2, requestAndHandler) + verifyRequest(node1, transactionalId2, producerId2, requestAndHandler, verifyOnly = true) +} + } + + @Test + def testTransactionCoordinatorResolution(): Unit = { +when(partitionFor.apply(transactionalId1)).thenReturn(0) + +def checkError(): Unit = { + val errors = mutable.Map[TopicPartition, Errors]() + + addPartitionsToTxnManager.addTxnData( +transactionalId1, +producerId1, +producerEpoch = 0, +verifyOnly = true, +topicPartitions, +setErrors(errors) + ) + + assertEquals(topicPartitions.map(tp => tp -> Errors.COORDINATOR_NOT_AVAILABLE).toMap, errors) } + +// The transaction state topic does not exist. + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq()) +checkError() + +// The metadata of the transaction state topic returns an error. 
+ when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq( +new MetadataResponseData.MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code) + )) +checkError() + +// The partition does not exist. + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq( +new MetadataResponseData.MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + )) +checkError() + +// The partition has not leader. Review Comment: nit: The partition has no leader
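For context on what the `partitionFor` function mocked throughout these tests computes: the transaction coordinator is the leader of one partition of the `__transaction_state` topic, chosen by hashing the transactional id. A simplified sketch of that mapping (an illustration modeled on Kafka's `Utils.abs(hashCode) % numPartitions` scheme; the partition count is a broker config, hard-coded here):

```java
// Sketch of transactional-id -> __transaction_state partition resolution.
// The leader of the resolved partition is the transaction coordinator.
class CoordinatorResolution {
    static int partitionFor(String transactionalId, int numTxnStatePartitions) {
        // bit-mask "abs" avoids the Integer.MIN_VALUE overflow of Math.abs
        int abs = transactionalId.hashCode() & 0x7fffffff;
        return abs % numTxnStatePartitions;
    }
}
```

This is why the tests stub both `partitionFor` (id to partition) and `metadataCache.getAliveBrokerNode` (partition leader to node): together they resolve the coordinator, and each step can fail independently, as `testTransactionCoordinatorResolution` exercises.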
[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330523078 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## @@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest { private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) private val disconnectedResponse = clientResponse(null, disconnected = true) + private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")) + @BeforeEach def setup(): Unit = { addPartitionsToTxnManager = new AddPartitionsToTxnManager( - KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + config, networkClient, - time) + metadataCache, + partitionFor, + time +) } @AfterEach def teardown(): Unit = { addPartitionsToTxnManager.shutdown() } - def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { -callbackErrors.foreach { - case (tp, error) => errors.put(tp, error) -} + private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { +callbackErrors.forKeyValue(errors.put) } @Test def testAddTxnData(): Unit = { +when(partitionFor.apply(transactionalId1)).thenReturn(0) +when(partitionFor.apply(transactionalId2)).thenReturn(1) +when(partitionFor.apply(transactionalId3)).thenReturn(0) + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq( +new MetadataResponseData.MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + .setPartitions(List( +new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(0) + .setLeaderId(0), +new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(1) + .setLeaderId(1) + ).asJava) + )) +when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)) + .thenReturn(Some(node0)) 
+when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)) + .thenReturn(Some(node1)) + val transaction1Errors = mutable.Map[TopicPartition, Errors]() val transaction2Errors = mutable.Map[TopicPartition, Errors]() val transaction3Errors = mutable.Map[TopicPartition, Errors]() -addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1Errors)) -addPartitionsToTxnManager.addTxnData(node1, transactionData(transactionalId2, producerId2), setErrors(transaction2Errors)) -addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId3, producerId3), setErrors(transaction3Errors)) +addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction1Errors)) +addPartitionsToTxnManager.addTxnData(transactionalId2, producerId2, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction2Errors)) +addPartitionsToTxnManager.addTxnData(transactionalId3, producerId3, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction3Errors)) // We will try to add transaction1 3 more times (retries). One will have the same epoch, one will have a newer epoch, and one will have an older epoch than the new one we just added. val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition, Errors]() val transaction1RetryWithNewerEpochErrors = mutable.Map[TopicPartition, Errors]() val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition, Errors]() // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response. 
-addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1RetryWithSameEpochErrors)) +addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction1RetryWithSameEpochErrors)) val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap assertEquals(expectedNetworkErrors, transaction1Errors) // Trying to add more transactional data for the same transactional ID and producer ID, but new epoch should replace the old data and send an error response for it. -addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1, producerEpoch = 1), setErrors(transaction1RetryWithNewerEpochErrors)) +addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch =
[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330522384 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -63,7 +73,42 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time val verificationFailureRate = metricsGroup.newMeter(VerificationFailureRateMetricName, "failures", TimeUnit.SECONDS) val verificationTimeMs = metricsGroup.newHistogram(VerificationTimeMsMetricName) - def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { + def addTxnData( +transactionalId: String, +producerId: Long, +producerEpoch: Short, +verifyOnly: Boolean, Review Comment: We will add this functionality later, but AddPartitionsToTxnManager currently only supports verifyOnly=true. In part 2, we will send AddPartitionsToTxnRequests that actually add the partition. I suppose this is ok since replicaManager hard codes true for now.
[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330519350 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## @@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest { private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) private val disconnectedResponse = clientResponse(null, disconnected = true) + private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")) + @BeforeEach def setup(): Unit = { addPartitionsToTxnManager = new AddPartitionsToTxnManager( - KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + config, networkClient, - time) + metadataCache, + partitionFor, + time +) } @AfterEach def teardown(): Unit = { addPartitionsToTxnManager.shutdown() } - def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { -callbackErrors.foreach { - case (tp, error) => errors.put(tp, error) -} + private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { +callbackErrors.forKeyValue(errors.put) } @Test def testAddTxnData(): Unit = { +when(partitionFor.apply(transactionalId1)).thenReturn(0) Review Comment: I wonder if there is a way to make a helper where you pass in id + partition. But that might not be worth it for 2 tests.
[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager
jolshan commented on code in PR #14402: URL: https://github.com/apache/kafka/pull/14402#discussion_r1330516886 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## @@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest { private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) private val disconnectedResponse = clientResponse(null, disconnected = true) + private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")) + @BeforeEach def setup(): Unit = { addPartitionsToTxnManager = new AddPartitionsToTxnManager( - KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + config, networkClient, - time) + metadataCache, + partitionFor, + time +) } @AfterEach def teardown(): Unit = { addPartitionsToTxnManager.shutdown() } - def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { -callbackErrors.foreach { - case (tp, error) => errors.put(tp, error) -} + private def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { +callbackErrors.forKeyValue(errors.put) } @Test def testAddTxnData(): Unit = { +when(partitionFor.apply(transactionalId1)).thenReturn(0) +when(partitionFor.apply(transactionalId2)).thenReturn(1) +when(partitionFor.apply(transactionalId3)).thenReturn(0) + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)) + .thenReturn(Seq( +new MetadataResponseData.MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + .setPartitions(List( +new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(0) + .setLeaderId(0), +new MetadataResponseData.MetadataResponsePartition() + .setPartitionIndex(1) + .setLeaderId(1) + ).asJava) + )) +when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)) + .thenReturn(Some(node0)) 
+when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)) + .thenReturn(Some(node1)) + val transaction1Errors = mutable.Map[TopicPartition, Errors]() val transaction2Errors = mutable.Map[TopicPartition, Errors]() val transaction3Errors = mutable.Map[TopicPartition, Errors]() -addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1Errors)) -addPartitionsToTxnManager.addTxnData(node1, transactionData(transactionalId2, producerId2), setErrors(transaction2Errors)) -addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId3, producerId3), setErrors(transaction3Errors)) +addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, producerEpoch = 0, verifyOnly = true, topicPartitions, setErrors(transaction1Errors)) Review Comment: Did we specify the param name here for readability?
[GitHub] [kafka] kamalcph commented on a diff in pull request #14407: KAFKA-15479: Remote log segments should be considered once for retention breach
kamalcph commented on code in PR #14407: URL: https://github.com/apache/kafka/pull/14407#discussion_r1330492380 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -993,7 +988,9 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex return; } RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - +if (segmentsToDelete.contains(metadata)) { Review Comment: This is the main fix.
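The effect of that `segmentsToDelete.contains(metadata)` guard can be sketched outside of Kafka. When remote segments are iterated per leader epoch, a segment spanning several epochs is yielded once per epoch, so without a "seen" check its size is counted toward the retention breach multiple times. The segment ids and sizes below are made up for illustration:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration of the dedup the fix adds: each segment is counted toward
// retention at most once, even if it appears under multiple leader epochs.
class RetentionDedupSketch {
    static long sizeEligibleForDeletion(List<List<String>> segmentsPerEpoch,
                                        Map<String, Long> segmentSizes) {
        Set<String> segmentsToDelete = new HashSet<>();
        long total = 0;
        for (List<String> epochSegments : segmentsPerEpoch) {
            for (String segment : epochSegments) {
                if (segmentsToDelete.contains(segment)) {
                    continue; // the fix: consider each segment only once
                }
                segmentsToDelete.add(segment);
                total += segmentSizes.get(segment);
            }
        }
        return total;
    }
}
```

With segment `s2` present under two epochs, the guarded version counts 30 bytes for `{s1, s2, s3}` of 10 bytes each; without the guard it would count 40 and delete fewer segments than the retention size actually requires.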
[GitHub] [kafka] nizhikov commented on pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown
nizhikov commented on PR #13929: URL: https://github.com/apache/kafka/pull/13929#issuecomment-1726203159 Hello. I will review this PR soon.
[GitHub] [kafka] dongnuo123 opened a new pull request, #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API
dongnuo123 opened a new pull request, #14408: URL: https://github.com/apache/kafka/pull/14408 This PR implements the DeleteGroups and OffsetDelete APIs according to the implementation in the old GroupCoordinator, but with the new model. ### Jira https://issues.apache.org/jira/browse/KAFKA-14506 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] kamalcph opened a new pull request, #14407: KAFKA-15479: Remote log segments should be considered once for retention breach
kamalcph opened a new pull request, #14407: URL: https://github.com/apache/kafka/pull/14407 When a remote log segment contains multiple epochs, it gets considered multiple times during retention size/time/start-offset breach checks. This affects deletion by remote log retention size, as fewer segments than expected are deleted. This is a follow-up of KAFKA-15352 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention breach
[ https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15479: - Summary: Remote log segments should be considered once for retention breach (was: Remote log segments should be considered once for retention size breach) > Remote log segments should be considered once for retention breach > -- > > Key: KAFKA-15479 > URL: https://issues.apache.org/jira/browse/KAFKA-15479 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > When a remote log segment contains multiple epochs, it gets considered > multiple times during retention size/time/start-offset breach checks. This > affects deletion by remote log retention size, as fewer segments than > expected are deleted. This is a follow-up of KAFKA-15352 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1330480862 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatInterval = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); +pr
[GitHub] [kafka] kirktrue closed pull request #14397: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue closed pull request #14397: KAFKA-14274 [6, 7]: Introduction of fetch request manager URL: https://github.com/apache/kafka/pull/14397 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue opened a new pull request, #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue opened a new pull request, #14406:
URL: https://github.com/apache/kafka/pull/14406

   Changes:
   
   1. Introduces `FetchRequestManager`, which implements the `RequestManager` API for fetching messages from brokers. Unlike `Fetcher`, record decompression and deserialization are performed on the application thread, inside `CompletedFetch`.
   2. Restructures the code so that objects owned by the background thread are not instantiated until the background thread runs (via `Supplier`), ensuring that no references to them are available to the application thread.
   3. Ensures resources properly implement `Closeable`, and uses `IdempotentCloser` to ensure they are closed only once.
   4. Introduces `ConsumerTestBuilder` to reduce inconsistency in the way objects are built up for tests.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
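Point 2 above — deferring construction with a `Supplier` so that objects owned by the background thread are never even constructed on the application thread — can be sketched as follows. This is an illustrative toy under stated assumptions, not the actual consumer internals; `BackgroundOwnedResource` and the method names are hypothetical.

```java
import java.util.function.Supplier;

public class SupplierDeferralDemo {
    static int constructions = 0;

    // Stands in for an object that must only ever exist on the background thread.
    static class BackgroundOwnedResource {
        BackgroundOwnedResource() { constructions++; }
        String fetch() { return "records"; }
    }

    // Runs supplier.get() on the background thread, so the application thread
    // never obtains a reference to the constructed resource.
    static String runOnBackgroundThread(Supplier<BackgroundOwnedResource> supplier) {
        final String[] result = new String[1];
        Thread background = new Thread(() -> result[0] = supplier.get().fetch());
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        // The application thread only builds a Supplier; nothing is constructed yet.
        Supplier<BackgroundOwnedResource> supplier = BackgroundOwnedResource::new;
        System.out.println("constructed before background run: " + constructions);
        System.out.println("fetched: " + runOnBackgroundThread(supplier));
        System.out.println("constructed after background run: " + constructions);
    }
}
```

The design point is that a method reference or lambda captures only the recipe, not the object, so thread confinement is established by where `get()` is called rather than by discipline alone.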
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330475695

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
## @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+    private final Time time;
+    private final Logger logger;
+
+    private final int rebalanceTimeoutMs;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final SubscriptionState subscriptions;
+    private final HeartbeatRequestState heartbeatRequestState;
+    private final MembershipManager membershipManager;
+    private final ErrorEventHandler nonRetriableErrorHandler;
+
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.time = time;
+        this.logger = logContext.logger(HeartbeatRequestManager.class);
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+        long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, heartbeatIntervalMs, retryBackoffMs,
+            retryBackoffMaxMs, rebalanceTimeoutMs);
+    }
+
+    // Visible for testing
+    HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final HeartbeatRequestState heartbeatRequestState,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.time = time;
+        this.logger = logContext.logger(this.getClass());
+        this.subscriptions = subscriptions;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.heartbeatRequestState = heartbeatRequestState;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+    }
+
+    @Override
+    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+        if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.notInGroup()) {

Review Comment:
   As previously commented, maybe let's use `shouldHeartbeat` to be more explicit

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment
[GitHub] [kafka] ahuang98 opened a new pull request, #14405: [MINOR] QuorumController tests use testToImage
ahuang98 opened a new pull request, #14405:
URL: https://github.com/apache/kafka/pull/14405

   Tests that records generated from existing QuorumController tests will generate the same final image regardless of how they are batched in replay. Builds on top of https://github.com/apache/kafka/pull/13724
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics
[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17766887#comment-17766887 ]

Chris Egerton commented on KAFKA-10339:
---------------------------------------

There is no MirrorSinkConnector; we never implemented that.

> MirrorMaker2 Exactly-once Semantics
> -----------------------------------
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect, mirrormaker
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Major
> Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more
> specifically the Source Connector / Task, which do not provide exactly-once
> semantics (EOS) out-of-the-box, as discussed in
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,
> https://github.com/apache/kafka/pull/5553,
> https://issues.apache.org/jira/browse/KAFKA-6080 and
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2
> currently does not provide EOS.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention size breach
[ https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15479:
-----------------------------------------
    Description: 
When a remote log segment contains multiple epochs, it gets considered multiple times during a breach of retention size/time/start-offset. This affects deletion by remote log retention size, as it deletes fewer segments than expected. This is a follow-up of KAFKA-15352

  was: When a remote log segment contains multiple epochs, it is considered multiple times during deletion. This affects deletion by remote log retention size. This is a follow-up of KAFKA-15352

> Remote log segments should be considered once for retention size breach
> -----------------------------------------------------------------------
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
>
> When a remote log segment contains multiple epochs, it gets considered
> multiple times during a breach of retention size/time/start-offset. This
> affects deletion by remote log retention size, as it deletes fewer segments
> than expected. This is a follow-up of KAFKA-15352

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
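The fix KAFKA-15479 asks for — counting a segment once even when it is listed under several leader epochs — might be sketched like this. This is an illustrative toy under assumed names: `Segment` and `totalRetainedBytes` are hypothetical, not the RemoteLogManager API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RetentionSizeDedup {
    // Hypothetical stand-in for a remote log segment: an id and a size in bytes.
    static final class Segment {
        final String id;
        final long sizeBytes;
        Segment(String id, long sizeBytes) { this.id = id; this.sizeBytes = sizeBytes; }
    }

    // Sums segment sizes across per-epoch listings, skipping segments whose id
    // was already counted under an earlier epoch.
    static long totalRetainedBytes(Map<Integer, List<Segment>> segmentsByEpoch) {
        Set<String> seen = new HashSet<>();
        long total = 0;
        for (List<Segment> segments : segmentsByEpoch.values()) {
            for (Segment segment : segments) {
                if (seen.add(segment.id)) {  // false when already counted
                    total += segment.sizeBytes;
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Segment "s1" spans leader epochs 0 and 1, so it is listed under both.
        Map<Integer, List<Segment>> byEpoch = Map.of(
            0, List.of(new Segment("s1", 100L)),
            1, List.of(new Segment("s1", 100L), new Segment("s2", 200L)));
        // Naive per-epoch summing would report 400; counting each segment once gives 300.
        System.out.println(totalRetainedBytes(byEpoch));
    }
}
```

Without the dedup, the retained size is over-estimated, so the retention check believes more data is eligible than actually exists and ends up deleting fewer distinct segments than expected — the symptom described in the issue.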
[GitHub] [kafka] junrao commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager
junrao commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1330455761

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
## @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ *
+ * listTopics
+ * partitionsFor
+ *
+ * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is removed.
+ *
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+    private final boolean allowAutoTopicCreation;
+    private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+    private final long retryBackoffMs;
+    private final long retryBackoffMaxMs;
+    private final Logger log;
+    private final LogContext logContext;
+
+    public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) {
+        logContext = context;
+        log = logContext.logger(this.getClass());

Review Comment:
   Could we remove `this`?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
## @@ -91,7 +90,7 @@ public CommitRequestManager(
     }
 
     /**
-     * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} request if there's any. The function will
+     * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequestState} request if there's any. The function will

Review Comment:
   Hmm, OffsetCommitRequestState should be OffsetCommitRequest, right?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
## @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org
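The deduplication behaviour described in the javadoc above — inflight requests keyed by topic name, `Optional.empty()` for an all-topics request, and entries removed on completion — might be sketched along these lines. This is a simplified, hypothetical sketch (`InflightDedup` and its method names are illustrative), not the real `TopicMetadataRequestManager`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class InflightDedup {
    // Keyed by topic name; Optional.empty() stands for an "all topics" request.
    private final Map<Optional<String>, CompletableFuture<List<String>>> inflight = new HashMap<>();

    // Returns the existing future when a request for the same key is already
    // inflight, so the broker is not hit with duplicate requests.
    public CompletableFuture<List<String>> requestMetadata(Optional<String> topic) {
        return inflight.computeIfAbsent(topic, k -> new CompletableFuture<>());
    }

    // Once a request completes successfully, its entry is removed so that a
    // later call for the same topic issues a fresh request.
    public void complete(Optional<String> topic, List<String> partitions) {
        CompletableFuture<List<String>> future = inflight.remove(topic);
        if (future != null) {
            future.complete(partitions);
        }
    }

    public static void main(String[] args) {
        InflightDedup manager = new InflightDedup();
        CompletableFuture<List<String>> first = manager.requestMetadata(Optional.of("orders"));
        CompletableFuture<List<String>> second = manager.requestMetadata(Optional.of("orders"));
        System.out.println(first == second);    // true: the duplicate was coalesced
        manager.complete(Optional.of("orders"), List.of("orders-0"));
        System.out.println(first.isDone());     // true: both callers see the result
        System.out.println(first == manager.requestMetadata(Optional.of("orders"))); // false: entry removed
    }
}
```

Coalescing on a shared `CompletableFuture` means every caller waiting on the same key observes the same outcome, while removal on completion keeps retries possible.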
[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once during retention size breach
[ https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15479:
-----------------------------------------
    Issue Type: Task  (was: Improvement)

> Remote log segments should be considered once during retention size breach
> --------------------------------------------------------------------------
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
>
> When a remote log segment contains multiple epochs, it is considered
> multiple times during deletion. This affects deletion by remote log
> retention size. This is a follow-up of KAFKA-15352

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention size breach
[ https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-15479:
-----------------------------------------
    Summary: Remote log segments should be considered once for retention size breach  (was: Remote log segments should be considered once during retention size breach)

> Remote log segments should be considered once for retention size breach
> -----------------------------------------------------------------------
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
>
> When a remote log segment contains multiple epochs, it is considered
> multiple times during deletion. This affects deletion by remote log
> retention size. This is a follow-up of KAFKA-15352

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330460386

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
## @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+    private final Time time;
+    private final Logger logger;
+
+    private final int rebalanceTimeoutMs;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final SubscriptionState subscriptions;
+    private final HeartbeatRequestState heartbeatRequestState;
+    private final MembershipManager membershipManager;
+    private final ErrorEventHandler nonRetriableErrorHandler;
+
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.time = time;
+        this.logger = logContext.logger(HeartbeatRequestManager.class);
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+        long heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

Review Comment:
   thanks, just note it here: `group.consumer.heartbeat.interval.ms is defined on the server side and the member is told about it in the heartbeat response.`

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
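The review note above says the heartbeat interval is ultimately dictated by the broker, which tells the member the value in each heartbeat response. A hedged sketch of that behaviour, under assumed names (`HeartbeatIntervalTracker` and its methods are illustrative, not the real `HeartbeatRequestState` API):

```java
public class HeartbeatIntervalTracker {
    private long intervalMs;
    private long nextHeartbeatMs;

    // Starts from a locally configured default interval.
    public HeartbeatIntervalTracker(long initialIntervalMs, long nowMs) {
        this.intervalMs = initialIntervalMs;
        this.nextHeartbeatMs = nowMs + initialIntervalMs;
    }

    // True when the next heartbeat is due.
    public boolean shouldHeartbeat(long nowMs) {
        return nowMs >= nextHeartbeatMs;
    }

    // Called on a successful response: adopt the server-provided interval,
    // which overrides whatever the client was configured with.
    public void onResponse(long serverIntervalMs, long nowMs) {
        this.intervalMs = serverIntervalMs;
        this.nextHeartbeatMs = nowMs + serverIntervalMs;
    }

    public long intervalMs() {
        return intervalMs;
    }

    public static void main(String[] args) {
        HeartbeatIntervalTracker tracker = new HeartbeatIntervalTracker(1000, 0);
        System.out.println(tracker.shouldHeartbeat(500));   // false: not yet due
        System.out.println(tracker.shouldHeartbeat(1000));  // true: interval elapsed
        tracker.onResponse(5000, 1000);                     // broker lengthens the interval
        System.out.println(tracker.shouldHeartbeat(3000));  // false: next due at 6000
    }
}
```

The point of the design is that the client's configured value only seeds the schedule; after the first response, the group coordinator controls the cadence.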
[jira] [Created] (KAFKA-15479) Remote log segments should be considered once during retention size breach
Kamal Chandraprakash created KAFKA-15479:
--------------------------------------------

             Summary: Remote log segments should be considered once during retention size breach
                 Key: KAFKA-15479
                 URL: https://issues.apache.org/jira/browse/KAFKA-15479
             Project: Kafka
          Issue Type: Improvement
            Reporter: Kamal Chandraprakash
            Assignee: Kamal Chandraprakash

When a remote log segment contains multiple epochs, it is considered multiple times during deletion. This affects deletion by remote log retention size. This is a follow-up of KAFKA-15352

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330458175

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
## @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+    private final Time time;

Review Comment:
   Will do. Sorry for completely missing this part.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330451339

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
## @@ -79,6 +80,9 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor);
+        return "AssignorSelection{" +

Review Comment:
   I see - I think we've been consistently using { in the refactor. Maybe we should change that @kirktrue @lianetm

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin
[ https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Omnia Ibrahim updated KAFKA-15478:
----------------------------------
    Description: 
Connect uses AdminClients to create topics; while this simplifies the implementation of Connect, it has the following problems:
 * It assumes that whoever runs Connect must have admin access to both source and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage Kafka resources.

It would be easier if customers could provide how they want to manage Kafka topics, through the admin client or using their centralised system or tools.

We already have ForwardingAdmin in MM2, so we can extend Connect to do something similar.

KIP-981 [https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin]

  was: 
Connect uses AdminClients to create topics; while this simplifies the implementation of Connect, it has the following problems:
 * It assumes that whoever runs Connect must have admin access to both source and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage Kafka resources.

It would be easier if customers could provide how they want to manage Kafka topics, through the admin client or using their centralised system or tools.

We already have ForwardingAdmin in MM2, so we can extend Connect to do something similar.

> Update connect to use ForwardingAdmin
> -------------------------------------
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
> Issue Type: New Feature
> Reporter: Omnia Ibrahim
> Priority: Major
> Labels: need-kip
>
> Connect uses AdminClients to create topics; while this simplifies the
> implementation of Connect, it has the following problems:
> * It assumes that whoever runs Connect must have admin access to both source
> and destination clusters. This assumption is not necessarily valid all the
> time.
> * It creates conflict in use-cases where centralised systems or tools manage
> Kafka resources.
> It would be easier if customers could provide how they want to manage Kafka
> topics, through the admin client or using their centralised system or tools.
>
> We already have ForwardingAdmin in MM2, so we can extend Connect to do
> something similar
> KIP-981
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin]

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin
[ https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Omnia Ibrahim updated KAFKA-15478:
----------------------------------
    Labels: need-kip  (was: )

> Update connect to use ForwardingAdmin
> -------------------------------------
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
> Issue Type: New Feature
> Reporter: Omnia Ibrahim
> Priority: Major
> Labels: need-kip
>
> Connect uses AdminClients to create topics; while this simplifies the
> implementation of Connect, it has the following problems:
> * It assumes that whoever runs Connect must have admin access to both source
> and destination clusters. This assumption is not necessarily valid all the
> time.
> * It creates conflict in use-cases where centralised systems or tools manage
> Kafka resources.
> It would be easier if customers could provide how they want to manage Kafka
> topics, through the admin client or using their centralised system or tools.
>
> We already have ForwardingAdmin in MM2, so we can extend Connect to do
> something similar

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15313) Delete remote log segments partition asynchronously when a partition is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Divij Vaidya updated KAFKA-15313:
---------------------------------
    Description: 
KIP-405 already covers the approach to delete remote log segments asynchronously through controller and RLMM layers.

reference: https://github.com/apache/kafka/pull/13947#discussion_r1281675818

  was: KIP-405 already covers the approach to delete remote log segments asynchronously through controller and RLMM layers.

> Delete remote log segments partition asynchronously when a partition is
> deleted.
> -----------------------------------------------------------------------
>
> Key: KAFKA-15313
> URL: https://issues.apache.org/jira/browse/KAFKA-15313
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Satish Duggana
> Assignee: Abhijeet Kumar
> Priority: Major
> Labels: KIP-405
>
> KIP-405 already covers the approach to delete remote log segments
> asynchronously through controller and RLMM layers.
> reference: https://github.com/apache/kafka/pull/13947#discussion_r1281675818

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin
[ https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Omnia Ibrahim updated KAFKA-15478:
----------------------------------
    Description: 
Connect uses AdminClients to create topics; while this simplifies the implementation of Connect, it has the following problems:
 * It assumes that whoever runs Connect must have admin access to both source and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage Kafka resources.

It would be easier if customers could provide how they want to manage Kafka topics, through the admin client or using their centralised system or tools.

We already have ForwardingAdmin in MM2, so we can extend Connect to do something similar.

  was: 
Connect uses AdminClients to create topics; while this simplifies the implementation of Connect, it has the following problems:
 * It assumes that whoever runs Connect must have admin access to both source and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage Kafka resources.

It would be easier if customers could provide how they want to manage Kafka topics, through the admin client or using their centralised system or tools.

We already have

> Update connect to use ForwardingAdmin
> -------------------------------------
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
> Issue Type: New Feature
> Reporter: Omnia Ibrahim
> Priority: Major
>
> Connect uses AdminClients to create topics; while this simplifies the
> implementation of Connect, it has the following problems:
> * It assumes that whoever runs Connect must have admin access to both source
> and destination clusters. This assumption is not necessarily valid all the
> time.
> * It creates conflict in use-cases where centralised systems or tools manage
> Kafka resources.
> It would be easier if customers could provide how they want to manage Kafka
> topics, through the admin client or using their centralised system or tools.
>
> We already have ForwardingAdmin in MM2, so we can extend Connect to do
> something similar

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)