[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-09-19 Thread Nelson B. (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson B. updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets, the tests are in the streams module.

A list of tests which still need to be moved from EasyMock to Mockito as of 
2 August 2022, which do not have a Jira issue and for which I am not aware of 
any open pull requests:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) (takeover: Christo)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 
 # {color:#00875a}AbstractStreamTest{color} (owner: Christo)
 # {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KTableImplTest{color} (owner: Christo)
 # {color:#00875a}KTableTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo)
 # {color:#00875a}ActiveTaskCreatorTest{color} (owner: Christo)
 # {color:#00875a}ChangelogTopicsTest{color} (owner: Christo)
 # {color:#00875a}GlobalProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}RecordCollectorTest{color} (owner: Christo)
 # {color:#00875a}StateRestoreCallbackAdapterTest{color} (owner: Christo)
 # 


[GitHub] [kafka] bachmanity1 commented on pull request #14410: KAFKA-14133: Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & GlobalStateStoreProviderTest

2023-09-19 Thread via GitHub


bachmanity1 commented on PR #14410:
URL: https://github.com/apache/kafka/pull/14410#issuecomment-1727005007

   @clolov @yashmayya can you please have a look? 





[GitHub] [kafka] bachmanity1 opened a new pull request, #14410: KAFKA-14133: Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & GlobalStateStoreProviderTest

2023-09-19 Thread via GitHub


bachmanity1 opened a new pull request, #14410:
URL: https://github.com/apache/kafka/pull/14410

   Replace Easymock with Mockito in StreamsProducerTest, TopologyMetadataTest & 
GlobalStateStoreProviderTest 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
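   For context, a minimal sketch of the conversion pattern applied here: Mockito's stub-then-verify style replaces EasyMock's expect/replay/verify cycle. The mocked interface and the interaction below are illustrative only and are not taken from the tests named in the title.

   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;

   import org.apache.kafka.streams.processor.StateStore;
   import org.junit.jupiter.api.Test;

   public class MockMigrationSketchTest {

       @Test
       public void shouldStubAndVerifyWithMockito() {
           // EasyMock equivalent being removed:
           //   StateStore store = EasyMock.createMock(StateStore.class);
           //   EasyMock.expect(store.name()).andReturn("my-store");
           //   EasyMock.replay(store);
           //   ... exercise the code under test ...
           //   EasyMock.verify(store);

           // Mockito: no replay()/verify-all phases; stub, exercise, then verify interactions.
           StateStore store = mock(StateStore.class);
           when(store.name()).thenReturn("my-store");

           assertEquals("my-store", store.name());

           verify(store).name();
       }
   }
   ```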
   





[jira] [Updated] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8391:
--
Fix Version/s: 2.4.0

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}





[jira] [Updated] (KAFKA-13875) update docs to include topicId for kafka-topics.sh --describe output

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-13875:
---
Fix Version/s: 3.6.0

> update docs to include topicId for kafka-topics.sh --describe output
> -
>
> Key: KAFKA-13875
> URL: https://issues.apache.org/jira/browse/KAFKA-13875
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Richard Joerger
>Priority: Major
>  Labels: newbie
> Fix For: 3.6.0
>
>
> The topic describe output in quickstart doc here: 
> [https://kafka.apache.org/quickstart] should be updated now.
> {code:java}
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 
> localhost:9092
> Topic:quickstart-events  PartitionCount:1ReplicationFactor:1 Configs:
> Topic: quickstart-events Partition: 0Leader: 0   Replicas: 0 Isr: 
> 0{code}
> After the Topic Id implementation, we now include the topic id info in the 
> output. Also, the configs are no longer empty. The doc should be updated to 
> avoid confusing new users.
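For illustration, the post-topic-ID output is roughly of the following form; the TopicId value and the listed config are placeholders rather than output captured from a specific release:
{code}
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events   TopicId: <some-base64-uuid>   PartitionCount: 1   ReplicationFactor: 1   Configs: segment.bytes=1073741824
   Topic: quickstart-events   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
{code}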





[jira] [Updated] (KAFKA-15286) Migrate ApiVersion related code to kraft

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15286:
---
Fix Version/s: 3.6.0

> Migrate ApiVersion related code to kraft
> 
>
> Key: KAFKA-15286
> URL: https://issues.apache.org/jira/browse/KAFKA-15286
> Project: Kafka
>  Issue Type: Task
>Reporter: Deng Ziming
>Assignee: Deng Ziming
>Priority: Major
> Fix For: 3.6.0
>
>
> In many places involving ApiVersion we only support ZK; we should move this 
> forward to KRaft.





[jira] [Updated] (KAFKA-15424) Make verification a dynamic configuration

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15424:
---
Fix Version/s: 3.5.0

> Make verification a dynamic configuration
> -
>
> Key: KAFKA-15424
> URL: https://issues.apache.org/jira/browse/KAFKA-15424
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.5.0
>
>
> It would be nice if we could dynamically disable the verification. This would 
> prevent disruptive actions, such as a broker roll, if the feature is causing issues.
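As a hedged sketch of what "dynamic" would mean operationally, the flag could then be flipped at runtime through the Admin client's incremental config API. The config name used below is an assumption about what this ticket exposes, and the cluster-wide broker default is only one possible target:
{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DisableVerificationSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Empty resource name = cluster-wide dynamic default for brokers.
            ConfigResource brokerDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp disable = new AlterConfigOp(
                // Assumed config name; substitute the final name once this ticket settles it.
                new ConfigEntry("transaction.partition.verification.enable", "false"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(brokerDefault, Collections.singletonList(disable));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}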





[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-10339:
---
Fix Version/s: 3.5.0

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.





[jira] [Updated] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15017:
---
Fix Version/s: 3.5.0

> New ClientQuotas are not written to ZK from snapshot 
> -
>
> Key: KAFKA-15017
> URL: https://issues.apache.org/jira/browse/KAFKA-15017
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: David Arthur
>Assignee: Proven Provenzano
>Priority: Critical
> Fix For: 3.5.0
>
>
> Similar issue to KAFKA-15009





[jira] [Updated] (KAFKA-14291) KRaft: ApiVersionsResponse doesn't have finalizedFeatures and finalizedFeatureEpoch in KRaft mode

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14291:
---
Fix Version/s: 3.5.0

> KRaft: ApiVersionsResponse doesn't have finalizedFeatures and 
> finalizedFeatureEpoch in KRaft mode
> -
>
> Key: KAFKA-14291
> URL: https://issues.apache.org/jira/browse/KAFKA-14291
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Assignee: Deng Ziming
>Priority: Critical
> Fix For: 3.5.0
>
>
> https://github.com/apache/kafka/blob/d834947ae7abc8a9421d741e742200bb36f51fb3/core/src/main/scala/kafka/server/ApiVersionManager.scala#L53
> ```
> class SimpleApiVersionManager(
>   val listenerType: ListenerType,
>   val enabledApis: collection.Set[ApiKeys],
>   brokerFeatures: Features[SupportedVersionRange]
> ) extends ApiVersionManager {
>   def this(listenerType: ListenerType) = {
> this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
> BrokerFeatures.defaultSupportedFeatures())
>   }
>   private val apiVersions = 
> ApiVersionsResponse.collectApis(enabledApis.asJava)
>   override def apiVersionResponse(requestThrottleMs: Int): 
> ApiVersionsResponse = {
> ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
> apiVersions, brokerFeatures)
>   }
> }
> ```
> ApiVersionManager for KRaft doesn't add the finalizedFeatures and 
> finalizedFeatureEpoch to the ApiVersionsResponse.





[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-13295:
---
Fix Version/s: 3.4.0

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.4.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
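For illustration, the exposure window is bounded by the producer's transaction.timeout.ms, which an EOS Streams application can override so that it exceeds the worst-case restoration time. This is a hedged mitigation sketch with placeholder values, not the fix tracked by this ticket:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTimeoutConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Give in-flight transactions more headroom than the worst-case restoration time
        // (15 minutes here is a placeholder; the broker's transaction.max.timeout.ms caps it).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 900_000);
        return props;
    }
}
{code}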





[jira] [Updated] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-12283:
---
Fix Version/s: 3.5.0

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}





[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-4327:
--
Fix Version/s: 3.5.0

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: kip
> Fix For: 3.5.0
>
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module 
> due to its ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to ResetTool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires to update the docs with regard to broker 
> backward compatibility – not all broker support "topic delete request" and 
> thus, the reset tool will not be backward compatible to all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  





[jira] [Updated] (KAFKA-14931) Revert KAFKA-14561 in 3.5

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14931:
---
Fix Version/s: 3.5.0

> Revert KAFKA-14561 in 3.5
> -
>
> Key: KAFKA-14931
> URL: https://issues.apache.org/jira/browse/KAFKA-14931
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.5.0
>
>
> We have too many blockers for this commit to work well, so in the interest of 
> code quality, we should revert 
> https://issues.apache.org/jira/browse/KAFKA-14561 in 3.5 and fix the issues 
> for 3.6





[jira] [Updated] (KAFKA-14904) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupId()

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14904:
---
Fix Version/s: 3.5.0

> Flaky Test  kafka.api.TransactionsBounceTest.testWithGroupId()
> --
>
> Key: KAFKA-14904
> URL: https://issues.apache.org/jira/browse/KAFKA-14904
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.5.0
>
>
> After merging KAFKA-14561 I noticed this test still occasionally failed via 
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 6ms while awaiting EndTxn(true)
> I will investigate the cause. 
> Note: This error occurs when we are waiting for the transaction to be 
> committed.





[jira] [Updated] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14916:
---
Fix Version/s: 3.5.0

> Fix code that assumes transactional ID implies all records are transactional
> 
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.5.0
>
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
> all record batches were transactional and had the same producer ID.
> This work will improve validation and fix the code that assumes all batches 
> are transactional.
> Further, KAFKA-14561 will not assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are 
> the same in the batches since we send a single txn ID, but that can be done 
> in a followup KAFKA-14958, as we still need to assess if we can enforce this 
> without breaking workloads.
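A hedged sketch of the kind of per-batch check this describes: derive transactionality from each batch's own flag instead of from the presence of a transactional ID on the request. The helper below is illustrative and is not the actual broker change:
{code:java}
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public final class TransactionalBatchCheckSketch {

    // True only if every batch in the payload is itself flagged transactional.
    public static boolean allBatchesTransactional(MemoryRecords records) {
        for (RecordBatch batch : records.batches()) {
            if (!batch.isTransactional()) {
                return false;
            }
        }
        return true;
    }
}
{code}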





[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14920:
---
Fix Version/s: 3.6.0

> Address timeouts and out of order sequences
> ---
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.6.0
>
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce 
> request for a topic partition (i.e. one that has no state in the producer state manager).
> Since we currently don't validate the first sequence (we will in part 2 of 
> kip-890), any transient error on the first produce can lead to out of order 
> sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
> transient issues, but until that is fixed, we may need to retry from within the 
> AddPartitionsManager instead. We addressed the concurrent-transactions case, but 
> there are other errors, like coordinator loading, that we could run into and 
> that would lead to more out-of-order issues.





[jira] [Updated] (KAFKA-14970) Dual write mode testing for SCRAM and Quota

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14970:
---
Fix Version/s: 3.5.0

> Dual write mode testing for SCRAM and Quota
> ---
>
> Key: KAFKA-14970
> URL: https://issues.apache.org/jira/browse/KAFKA-14970
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Blocker
>  Labels: 3.5
> Fix For: 3.5.0
>
>
> SCRAM and Quota are stored together in ZK and we need better testing to 
> validate the dual write mode support for them.
> I will add some additional tests for this.
>  





[jira] [Updated] (KAFKA-14884) Include check transaction is still ongoing right before append

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14884:
---
Fix Version/s: 3.6.0

> Include check transaction is still ongoing right before append 
> ---
>
> Key: KAFKA-14884
> URL: https://issues.apache.org/jira/browse/KAFKA-14884
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Even after checking via AddPartitionsToTxn, the transaction could be aborted 
> after the response. We can add one more check before appending.





[jira] [Updated] (KAFKA-15380) Try complete actions after callback

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15380:
---
Fix Version/s: 3.6.0

> Try complete actions after callback
> ---
>
> Key: KAFKA-15380
> URL: https://issues.apache.org/jira/browse/KAFKA-15380
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.6.0
>
>
> KIP-890 part 1 introduced the callback request type. It is used to execute a 
> callback after KafkaApis.handle has returned. We did not account for 
> tryCompleteActions at the end of handle when making this change.
> In tests, we saw produce p99 increase dramatically (likely because we have to 
> wait for another request before we can complete DelayedProduce). As a result, 
> we should add the tryCompleteActions after the callback as well. In testing, 
> this improved the produce performance.





[jira] [Updated] (KAFKA-15315) Use getOrDefault rather than get

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15315:
---
Fix Version/s: 3.7.0

> Use getOrDefault rather than get
> 
>
> Key: KAFKA-15315
> URL: https://issues.apache.org/jira/browse/KAFKA-15315
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: roon
>Priority: Trivial
> Fix For: 3.7.0
>
>
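For reference, a minimal before/after illustration of the pattern suggested by the summary; the map and key are arbitrary:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class GetOrDefaultSketch {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();

        // Before: get() forces an explicit null check.
        Integer value = counts.get("records-processed");
        int before = (value == null) ? 0 : value;

        // After: getOrDefault() collapses the null handling into one call.
        int after = counts.getOrDefault("records-processed", 0);

        System.out.println(before + " " + after);
    }
}
{code}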






[jira] [Updated] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14900:
---
Fix Version/s: 3.5.0

> Flaky test AuthorizerTest failing with NPE
> --
>
> Key: KAFKA-14900
> URL: https://issues.apache.org/jira/browse/KAFKA-14900
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Reporter: Greg Harris
>Assignee: Colin McCabe
>Priority: Minor
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> The AuthorizerTest has multiple tests that appear to have the same flaky 
> failure:
> {noformat}
> org.apache.kafka.server.fault.FaultHandlerException: 
> quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: 
> Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value 
> of "kafka.server.SharedServer.raftManager()" is null
>   at 
> app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "kafka.raft.KafkaRaftManager.client()" because the return value of 
> "kafka.server.SharedServer.raftManager()" is null
>   ... 8 more{noformat}





[jira] [Updated] (KAFKA-14790) Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14790:
---
Fix Version/s: 3.5.0

> Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests
> ---
>
> Key: KAFKA-14790
> URL: https://issues.apache.org/jira/browse/KAFKA-14790
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Minor
> Fix For: 3.5.0
>
>
> Followup from [https://github.com/apache/kafka/pull/13231]
> We should add authorizer tests for the new version.
> We should add some more tests to KafkaApis to cover auth and validation 
> failures.





[jira] [Updated] (KAFKA-14791) Create a builder class for PartitionRegistration

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14791:
---
Fix Version/s: 3.6.0

> Create a builder class for PartitionRegistration
> 
>
> Key: KAFKA-14791
> URL: https://issues.apache.org/jira/browse/KAFKA-14791
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Minor
> Fix For: 3.6.0
>
>






[jira] [Updated] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14913:
---
Fix Version/s: 3.5.0

> Migrate DistributedHerder Executor shutdown to use 
> ThreadUtils#shutdownExecutorServiceQuietly
> -
>
> Key: KAFKA-14913
> URL: https://issues.apache.org/jira/browse/KAFKA-14913
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Minor
> Fix For: 3.5.0
>
>
> Some context here: 
> https://github.com/apache/kafka/pull/13557#issuecomment-1509738740





[jira] [Updated] (KAFKA-15148) Some integration tests are running as unit tests

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15148:
---
Fix Version/s: 3.6.0

> Some integration tests are running as unit tests
> 
>
> Key: KAFKA-15148
> URL: https://issues.apache.org/jira/browse/KAFKA-15148
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Ezio Xie
>Priority: Minor
>  Labels: newbie
> Fix For: 3.6.0
>
>
> *This is a good item for a newcomer to the Kafka code base to pick up*
>  
> When we run `./gradlew unitTest`, it is supposed to run all unit tests. 
> However, we are running some integration tests as part of it, which makes the 
> overall process of running unitTest take longer than expected.
> Example of such tests:
> > :streams:unitTest > Executing test 
> > org.apache...integration.NamedTopologyIntegrationTest
> > :streams:unitTest > Executing test 
> > org.apache...integration.QueryableStateIntegrationTest
> After this task, we should not run these tests as part of `./gradlew 
> unitTest`; instead, they should be run as part of `./gradlew integrationTest`.
> As part of the acceptance criteria, please add a snapshot of the generated HTML 
> summary to verify that these tests are indeed running as part of 
> integrationTest.
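For illustration only, one common way to keep such tests out of a unit-test task is to mark them and filter on the marker in Gradle. The JUnit 5 tag below is a generic sketch and may not match the mechanism Kafka's build actually uses:
{code:java}
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

// Tagged so a Gradle task can exclude it, e.g. useJUnitPlatform { excludeTags "integration" }.
@Tag("integration")
public class SomeIntegrationTestSketch {

    @Test
    public void exercisesAnEmbeddedCluster() {
        // Spins up real brokers, so it belongs under ./gradlew integrationTest,
        // not ./gradlew unitTest.
    }
}
{code}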





[jira] [Updated] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14898:
---
Fix Version/s: 3.5.0

> [ MirrorMaker ] sync.topic.configs.enabled not working as expected
> --
>
> Key: KAFKA-14898
> URL: https://issues.apache.org/jira/browse/KAFKA-14898
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0
>Reporter: Srinivas Boga
>Priority: Major
>  Labels: mirrormaker
> Fix For: 3.5.0
>
>
> Hello,
> In my replication setup, I do not want to sync the topic configs; the use 
> case is to have a different retention time for the topic on the target cluster. 
> I am passing the config
> {code:java}
>  sync.topic.configs.enabled = false{code}
> but this is not working as expected: the topic retention time is being set to 
> whatever is set in the source cluster. Looking at the MirrorMaker logs, 
> I can see that MirrorSourceConnector is still setting the above config as true
> {code:java}
> [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig 
> values:
>         allow.auto.create.topics = true
>         auto.commit.interval.ms = 5000
>         auto.include.jmx.reporter = true
>         auto.offset.reset = earliest
>         bootstrap.servers = [sourcecluster.com:9092]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = consumer-null-2
>         client.rack =
>         connections.max.idle.ms = 54
>         default.api.timeout.ms = 6
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = null
>         group.instance.id = null
>         heartbeat.interval.ms = 3000
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_uncommitted
>         key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 30
>         max.poll.records = 500
>         metadata.max.age.ms = 30
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 3
>         partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor, class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 3
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 6
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         sasl.login.refresh.min.period.seconds = 60
>         sasl.login.refresh.window.factor = 0.8
>         sasl.login.refresh.window.jitter = 0.05
>         sasl.login.retry.backoff.max.ms = 1
>         sasl.login.retry.backoff.ms = 100
>         sasl.mechanism = GSSAPI
>         sasl.oauthbearer.clock.skew.seconds = 30
>         sasl.oauthbearer.expected.audience = null
>         sasl.oauthbearer.expected.issuer = null
>         sasl.oauthbearer.jwks.endpoint.refresh.ms = 360
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 1
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>         sasl.oauthbearer.jwks.endpoint.url = null
>         sasl.oauthbearer.scope.claim.name = scope
>         sasl.oauthbearer.sub.claim.name = sub
>         sasl.oauthbearer.token.endpoint.url = null
>         security.protocol = PLAINTEXT
>         security.providers = null
>         send.buffer.bytes = 131072
>         session.timeout.ms = 45000
>         socket.connection.setup.timeout.max.ms = 3
>         socket.connection.setup.timeout.ms = 1
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2]
>         ssl.endpoint.identification.algorithm = https
>         ssl.engine.factory.class = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.certificate.chain = null
>         

[jira] [Updated] (KAFKA-14925) The website shouldn't load external resources

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14925:
---
Fix Version/s: 3.5.0

> The website shouldn't load external resources
> -
>
> Key: KAFKA-14925
> URL: https://issues.apache.org/jira/browse/KAFKA-14925
> Project: Kafka
>  Issue Type: Improvement
>  Components: website
>Reporter: Mickael Maison
>Assignee: Atul Sharma
>Priority: Major
> Fix For: 3.5.0
>
>
> In includes/_header.htm, we load a resource from fontawesome.com





[jira] [Updated] (KAFKA-14859) Support SCRAM ZK to KRaft Migration

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14859:
---
Fix Version/s: 3.6.0

> Support SCRAM ZK to KRaft Migration
> ---
>
> Key: KAFKA-14859
> URL: https://issues.apache.org/jira/browse/KAFKA-14859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.6.0
>
>
> I want to allow existing ZK installations to be able to migrate to KRaft and 
> support their existing SCRAM credentials.





[jira] [Updated] (KAFKA-14859) Support SCRAM ZK to KRaft Migration

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14859:
---
Fix Version/s: 3.5.0
   (was: 3.6.0)

> Support SCRAM ZK to KRaft Migration
> ---
>
> Key: KAFKA-14859
> URL: https://issues.apache.org/jira/browse/KAFKA-14859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.5.0
>
>
> I want to allow existing ZK installations to be able to migrate to KRaft and 
> support their existing SCRAM credentials.





[jira] [Updated] (KAFKA-14857) Fix some MetadataLoader bugs

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14857:
---
Fix Version/s: 3.5.0

> Fix some MetadataLoader bugs
> 
>
> Key: KAFKA-14857
> URL: https://issues.apache.org/jira/browse/KAFKA-14857
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0
>
>






[jira] [Updated] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-7497:
--
Fix Version/s: 3.4.0

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.4.0
>
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.
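For illustration, a hedged sketch of the clunky workaround described above, using a duplicated copy of the input topic as the second source; topic names, serdes, and the join window are placeholders:
{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class SelfJoinWorkaroundSketch {
    public static void build(StreamsBuilder builder) {
        KStream<String, String> left =
            builder.stream("foo", Consumed.with(Serdes.String(), Serdes.String()));
        // Second source over a duplicated copy of the same data; registering "foo"
        // twice would be rejected, hence the unnecessary duplication the ticket mentions.
        KStream<String, String> right =
            builder.stream("foo-copy", Consumed.with(Serdes.String(), Serdes.String()));

        left.join(right,
                  (l, r) -> l + "|" + r,
                  JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                  StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
    }
}
{code}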





[jira] [Updated] (KAFKA-14835) Create ControllerServerMetricsPublisher

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14835:
---
Fix Version/s: 3.5.0

> Create ControllerServerMetricsPublisher
> ---
>
> Key: KAFKA-14835
> URL: https://issues.apache.org/jira/browse/KAFKA-14835
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0
>
>






[jira] [Updated] (KAFKA-14961) DefaultBackgroundThreadTest.testStartupAndTearDown test is flaky

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14961:
---
Fix Version/s: 3.5.0

> DefaultBackgroundThreadTest.testStartupAndTearDown test is flaky
> -
>
> Key: KAFKA-14961
> URL: https://issues.apache.org/jira/browse/KAFKA-14961
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Major
> Fix For: 3.5.0
>
>
> When running the test suite locally I noticed the following error
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
>   at 
> app//org.apache.kafka.clients.consumer.internals.DefaultBackgroundThreadTest.testStartupAndTearDown(DefaultBackgroundThreadTest.java:95)
>  {code}
> which happened only once and I could not reproduce it again. 
> I further noticed some NPE in debug logs in the form of
> {code:java}
>  ERROR The background thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread:166)
> java.lang.NullPointerException
>     at 
> org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.handlePollResult(DefaultBackgroundThread.java:200)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at 
> java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1675)
>     at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>     at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:553)
>     at 
> org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.runOnce(DefaultBackgroundThread.java:187)
>     at 
> org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.run(DefaultBackgroundThread.java:159)
>  {code}
> which is due to missing stubs.





[jira] [Updated] (KAFKA-14658) When listening on fixed ports, defer port opening until we're ready

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14658:
---
Fix Version/s: 3.5.0

> When listening on fixed ports, defer port opening until we're ready
> ---
>
> Key: KAFKA-14658
> URL: https://issues.apache.org/jira/browse/KAFKA-14658
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.5.0
>
>
> When we are listening on fixed ports, we should defer opening ports until 
> we're ready to accept traffic. If we open the broker port too early, it can 
> confuse monitoring and deployment systems. This is a particular concern when 
> in KRaft mode, since in that mode, we create the SocketServer object earlier 
> in the startup process than when in ZK mode.
> The approach taken in this PR is to defer opening the acceptor port until 
> Acceptor.start is called. Note that when we are listening on a random port, 
> we continue to open the port "early," in the SocketServer constructor. The 
> reason for doing this is that there is no other way to find the random port 
> number the kernel has selected. Since random port assignment is not used in 
> production deployments, this should be reasonable.
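A generic sketch of the "open the port only in start()" idea described above, using plain NIO; the class shape and names are illustrative and are not Kafka's SocketServer/Acceptor code:
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class DeferredAcceptorSketch {
    private final int fixedPort;
    private ServerSocketChannel serverChannel;

    public DeferredAcceptorSketch(int fixedPort) {
        // The constructor deliberately does not bind, so monitoring and deployment
        // systems never see the port open before the server is ready.
        this.fixedPort = fixedPort;
    }

    // Called only once the server is actually ready to accept traffic.
    public synchronized void start() throws IOException {
        serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(fixedPort));
    }

    public synchronized void close() throws IOException {
        if (serverChannel != null) {
            serverChannel.close();
        }
    }
}
{code}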





[GitHub] [kafka] philipnee commented on a diff in pull request #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14357:
URL: https://github.com/apache/kafka/pull/14357#discussion_r1330957545


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberAssignmentReconciler.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RebalanceCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RevokePartitionsEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * {@code MemberAssignmentReconciler} works with {@link MembershipManager} to 
a) determine, and b) modify the
+ * current set of assigned {@link TopicPartition partitions} via {@link 
SubscriptionState}. Reconciliation is a
+ * two-part process, the first being a revocation of partitions, followed by 
assignment of partitions. Each of the two
+ * steps may result in one of the following:
+ *
+ * 
+ * 
+ * {@link ReconciliationResult#NO_CHANGE}: no changes were made to the 
set of partitions.
+ * 
+ * 
+ * {@link ReconciliationResult#IN_PROGRESS}: changes to the assignment 
have started. In practice this means
+ * that the appropriate {@link ConsumerRebalanceListener} callback 
method is being invoked.
+ * 
+ * 
+ * {@link ReconciliationResult#COMPLETED}: the {@link 
ConsumerRebalanceListener} callback method was made and
+ * the changes were applied locally.
+ * 
+ * 
+ * {@link ReconciliationResult#EXPIRED}: something happened to cause 
the operation to modify the assigned set
+ * of partitions. This could be caused by a {@link 
ConsumerRebalanceListener} callback method that takes too
+ * long to execute, interruption with the consumer group coordinator, 
or other reasons.
+ * 
+ * 
+ *
+ * The comparison against the {@link SubscriptionState#assignedPartitions() 
current set of assigned partitions} and
+ * the {@link Assignment#assignedTopicPartitions() target set of assigned 
partitions} is performed by essentially
+ * flattening the respective entries into two sets of {@link 
org.apache.kafka.common.TopicPartition partitons}
+ * which are then compared using basic {@link Set} comparisons.
+ */
+public class MemberAssignmentReconciler {
+
+/**
+ * The result of the {@link #revoke(Optional, Timer)} or {@link 
#assign(Optional, Timer)} methods being invoked.
+ */
+enum ReconciliationResult {
+NO_CHANGE,
+IN_PROGRESS,
+COMPLETED,
+EXPIRED
+}
+
+// Ugly little handler enum for making logging less verbose.
+private enum Operation {
+
+REVOKE("revoke", "revoked"), ASSIGN("assign", "assigned");
+
+private final String verbPastTense;
+
+private final String methodName;
+
+Operation(String verbPresentTense, String verbPastTense) {
+this.verbPastTense = verbPastTense;
+this.methodName = String.format("%s.onPartitions%s()", 
ConsumerRebalanceListener.class.getSimpleName(), verbPresentTense.substring(0, 
1).toUpperCase(Locale.ROOT) + 
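
As context for the javadoc above: the comparison it describes reduces to two plain set differences over TopicPartition. A minimal, self-contained sketch of that step (the class and method names below are invented for illustration and are not part of the patch):

```java
import org.apache.kafka.common.TopicPartition;

import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Illustrative sketch only: the revoke/assign decision described in the javadoc above,
// expressed as two plain set differences over TopicPartition.
public class ReconciliationDiffSketch {

    // Partitions that are currently assigned but no longer present in the target assignment.
    public static SortedSet<TopicPartition> partitionsToRevoke(Set<TopicPartition> assigned,
                                                               Set<TopicPartition> target) {
        SortedSet<TopicPartition> revoke = newSortedSet();
        revoke.addAll(assigned);
        revoke.removeAll(target);
        return revoke;
    }

    // Partitions that are in the target assignment but not yet assigned locally.
    public static SortedSet<TopicPartition> partitionsToAssign(Set<TopicPartition> assigned,
                                                               Set<TopicPartition> target) {
        SortedSet<TopicPartition> assign = newSortedSet();
        assign.addAll(target);
        assign.removeAll(assigned);
        return assign;
    }

    private static SortedSet<TopicPartition> newSortedSet() {
        // Order by topic, then partition, similar to the comparator the imports above hint at.
        return new TreeSet<>(Comparator.comparing(TopicPartition::topic)
                .thenComparingInt(TopicPartition::partition));
    }
}
```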

[jira] [Updated] (KAFKA-15036) Kraft leader change fails when invoking getFinalizedFeatures

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15036:
---
Fix Version/s: 3.6.0

> Kraft leader change fails when invoking getFinalizedFeatures
> 
>
> Key: KAFKA-15036
> URL: https://issues.apache.org/jira/browse/KAFKA-15036
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Deng Ziming
>Assignee: Deng Ziming
>Priority: Major
> Fix For: 3.6.0
>
>
> When the KRaft leader changes, we can receive an error as follows:
>  
> {{[2023-05-24 18:00:02,898] WARN [QuorumController id=3002] 
> getFinalizedFeatures: failed with unknown server exception RuntimeException 
> in 271 us.  The controller is already in standby mode. 
> (org.apache.kafka.controller.QuorumController)
> java.lang.RuntimeException: No in-memory snapshot for epoch 9. Snapshot 
> epochs are: 
>   at 
> org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173)
>   at 
> org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131)
>   at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69)
>   at 
> org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303)
>   at 
> org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016)
>   at 
> org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:829)}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14956:
---
Fix Version/s: 3.5.0

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> 

[jira] [Updated] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14278:
---
Fix Version/s: 3.6.0

> Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
> ---
>
> Key: KAFKA-14278
> URL: https://issues.apache.org/jira/browse/KAFKA-14278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer , streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15028) AddPartitionsToTxnManager metrics

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15028:
---
Fix Version/s: 3.6.0

> AddPartitionsToTxnManager metrics
> -
>
> Key: KAFKA-15028
> URL: https://issues.apache.org/jira/browse/KAFKA-15028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: latency-cpu.html
>
>
> KIP-890 added metrics for the AddPartitionsToTxnManager
> VerificationTimeMs – number of milliseconds from adding partition info to the 
> manager to the time the response is sent. This will include the round trip to 
> the transaction coordinator if it is called. This will also account for 
> verifications that fail before the coordinator is called.
> VerificationFailureRate – rate of verifications that returned in failure 
> either from the AddPartitionsToTxn response or through errors in the manager.
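
A conceptual sketch of what the two metrics capture; this is not the broker's actual metrics plumbing (which registers metrics through Kafka's Yammer-based metrics groups), only an illustration of the measured quantities:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Conceptual sketch only: what VerificationTimeMs and VerificationFailureRate measure.
class VerificationMetricsSketch {

    private final LongAdder failures = new LongAdder();

    // VerificationTimeMs: elapsed time from handing the partitions to the manager
    // until the response is sent back, including any round trip to the coordinator.
    long verificationTimeMs(long addedToManagerNanos, long responseSentNanos) {
        return TimeUnit.NANOSECONDS.toMillis(responseSentNanos - addedToManagerNanos);
    }

    // VerificationFailureRate: failed verifications, whether the failure came from the
    // AddPartitionsToTxn response or from an error raised inside the manager itself.
    void recordFailure() {
        failures.increment();
    }

    long totalFailures() {
        return failures.sum();
    }
}
```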



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14518) Rebalance on topic/partition metadata changes

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14518:
---
Fix Version/s: 3.6.0

> Rebalance on topic/partition metadata changes
> -
>
> Key: KAFKA-14518
> URL: https://issues.apache.org/jira/browse/KAFKA-14518
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15131) Improve RemoteStorageManager exception handling documentation

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15131:
---
Fix Version/s: 3.6.0

> Improve RemoteStorageManager exception handling documentation
> -
>
> Key: KAFKA-15131
> URL: https://issues.apache.org/jira/browse/KAFKA-15131
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.6.0
>
>
> As discussed here[1], the RemoteStorageManager javadocs require clarification 
> regarding error handling:
>  * Remove ambiguity on `RemoteResourceNotFoundException` description
>  * Describe when `RemoteResourceNotFoundException` can/should be thrown
>  * Describe expectations for idempotent operations when copying/deleting
>  
> [1] 
> https://issues.apache.org/jira/browse/KAFKA-7739?focusedCommentId=17720936=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17720936
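
For the idempotency point, a sketch of how a caller might rely on the expectation once it is documented. The RemoteStorageManager interface and exception types are the KIP-405 ones; the handling policy shown is illustrative, not a statement of what the javadocs currently guarantee:

```java
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

// Sketch: treat "resource already gone" as success when deleting remote segment data,
// which is the idempotent-delete expectation the ticket asks to spell out.
class IdempotentDeleteSketch {

    static void deleteIgnoringMissing(RemoteStorageManager rsm, RemoteLogSegmentMetadata metadata)
            throws RemoteStorageException {
        try {
            rsm.deleteLogSegmentData(metadata);
        } catch (RemoteResourceNotFoundException e) {
            // A retried delete may find the segment already removed; treating this as a
            // no-op rather than a failure keeps the operation idempotent.
        }
    }
}
```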



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14848) KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14848:
---
Fix Version/s: 3.5.0

> KafkaConsumer incorrectly passes locally-scoped deserializers to FetchConfig
> 
>
> Key: KAFKA-14848
> URL: https://issues.apache.org/jira/browse/KAFKA-14848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.5.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> [~rayokota] found some {{{}NullPointerException{}}}s that originate because 
> of a recently introduced error in the {{KafkaConsumer}} constructor. The code 
> was changed to pass the deserializer variables into the {{FetchConfig}} 
> constructor. However, this code change incorrectly used the locally-scoped 
> variables, not the instance-scoped variables. Since the locally-scoped 
> variables could be {{{}null{}}}, this results in the {{FetchConfig}} storing 
> {{null}} references, leading to downstream breakage.
> Suggested change:
> {noformat}
> - FetchConfig fetchConfig = new FetchConfig<>(config, keyDeserializer, 
> valueDeserializer, isolationLevel);
> + FetchConfig fetchConfig = new FetchConfig<>(config, 
> this.keyDeserializer, this.valueDeserializer, isolationLevel);
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-12525:
---
Fix Version/s: 3.6.0

> Inaccurate task status due to status record interleaving in fast rebalances 
> in Connect
> --
>
> Key: KAFKA-12525
> URL: https://issues.apache.org/jira/browse/KAFKA-12525
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1
>Reporter: Konstantine Karantasis
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.6.0
>
>
> When a task is stopped in Connect it produces an {{UNASSIGNED}} status 
> record. 
> Equivalently, when a task is started or restarted in Connect it produces an 
> {{RUNNING}} status record in the Connect status topic.
> At the same time rebalances are decoupled from task start and stop. These 
> operations happen in separate executor outside of the main worker thread that 
> performs the rebalance.
> Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by 
> the worker that is sending them. This worker is using the 
> {{StatusBackingStore#putSafe}} method that will reject any stale status 
> messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker 
> is aware of the newer status record that declares a task as {{RUNNING}}.
> In cases of fast consecutive rebalances where a task is revoked from one 
> worker and assigned to another one, it has been observed that there is a 
> small time window and thus a race condition during which a {{RUNNING}} status 
> record in the new generation is produced and is immediately followed by a 
> delayed {{UNASSIGNED}} status record belonging to the same or a previous 
> generation before the worker that sends this message reads the {{RUNNING}} 
> status record that corresponds to the latest generation.
> A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} 
> status message in the topic if it reads a stale {{UNASSIGNED}} message from a 
> previous generation (that should have been fenced). 
> Another option is to ignore stale {{UNASSIGNED}} messages (messages from an 
> earlier generation than the one in which the task had {{RUNNING}} status).
> Worth noting that when this race condition takes place, besides the 
> inaccurate status representation, the actual execution of the tasks remains 
> unaffected (e.g. the tasks are running correctly even though they appear as 
> {{UNASSIGNED}}). 
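
A hypothetical sketch of the second option above (ignoring stale {{UNASSIGNED}} records by generation). The types here are stand-ins for illustration, not Connect's real status classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: reject UNASSIGNED/FAILED records that belong to an older
// rebalance generation than the one in which the task was last seen RUNNING.
class StaleStatusFilterSketch {

    enum State { RUNNING, UNASSIGNED, FAILED }

    // Highest rebalance generation in which each task was last marked RUNNING.
    private final Map<String, Integer> lastRunningGeneration = new ConcurrentHashMap<>();

    /** Returns true if the status record should be applied, false if it is stale. */
    boolean shouldApply(String taskId, State state, int generation) {
        if (state == State.RUNNING) {
            lastRunningGeneration.merge(taskId, generation, Math::max);
            return true;
        }
        Integer runningGeneration = lastRunningGeneration.get(taskId);
        // A non-RUNNING record from an older generation is the stale case described above.
        return runningGeneration == null || generation >= runningGeneration;
    }
}
```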



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15272) Fix the logic which finds candidate log segments to upload it to tiered storage

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15272:
---
Fix Version/s: 3.6.0

> Fix the logic which finds candidate log segments to upload it to tiered 
> storage
> ---
>
> Key: KAFKA-15272
> URL: https://issues.apache.org/jira/browse/KAFKA-15272
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> In tiered storage, a segment is eligible for deletion from local disk when it 
> gets uploaded to the remote storage. 
> If the topic active segment contains some messages and there are no new 
> incoming messages, then the active segment gets rotated to passive segment 
> after the configured {{log.roll.ms}} timeout.
>  
> The 
> [logic|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L553]
>  to find the candidate segment in RemoteLogManager does not include the 
> recently rotated passive segment as eligible for upload to remote storage, 
> so the passive segment won't be removed even after it breaches the 
> retention time/size, i.e. the topic won't become empty after it goes stale.
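
Roughly, the intended selection rule can be pictured as follows. This is an illustrative sketch under simplified assumptions; the stand-in types and the upper-bound parameter are not the actual RemoteLogManager code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: every non-active (closed) segment whose records fall at or below
// the uploadable end offset is a candidate, including a segment rotated only moments ago.
class CandidateSegmentSketch {

    static class SegmentView {
        final long baseOffset;
        final long nextOffset; // offset just past the last record in the segment
        SegmentView(long baseOffset, long nextOffset) {
            this.baseOffset = baseOffset;
            this.nextOffset = nextOffset;
        }
    }

    static List<SegmentView> candidates(List<SegmentView> segments,
                                        long activeSegmentBaseOffset,
                                        long uploadableEndOffset) {
        List<SegmentView> result = new ArrayList<>();
        for (SegmentView segment : segments) {
            boolean isActive = segment.baseOffset == activeSegmentBaseOffset;
            if (!isActive && segment.nextOffset <= uploadableEndOffset) {
                result.add(segment);
            }
        }
        return result;
    }
}
```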



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15290) Add support to onboard existing topics to tiered storage

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15290:
---
Fix Version/s: 3.6.0

> Add support to onboard existing topics to tiered storage
> 
>
> Key: KAFKA-15290
> URL: https://issues.apache.org/jira/browse/KAFKA-15290
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> This task is about adding support to enable tiered storage for existing 
> topics in the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14888:
---
Fix Version/s: 3.6.0

> RemoteLogManager - deleting expired/size breached log segments to remote 
> storage implementation 
> 
>
> Key: KAFKA-14888
> URL: https://issues.apache.org/jira/browse/KAFKA-14888
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.6.0
>
>
> Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA 
> covers deleting time/size breached log segments in remote storage.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15167) Tiered Storage Test Harness Framework

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15167:
---
Fix Version/s: 3.6.0

> Tiered Storage Test Harness Framework
> -
>
> Key: KAFKA-15167
> URL: https://issues.apache.org/jira/browse/KAFKA-15167
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0
>
>
> Base class for integration tests exercising the tiered storage functionality 
> in Kafka.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14821) Better handle timeouts in ListOffsets API

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-14821:
---
Fix Version/s: 3.5.0

> Better handle timeouts in ListOffsets API
> -
>
> Key: KAFKA-14821
> URL: https://issues.apache.org/jira/browse/KAFKA-14821
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Dimitar Dimitrov
>Assignee: Dimitar Dimitrov
>Priority: Major
> Fix For: 3.5.0
>
>
> The ListOffsets Admin API doesn't retry failed requests for partitions due to 
> timeouts or due to other types of retriable exceptions. This is a step back 
> compared to the Consumer offset APIs implemented in the fetcher as the latter 
> can do partial retries in such cases.
>  * The comparison is notable as some Kafka tools (e.g. 
> {{{}kafka-get-offsets{}}}) have moved from using the Consumer offset APIs to 
> using the ListOffsets Admin API.
> One nice way to address that seems to be to migrate the ListOffsets API to 
> use the more modern AdminApiDriver mechanism. That should automatically 
> provide the capability to retry requests whose responses are deemed retriable.
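
As a client-side illustration of the gap, a sketch that retries only the partitions whose ListOffsets futures failed with a timeout. The Admin API calls themselves are real; the helper names and retry policy are invented for illustration:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

// Sketch of a client-side partial retry: only the partitions whose futures failed with a
// timeout are re-queried, instead of failing or repeating the whole ListOffsets call.
class ListOffsetsPartialRetrySketch {

    static Map<TopicPartition, Long> resolveOffsets(Admin admin,
                                                    Map<TopicPartition, OffsetSpec> query,
                                                    int maxAttempts) throws InterruptedException {
        Map<TopicPartition, Long> resolved = new HashMap<>();
        Map<TopicPartition, OffsetSpec> remaining = new HashMap<>(query);

        for (int attempt = 0; attempt < maxAttempts && !remaining.isEmpty(); attempt++) {
            ListOffsetsResult result = admin.listOffsets(remaining);
            Map<TopicPartition, OffsetSpec> failed = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetSpec> entry : remaining.entrySet()) {
                try {
                    resolved.put(entry.getKey(), result.partitionResult(entry.getKey()).get().offset());
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof TimeoutException) {
                        failed.put(entry.getKey(), entry.getValue()); // retry this partition only
                    } else {
                        throw new RuntimeException(e.getCause());
                    }
                }
            }
            remaining = failed;
        }
        return resolved;
    }
}
```

For example, the query map could map each partition to OffsetSpec.latest(); partitions still missing from the result after maxAttempts would then be reported as failed.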



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15181) Race condition on partition assigned to TopicBasedRemoteLogMetadataManager

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15181:
---
Fix Version/s: 3.6.0

> Race condition on partition assigned to TopicBasedRemoteLogMetadataManager 
> ---
>
> Key: KAFKA-15181
> URL: https://issues.apache.org/jira/browse/KAFKA-15181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Abhijeet Kumar
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.6.0
>
>
> TopicBasedRemoteLogMetadataManager (TBRLMM) uses a cache to be prepared 
> whenever partitions are assigned.
> When partitions are assigned to the TBRLMM instance, a consumer is started to 
> keep the cache up to date.
> If the cache hasn't finished building, TBRLMM fails to return remote 
> metadata about partitions that are stored on the backing topic. TBRLMM may not 
> recover from this failing state.
> A proposal to fix this issue would be to wait, after a partition is assigned, for 
> the consumer to catch up. Similar logic is used at the moment when TBRLMM 
> writes to the topic, where a send callback is used to wait for the consumer to catch up. 
> This logic can be reused whenever a partition is assigned, so that when TBRLMM is 
> marked as initialized, the cache is ready to serve requests.
> Reference: https://github.com/aiven/kafka/issues/33
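
An illustrative sketch of the proposed "wait for the consumer to catch up" step. It assumes the metadata consumer can be polled from the calling thread, which the real TBRLMM consumer task does not do; the point is only the catch-up condition itself:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

import java.time.Duration;
import java.util.Collections;

// Sketch: after a metadata partition is assigned, block until everything that existed
// on it at assignment time has been consumed, then the cache can serve reads for it.
class WaitForCatchUpSketch {

    static void awaitCatchUp(Consumer<byte[], byte[]> consumer,
                             TopicPartition metadataPartition,
                             long timeoutMs) {
        long endOffsetAtAssignment = consumer.endOffsets(Collections.singleton(metadataPartition))
                .get(metadataPartition);
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (consumer.position(metadataPartition) < endOffsetAtAssignment) {
            if (System.currentTimeMillis() > deadline) {
                throw new TimeoutException("Metadata consumer did not catch up within " + timeoutMs + " ms");
            }
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
```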



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9564) Integration Test framework for Tiered Storage

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-9564:
--
Fix Version/s: 3.6.0

> Integration Test framework for Tiered Storage
> -
>
> Key: KAFKA-9564
> URL: https://issues.apache.org/jira/browse/KAFKA-9564
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15289) Support KRaft mode in RequestQuotaTest

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15289:
---
Fix Version/s: 3.6.0

> Support KRaft mode in RequestQuotaTest
> --
>
> Key: KAFKA-15289
> URL: https://issues.apache.org/jira/browse/KAFKA-15289
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Major
>  Labels: newbee
> Fix For: 3.6.0
>
>
> We are calling `zkBrokerApis` in RequestQuotaTest; we should ensure KRaft 
> broker APIs are also supported, so use clientApis as far as possible (use 
> zkBrokerApis.clientApis instead of ApiKeys.zkBrokerApis).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15288) Change BrokerApiVersionsCommandTest to support kraft mode

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15288:
---
Fix Version/s: 3.6.0

> Change BrokerApiVersionsCommandTest to support kraft mode
> -
>
> Key: KAFKA-15288
> URL: https://issues.apache.org/jira/browse/KAFKA-15288
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Minor
> Fix For: 3.6.0
>
>
> Currently we only test zk mode for BrokerApiVersionsCommand



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15287) Change NodeApiVersions.create() to contains both apis of zk and kraft broker

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15287:
---
Fix Version/s: 3.6.0

> Change NodeApiVersions.create() to contains both apis of zk and kraft broker 
> -
>
> Key: KAFKA-15287
> URL: https://issues.apache.org/jira/browse/KAFKA-15287
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Deng Ziming
>Priority: Major
>  Labels: newbee
> Fix For: 3.6.0
>
>
> We are using ApiKeys.zkBrokerApis() when calling NodeApiVersions.create(), 
> this means we only support zk broker apis.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15439) Add transaction tests enabled with tiered storage

2023-09-19 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15439:
---
Fix Version/s: 3.6.0
   3.7.0

> Add transaction tests enabled with tiered storage
> -
>
> Key: KAFKA-15439
> URL: https://issues.apache.org/jira/browse/KAFKA-15439
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.6.0, 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on PR #14364:
URL: https://github.com/apache/kafka/pull/14364#issuecomment-1726880993

   @dajac @lianetm - Made some changes based on the comments, but obviously 
broke some existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330940223


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {
+return new NetworkClientDelegate.PollResult(
+Long.MAX_VALUE, Collections.emptyList());
+}
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs)) {
+return new NetworkClientDelegate.PollResult(
+ 

[GitHub] [kafka] github-actions[bot] commented on pull request #13881: MINOR: fix typo of ProducerConfig and KafkaProducer

2023-09-19 Thread via GitHub


github-actions[bot] commented on PR #13881:
URL: https://github.com/apache/kafka/pull/13881#issuecomment-1726830843

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14370: KAFKA-15449: Verify transactional offset commits (KIP-890 part 1)

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14370:
URL: https://github.com/apache/kafka/pull/14370#discussion_r1330915798


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1048,7 +1048,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+}
 
+if (origin == AppendOrigin.CLIENT || origin == 
AppendOrigin.COORDINATOR) {
   // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.

Review Comment:
   Good point. I will also update when David merges his change since it subtly 
affects this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a diff in pull request #14370: KAFKA-15449: Verify transactional offset commits (KIP-890 part 1)

2023-09-19 Thread via GitHub


artemlivshits commented on code in PR #14370:
URL: https://github.com/apache/kafka/pull/14370#discussion_r1330859068


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1048,7 +1048,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+}
 
+if (origin == AppendOrigin.CLIENT || origin == 
AppendOrigin.COORDINATOR) {
   // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.

Review Comment:
   Comment should probably be updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-09-19 Thread via GitHub


gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726690138

   @gkousouris Thanks for sharing your use-case. I think you are right to look 
towards MM2 for this sort of translation, and I think it's unfortunate that it 
isn't straightforward. The current offset translation doesn't "converge" for 
consumer groups which are inactive due to memory limitations, but for a 
single-shot migration use-case, that's not good enough.
   
   Are you able to stop the producers to the upstream topic, and let the 
consumers commit offsets at the end of the topic before performing the 
migration? If you set offset.lag.max very low, MM2 should be able to translate 
offsets at the end of the topic.
   
   > Otherwise, the only solution I can think of is the hacky approach of 
reading the offsets and trying to decipher what message to read on the 
application-side, which seems brittle.
   
   Yeah, if you want to get a 100% precise translation away from the end of the 
topic and don't want to modify MM2, you're going to need to "synchronize" the 
two topics and figure out which messages line up. Between offset.lag.max, the 
syncs topic throttle semaphore, and the OffsetSyncStore, a lot of intermediate 
offset syncs get discarded and the precision of the translation decreases 
significantly. If you let MirrorMaker2 perform a rough translation that you 
later refine with a custom script, you probably only need to compare a few 
hundred record checksums for each topic-partition-consumer-group. This would 
also allow you to compensate for the skipped offsets that EOS mode produces.
   
   I think you could make such a script reliable enough for a one-off 
migration, with some manual spot-checking to make sure it doesn't do anything 
too incorrect.
   
   If you're willing to hack on the MirrorMaker connectors, you could disable 
the throttling semaphore, the offset.lag.max parameter, and implement the 
full-depth OffsetSyncStore to get perfect translation. I don't think we could 
add those to mainline MM2 without a configuration, but you are certainly 
welcome to temporarily fork MM2 to get the job done.
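
   To make the "refine with a custom script" idea concrete, a rough sketch is given below. The names, the value comparison, and the window heuristic are invented for illustration; a real script would also want to compare checksums or headers and handle compaction and EOS control records.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Rough sketch: start from MM2's rough translation, then scan a small window of the
// downstream topic for the record that matches the upstream record at the committed offset.
class OffsetRefinementSketch {

    static long refine(Consumer<byte[], byte[]> upstream, TopicPartition upstreamTp, long upstreamOffset,
                       Consumer<byte[], byte[]> downstream, TopicPartition downstreamTp,
                       long roughDownstreamOffset, int window) {
        byte[] wanted = valueAt(upstream, upstreamTp, upstreamOffset);

        downstream.assign(Collections.singleton(downstreamTp));
        downstream.seek(downstreamTp, Math.max(0, roughDownstreamOffset - window));
        List<ConsumerRecord<byte[], byte[]>> candidates = new ArrayList<>();
        while (candidates.size() < 2 * window) {
            List<ConsumerRecord<byte[], byte[]>> polled =
                    downstream.poll(Duration.ofSeconds(1)).records(downstreamTp);
            if (polled.isEmpty()) {
                break;
            }
            candidates.addAll(polled);
        }
        for (ConsumerRecord<byte[], byte[]> record : candidates) {
            if (Arrays.equals(record.value(), wanted)) {
                return record.offset(); // exact downstream offset of the matching record
            }
        }
        return roughDownstreamOffset;   // fall back to the rough translation
    }

    private static byte[] valueAt(Consumer<byte[], byte[]> consumer, TopicPartition tp, long offset) {
        consumer.assign(Collections.singleton(tp));
        consumer.seek(tp, offset);
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5)).records(tp)) {
            if (record.offset() == offset) {
                return record.value();
            }
        }
        throw new IllegalStateException("No upstream record found at offset " + offset);
    }
}
```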
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gkousouris commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-09-19 Thread via GitHub


gkousouris commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726667976

   Thanks a lot for your reply! I will look into creating a Jira account and 
creating a ticket for this. 
   
   I should have mentioned that we were planning on using Mirror Maker to 
migrate the topic a service reads from, from cluster A to cluster B. So we would 
not be limited by the asynchronous offset translation that MirrorMaker uses, 
since we would be:
   
   1. Mirroring data from old topic in cluster A to new topic in cluster B. 
   2. Stopping the service and waiting for the consumer offset of the last 
message on cluster A to be replicated on the new topic on cluster B.
   3. Restart the service to read from the new topic. 
   
   We would have hoped that the offset would be translated exactly at some 
point, letting us seamlessly start consuming from the same point at which it 
was last stopped. 
   
   MirrorMaker seems like a great use case for us, but this might be a bit of a 
blocker. Using the old offset translation version before this PR could perhaps 
work if we were to disable EOS (to get rid of the transactional messages). 
   
   Otherwise, the only solution I can think of is the hacky approach of reading 
the offsets and trying to decipher what message to read on the 
application-side, which seems brittle.
   
   Would you perhaps recommend a different approach to not re-process a message 
twice ? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-09-19 Thread via GitHub


gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726637742

   Hi @gkousouris Thanks for asking!
   
   > Thus, when we restart the service (and make it consume from the new 
topic), it will re-process all messages from 
last_checkpoint_before_upstream_offset + 1 until downstream_offset. Isn't this 
a problem considering that Mirror Maker is providing Exactly Once Semantics for 
committing messages ?
   
   Your understanding of the offset translation (post-KAFKA-12468) is correct, 
and I would expect re-processing of messages downstream after a fail-over. I 
also understand that this doesn't satisfy "exactly once semantics" for some 
definition, because it allows for re-delivery of the same message to the same 
"application", when that application uses multiple Kafka clusters.
   
   MirrorMaker2 currently provides "exactly once semantics" for replicating 
data, but not for offsets. I believe this is captured by the 
"MirrorSourceConnector" declaring that it supports EOS, but 
the"MirrorCheckpointConnector" does not. This means that when you replicate a 
topic with EOS mode, and use read_committed on the downstream topic from the 
beginning, EOS would mean that you read each record present in the upstream 
topic exactly once. When you instead start reading at the committed downstream 
offset, you may have records delivered downstream that have already been 
committed upstream.
   
   This is not just caused by the offset translation that this PR implements, 
it's a limitation of the asynchronous offset translation that MirrorMaker2 
uses. Consider this sequence:
   1. MirrorCheckpointTask syncs offsets 
   2. MirrorCheckpointTask sleeps
   3. MirrorSourceTask replicates some records
   4. The upstream consumer group consumes upstream records and commits offsets 
to the upstream group
   5. The downstream consumer group starts reading the topic with the stale 
offsets in the downstream group
   
   Thanks for doing your due diligence on the claims of "exactly once 
semantics", and I hope that you can still make MirrorMaker2 work for your 
use-case. I suspect that EOS semantics across multiple Kafka clusters is a much 
larger effort than just changing the offset translation logic :) If you have a 
Jira account, please consider opening a ticket about this shortcoming.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gkousouris commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-09-19 Thread via GitHub


gkousouris commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726599437

   Hi @gharris1727, if I understand this PR correctly, this will (almost 
always) cause duplication of data. Our problem is this:
   1. We have a service reading from a Kafka topic and committing offsets for 
its consumer group.
   2. We use MirrorMaker to replicate the topic to a different cluster.
   3. We pause the service and check the current offset for the 2 streams (the 
one in the old cluster and the one in the new cluster).
   
   In step 3, these offsets will be different: specifically, the offset from 
the old cluster will be that of the last message the service managed to commit an 
offset for, and the new topic will have as its offset the value 
`last_checkpoint_before_upstream_offset + 1`. 
   
   Thus, when we restart the service (and make it consume from the new topic), 
it will re-process all messages from `last_checkpoint_before_upstream_offset + 
1` until `downstream_offset`. Isn't this a problem considering that Mirror 
Maker is providing Exactly Once Semantics for committing messages ?
   
   This behaviour was verified by looking at the output of the 
`kafka-consumer-groups` script. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #14409: Fix migration system tests for 3.6

2023-09-19 Thread via GitHub


mumrah opened a new pull request, #14409:
URL: https://github.com/apache/kafka/pull/14409

   As part of validating 3.6.0 RC0, I ran the ZK migration system tests at the 
RC tag. Pretty much all of them failed due to recent changes (particularly, 
disallowing migrations with JBOD). All of the changes here are test fixes, so 
not a release blocker.
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.3
   session_id:   2023-09-19--007
   run time: 8 minutes 51.147 seconds
   tests run:5
   passed:   5
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=False
   status: PASS
   run time:   2 minutes 56.029 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_online_migration.roll_controller=True
   status: PASS
   run time:   3 minutes 1.037 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_pre_migration_mode_3_4.metadata_quorum=ISOLATED_KRAFT
   status: PASS
   run time:   44.101 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_reconcile_kraft_to_zk
   status: PASS
   run time:   1 minute 25.509 seconds
   

   test_id:
kafkatest.tests.core.zookeeper_migration_test.TestMigration.test_upgrade_after_3_4_migration
   status: PASS
   run time:   43.993 seconds
   

   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-19 Thread via GitHub


lianetm commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1330551023


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -395,27 +386,28 @@ class PendingRequests {
 List unsentOffsetFetches = new ArrayList<>();
 List inflightOffsetFetches = new 
ArrayList<>();
 
-public boolean hasUnsentRequests() {
+// Visible for teseting
+boolean hasUnsentRequests() {
 return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
 }
 
-public CompletableFuture addOffsetCommitRequest(final 
Map offsets) {
+CompletableFuture addOffsetCommitRequest(final 
Map offsets) {
 // TODO: Dedupe committing the same offsets to the same partitions
 OffsetCommitRequestState request = new OffsetCommitRequestState(
 offsets,
 groupState.groupId,
 groupState.groupInstanceId.orElse(null),
 groupState.generation);
 unsentOffsetCommits.add(request);
-return request.future();
+return request.future;
 }
 
 /**
- *  Adding an offset fetch request to the outgoing buffer.  If the 
same request was made, we chain the future
- *  to the existing one.
+ * Adding an offset fetch request to the outgoing buffer.  If the 
same request was made, we chain the future
+ * to the existing one.
  *
- *  If the request is new, it invokes a callback to remove itself 
from the {@code inflightOffsetFetches}
- *  upon completion.
+ * If the request is new, it invokes a callback to remove itself 
from the {@code inflightOffsetFetches}
+ * upon completion.

Review Comment:
   nit: wrong tag  and other unclosed ones above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on pull request #14407: KAFKA-15479: Remote log segments should be considered once for retention breach

2023-09-19 Thread via GitHub


kamalcph commented on PR #14407:
URL: https://github.com/apache/kafka/pull/14407#issuecomment-1726278190

   > I had a comment to fix this/similar problem in original PR - [#13561 
(comment)](https://github.com/apache/kafka/pull/13561#discussion_r1293305304)
   > 
   > I am curious, why didn't the test which was added to resolve the comment 
fail? Is it because the test only checked for eligibility of a segment for 
calculation of size and didn't actually check if same segment is being counted 
twice?
   
   We don't have a test for the scenario mentioned in the 
[comment](https://github.com/apache/kafka/pull/13561#discussion_r1293305304). 
And, there was a regression after #14349. I have added couple of tests to 
verify the deleted segment count and log-start-offset.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on PR #14402:
URL: https://github.com/apache/kafka/pull/14402#issuecomment-1726276699

   There also seems to be a problem with the build. All the versions seem to be 
hanging. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330542903


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -69,7 +69,6 @@ import com.yammer.metrics.core.Gauge
 import kafka.log.remote.RemoteLogManager
 import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.common.internals.Topic
-import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
 AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction}

Review Comment:
   For our `setUpReplicaManagerWithMockedAddPartitionsToTxnManager` we can 
remove the lines of code that mocks the flow for getting transaction state 
partitions.
   
   val metadataResponseTopic = Seq(new MetadataResponseTopic()
 .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
 .setPartitions(Seq(
   new MetadataResponsePartition()
 .setPartitionIndex(0)
 .setLeaderId(0)).asJava))
   transactionalTopicPartitions.foreach(tp => 
when(metadataCache.contains(tp)).thenReturn(true))
   
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName)).thenReturn(metadataResponseTopic)
   when(metadataCache.getAliveBrokerNode(0, 
config.interBrokerListenerName)).thenReturn(Some(node))
   when(metadataCache.getAliveBrokerNode(1, 
config.interBrokerListenerName)).thenReturn(None)
   
I think the same code exists for `setupReplicaManagerWithMockedPurgatories` 
  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #14407: KAFKA-15479: Remote log segments should be considered once for retention breach

2023-09-19 Thread via GitHub


divijvaidya commented on PR #14407:
URL: https://github.com/apache/kafka/pull/14407#issuecomment-1726259952

   I had a comment to fix this/similar problem in original PR - 
https://github.com/apache/kafka/pull/13561#discussion_r1293305304 
   
   I am curious, why didn't the test which was added to resolve the comment 
fail? Is it because the test only checked for eligibility of a segment for 
calculation of size and didn't actually check if same segment is being counted 
twice?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330524344


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -157,29 +216,114 @@ class AddPartitionsToTxnManagerTest {
 // The request for node1 should not be added because one request is 
already inflight.
 assertEquals(1, requestsAndHandlers2.size)
 requestsAndHandlers2.foreach { requestAndHandler =>
-  verifyRequest(node2, transactionalId3, producerId3, requestAndHandler)
+  verifyRequest(node2, transactionalId3, producerId3, requestAndHandler, 
verifyOnly = false)
 }
 
 // Complete the request for node1 so the new one can go through.
 requestsAndHandlers.filter(_.destination == 
node1).head.handler.onComplete(authenticationErrorResponse)
 val requestsAndHandlers3 = 
addPartitionsToTxnManager.generateRequests().asScala
 assertEquals(1, requestsAndHandlers3.size)
 requestsAndHandlers3.foreach { requestAndHandler =>
-  verifyRequest(node1, transactionalId2, producerId2, requestAndHandler)
+  verifyRequest(node1, transactionalId2, producerId2, requestAndHandler, 
verifyOnly = true)
+}
+  }
+
+  @Test
+  def testTransactionCoordinatorResolution(): Unit = {
+when(partitionFor.apply(transactionalId1)).thenReturn(0)
+
+def checkError(): Unit = {
+  val errors = mutable.Map[TopicPartition, Errors]()
+
+  addPartitionsToTxnManager.addTxnData(
+transactionalId1,
+producerId1,
+producerEpoch = 0,
+verifyOnly = true,
+topicPartitions,
+setErrors(errors)
+  )
+
+  assertEquals(topicPartitions.map(tp => tp -> 
Errors.COORDINATOR_NOT_AVAILABLE).toMap, errors)
 }
+
+// The transaction state topic does not exist.
+
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+  .thenReturn(Seq())
+checkError()
+
+// The metadata of the transaction state topic returns an error.
+
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+  .thenReturn(Seq(
+new MetadataResponseData.MetadataResponseTopic()
+  .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
+  .setErrorCode(Errors.BROKER_NOT_AVAILABLE.code)
+  ))
+checkError()
+
+// The partition does not exist.
+
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+  .thenReturn(Seq(
+new MetadataResponseData.MetadataResponseTopic()
+  .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
+  ))
+checkError()
+
+// The partition has not leader.

Review Comment:
   nit: The partition has no leader



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330523078


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest {
   private val versionMismatchResponse = clientResponse(null, mismatchException 
= new UnsupportedVersionException(""))
   private val disconnectedResponse = clientResponse(null, disconnected = true)
 
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:2181"))
+
   @BeforeEach
   def setup(): Unit = {
 addPartitionsToTxnManager = new AddPartitionsToTxnManager(
-  KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")),
+  config,
   networkClient,
-  time)
+  metadataCache,
+  partitionFor,
+  time
+)
   }
 
   @AfterEach
   def teardown(): Unit = {
 addPartitionsToTxnManager.shutdown()
   }
 
-  def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: 
Map[TopicPartition, Errors]): Unit = {
-callbackErrors.foreach {
-  case (tp, error) => errors.put(tp, error)
-}
+  private def setErrors(errors: mutable.Map[TopicPartition, 
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
+callbackErrors.forKeyValue(errors.put)
   }
 
   @Test
   def testAddTxnData(): Unit = {
+when(partitionFor.apply(transactionalId1)).thenReturn(0)
+when(partitionFor.apply(transactionalId2)).thenReturn(1)
+when(partitionFor.apply(transactionalId3)).thenReturn(0)
+
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+  .thenReturn(Seq(
+new MetadataResponseData.MetadataResponseTopic()
+  .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
+  .setPartitions(List(
+new MetadataResponseData.MetadataResponsePartition()
+  .setPartitionIndex(0)
+  .setLeaderId(0),
+new MetadataResponseData.MetadataResponsePartition()
+  .setPartitionIndex(1)
+  .setLeaderId(1)
+  ).asJava)
+  ))
+when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName))
+  .thenReturn(Some(node0))
+when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName))
+  .thenReturn(Some(node1))
+
 val transaction1Errors = mutable.Map[TopicPartition, Errors]()
 val transaction2Errors = mutable.Map[TopicPartition, Errors]()
 val transaction3Errors = mutable.Map[TopicPartition, Errors]()
 
-addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId1, producerId1), setErrors(transaction1Errors))
-addPartitionsToTxnManager.addTxnData(node1, 
transactionData(transactionalId2, producerId2), setErrors(transaction2Errors))
-addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId3, producerId3), setErrors(transaction3Errors))
+addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, 
producerEpoch = 0, verifyOnly = true, topicPartitions, 
setErrors(transaction1Errors))
+addPartitionsToTxnManager.addTxnData(transactionalId2, producerId2, 
producerEpoch = 0, verifyOnly = true, topicPartitions, 
setErrors(transaction2Errors))
+addPartitionsToTxnManager.addTxnData(transactionalId3, producerId3, 
producerEpoch = 0, verifyOnly = true, topicPartitions, 
setErrors(transaction3Errors))
 
 // We will try to add transaction1 3 more times (retries). One will have 
the same epoch, one will have a newer epoch, and one will have an older epoch 
than the new one we just added.
 val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition, 
Errors]()
 val transaction1RetryWithNewerEpochErrors = mutable.Map[TopicPartition, 
Errors]()
 val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition, 
Errors]()
 
 // Trying to add more transactional data for the same transactional ID, 
producer ID, and epoch should simply replace the old data and send a retriable 
response.
-addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId1, producerId1), 
setErrors(transaction1RetryWithSameEpochErrors))
+addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, 
producerEpoch = 0, verifyOnly = true, topicPartitions, 
setErrors(transaction1RetryWithSameEpochErrors))
 val expectedNetworkErrors = topicPartitions.map(_ -> 
Errors.NETWORK_EXCEPTION).toMap
 assertEquals(expectedNetworkErrors, transaction1Errors)
 
 // Trying to add more transactional data for the same transactional ID and 
producer ID, but new epoch should replace the old data and send an error 
response for it.
-addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId1, producerId1, producerEpoch = 1), 
setErrors(transaction1RetryWithNewerEpochErrors))
+addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, 
producerEpoch = 

[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330522384


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -63,7 +73,42 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: 
NetworkClient, time
   val verificationFailureRate = 
metricsGroup.newMeter(VerificationFailureRateMetricName, "failures", 
TimeUnit.SECONDS)
   val verificationTimeMs = 
metricsGroup.newHistogram(VerificationTimeMsMetricName)
 
-  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+  def addTxnData(
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short,
+verifyOnly: Boolean,

Review Comment:
   We will add this functionality later, but AddPartitionsToTxnManager 
currently only supports verifyOnly. In part 2, we will send 
AddPartitionsToTxnRequests that actually add the partition.
   
   I suppose this is ok since replicaManager hard codes true for now.



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -63,7 +73,42 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: 
NetworkClient, time
   val verificationFailureRate = 
metricsGroup.newMeter(VerificationFailureRateMetricName, "failures", 
TimeUnit.SECONDS)
   val verificationTimeMs = 
metricsGroup.newHistogram(VerificationTimeMsMetricName)
 
-  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+  def addTxnData(
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short,
+verifyOnly: Boolean,

Review Comment:
   We will add this functionality later, but AddPartitionsToTxnManager 
currently only supports verifyOnly=true. In part 2, we will send 
AddPartitionsToTxnRequests that actually add the partition.
   
   I suppose this is ok since replicaManager hard codes true for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330519350


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest {
   private val versionMismatchResponse = clientResponse(null, mismatchException 
= new UnsupportedVersionException(""))
   private val disconnectedResponse = clientResponse(null, disconnected = true)
 
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:2181"))
+
   @BeforeEach
   def setup(): Unit = {
 addPartitionsToTxnManager = new AddPartitionsToTxnManager(
-  KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")),
+  config,
   networkClient,
-  time)
+  metadataCache,
+  partitionFor,
+  time
+)
   }
 
   @AfterEach
   def teardown(): Unit = {
 addPartitionsToTxnManager.shutdown()
   }
 
-  def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: 
Map[TopicPartition, Errors]): Unit = {
-callbackErrors.foreach {
-  case (tp, error) => errors.put(tp, error)
-}
+  private def setErrors(errors: mutable.Map[TopicPartition, 
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
+callbackErrors.forKeyValue(errors.put)
   }
 
   @Test
   def testAddTxnData(): Unit = {
+when(partitionFor.apply(transactionalId1)).thenReturn(0)

Review Comment:
   I wonder if there is a way to make a helper where you pass in id + 
partition. But that might not be worth it for 2 tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14402: MINOR: Push logic to resolve the transaction coordinator into the AddPartitionsToTxnManager

2023-09-19 Thread via GitHub


jolshan commented on code in PR #14402:
URL: https://github.com/apache/kafka/pull/14402#discussion_r1330516886


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -68,87 +71,143 @@ class AddPartitionsToTxnManagerTest {
   private val versionMismatchResponse = clientResponse(null, mismatchException 
= new UnsupportedVersionException(""))
   private val disconnectedResponse = clientResponse(null, disconnected = true)
 
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:2181"))
+
   @BeforeEach
   def setup(): Unit = {
 addPartitionsToTxnManager = new AddPartitionsToTxnManager(
-  KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")),
+  config,
   networkClient,
-  time)
+  metadataCache,
+  partitionFor,
+  time
+)
   }
 
   @AfterEach
   def teardown(): Unit = {
 addPartitionsToTxnManager.shutdown()
   }
 
-  def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: 
Map[TopicPartition, Errors]): Unit = {
-callbackErrors.foreach {
-  case (tp, error) => errors.put(tp, error)
-}
+  private def setErrors(errors: mutable.Map[TopicPartition, 
Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = {
+callbackErrors.forKeyValue(errors.put)
   }
 
   @Test
   def testAddTxnData(): Unit = {
+when(partitionFor.apply(transactionalId1)).thenReturn(0)
+when(partitionFor.apply(transactionalId2)).thenReturn(1)
+when(partitionFor.apply(transactionalId3)).thenReturn(0)
+
when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), 
config.interBrokerListenerName))
+  .thenReturn(Seq(
+new MetadataResponseData.MetadataResponseTopic()
+  .setName(Topic.TRANSACTION_STATE_TOPIC_NAME)
+  .setPartitions(List(
+new MetadataResponseData.MetadataResponsePartition()
+  .setPartitionIndex(0)
+  .setLeaderId(0),
+new MetadataResponseData.MetadataResponsePartition()
+  .setPartitionIndex(1)
+  .setLeaderId(1)
+  ).asJava)
+  ))
+when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName))
+  .thenReturn(Some(node0))
+when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName))
+  .thenReturn(Some(node1))
+
 val transaction1Errors = mutable.Map[TopicPartition, Errors]()
 val transaction2Errors = mutable.Map[TopicPartition, Errors]()
 val transaction3Errors = mutable.Map[TopicPartition, Errors]()
 
-addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId1, producerId1), setErrors(transaction1Errors))
-addPartitionsToTxnManager.addTxnData(node1, 
transactionData(transactionalId2, producerId2), setErrors(transaction2Errors))
-addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId3, producerId3), setErrors(transaction3Errors))
+addPartitionsToTxnManager.addTxnData(transactionalId1, producerId1, 
producerEpoch = 0, verifyOnly = true, topicPartitions, 
setErrors(transaction1Errors))

Review Comment:
   Did we specify the param name here for readability?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14407: KAFKA-15479: Remote log segments should be considered once for retention breach

2023-09-19 Thread via GitHub


kamalcph commented on code in PR #14407:
URL: https://github.com/apache/kafka/pull/14407#discussion_r1330492380


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -993,7 +988,9 @@ private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, Ex
 return;
 }
 RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
-
+if (segmentsToDelete.contains(metadata)) {

Review Comment:
   This is the main fix.
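
   To spell out the idea behind the fix (with hypothetical names, not the actual RemoteLogManager code): a segment spanning several leader epochs is yielded once per epoch by the iterator, so the retention sweep remembers the segments it has already accepted and skips repeats. A minimal Java sketch:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetentionSweepSketch {
    // Simplified stand-in for RemoteLogSegmentMetadata: just an id and a size.
    record Segment(String id, long sizeBytes) { }

    // Picks segments to delete until the total size drops to retentionBytes. The iterator may
    // yield the same segment once per leader epoch, so a Set guards against counting
    // (and deleting) a segment more than once.
    static List<Segment> segmentsToDelete(Iterable<Segment> segmentsPerEpoch,
                                          long totalSizeBytes,
                                          long retentionBytes) {
        Set<Segment> alreadySelected = new HashSet<>();
        List<Segment> toDelete = new ArrayList<>();
        long remainingSize = totalSizeBytes;
        for (Segment segment : segmentsPerEpoch) {
            if (remainingSize <= retentionBytes)
                break;
            if (alreadySelected.contains(segment))   // the analogue of the contains() check above
                continue;
            alreadySelected.add(segment);
            toDelete.add(segment);
            remainingSize -= segment.sizeBytes();
        }
        return toDelete;
    }

    public static void main(String[] args) {
        // Segment "a" appears twice because it covers two leader epochs.
        List<Segment> iterationOrder = List.of(
                new Segment("a", 100), new Segment("a", 100), new Segment("b", 100));
        // Without the duplicate check, "a" would be counted twice and "b" would never be deleted.
        System.out.println(segmentsToDelete(iterationOrder, 300, 100)); // a and b, each once
    }
}
```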



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown

2023-09-19 Thread via GitHub


nizhikov commented on PR #13929:
URL: https://github.com/apache/kafka/pull/13929#issuecomment-1726203159

   Hello. I will review this PR in the near future.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dongnuo123 opened a new pull request, #14408: Kafka 14506: Implement DeleteGroups API and OffsetDelete API

2023-09-19 Thread via GitHub


dongnuo123 opened a new pull request, #14408:
URL: https://github.com/apache/kafka/pull/14408

   This PR implements the DeleteGroups and OffsetDelete APIs, following the 
implementation in the old GroupCoordinator but using the new model.
   
   ### Jira
   https://issues.apache.org/jira/browse/KAFKA-14506
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph opened a new pull request, #14407: KAFKA-15479: Remote log segments should be considered once for retention breach

2023-09-19 Thread via GitHub


kamalcph opened a new pull request, #14407:
URL: https://github.com/apache/kafka/pull/14407

   When a remote log segment contains multiple epochs, it gets considered multiple 
times when checking for retention size/time/start-offset breaches. This affects 
deletion by remote log retention size, since fewer segments than expected end up 
being deleted. This is a follow-up of KAFKA-15352.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention breach

2023-09-19 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15479:
-
Summary: Remote log segments should be considered once for retention breach 
 (was: Remote log segments should be considered once for retention size breach)

> Remote log segments should be considered once for retention breach
> --
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> When a remote log segment contains multiple epoch, then it gets considered 
> for multiple times during breach by retention size/time/start-offset. This 
> will affect the deletion by remote log retention size as it deletes the 
> number of segments lesser than expected. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330480862


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatInterval = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = 
mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+

[GitHub] [kafka] kirktrue closed pull request #14397: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-19 Thread via GitHub


kirktrue closed pull request #14397: KAFKA-14274 [6, 7]: Introduction of fetch 
request manager
URL: https://github.com/apache/kafka/pull/14397


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue opened a new pull request, #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-19 Thread via GitHub


kirktrue opened a new pull request, #14406:
URL: https://github.com/apache/kafka/pull/14406

   Changes:
   
   1. Introduces `FetchRequestManager`, which implements the `RequestManager` API 
for fetching messages from brokers. Unlike `Fetcher`, record decompression and 
deserialization are performed on the application thread inside `CompletedFetch`.
   2. Restructures the code so that objects owned by the background thread are 
not instantiated until the background thread runs (via `Supplier`), ensuring 
that no references to them are available to the application thread (a rough 
sketch of this pattern follows below).
   3. Ensures resources properly implement `Closeable` and uses 
`IdempotentCloser` so that they are only closed once.
   4. Introduces `ConsumerTestBuilder` to reduce inconsistency in the way the 
test objects are built up.
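
   A minimal sketch of the deferred-construction idea in item 2 above (hypothetical names, not the actual consumer internals): the application thread hands over only a Supplier, and the background thread materialises the object the first time it runs, so no reference ever escapes to the caller.

```java
import java.util.function.Supplier;

public class BackgroundOwnedSketch {
    // Stand-in for an object that must only ever be touched by the background thread.
    static class FetchBuffer {
        void poll() {
            System.out.println("polled on " + Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The application thread only builds a recipe for the object, never the object itself.
        Supplier<FetchBuffer> fetchBufferSupplier = FetchBuffer::new;

        Thread background = new Thread(() -> {
            // Constructed lazily on the background thread; no reference escapes to the caller.
            FetchBuffer buffer = fetchBufferSupplier.get();
            buffer.poll();
        }, "background");

        background.start();
        background.join();
    }
}
```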


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330475695


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, heartbeatIntervalMs, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.time = time;
+this.logger = logContext.logger(this.getClass());
+this.subscriptions = subscriptions;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.heartbeatRequestState = heartbeatRequestState;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+}
+
+@Override
+public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
+if (!coordinatorRequestManager.coordinator().isPresent() || 
membershipManager.notInGroup()) {

Review Comment:
   As previously commented, maybe let's use `shouldHeartbeat` to be more 
explicit
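
   For illustration, a self-contained sketch of the kind of guard being suggested; the interfaces and names here are stand-ins, not the real CoordinatorRequestManager/MembershipManager API:

```java
import java.util.Optional;

public class ShouldHeartbeatSketch {
    // Hypothetical stand-ins for CoordinatorRequestManager and MembershipManager.
    interface CoordinatorView { Optional<String> coordinator(); }
    interface MembershipView { boolean notInGroup(); }

    private final CoordinatorView coordinatorView;
    private final MembershipView membershipView;

    ShouldHeartbeatSketch(CoordinatorView coordinatorView, MembershipView membershipView) {
        this.coordinatorView = coordinatorView;
        this.membershipView = membershipView;
    }

    // The guard the comment suggests naming explicitly: heartbeat only when the coordinator
    // is known and the member is (or is becoming) part of the group.
    boolean shouldHeartbeat() {
        return coordinatorView.coordinator().isPresent() && !membershipView.notInGroup();
    }

    public static void main(String[] args) {
        ShouldHeartbeatSketch sketch =
                new ShouldHeartbeatSketch(() -> Optional.of("coordinator-0"), () -> false);
        System.out.println(sketch.shouldHeartbeat()); // true
    }
}
```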



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific 

[GitHub] [kafka] ahuang98 opened a new pull request, #14405: [MINOR] QuorumController tests use testToImage

2023-09-19 Thread via GitHub


ahuang98 opened a new pull request, #14405:
URL: https://github.com/apache/kafka/pull/14405

   Tests that the records generated by existing QuorumController tests produce the 
same final image regardless of how they are batched in replay.
   
   Builds on top of https://github.com/apache/kafka/pull/13724
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2023-09-19 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766887#comment-17766887
 ] 

Chris Egerton commented on KAFKA-10339:
---

There is no MirrorSinkConnector; we never implemented that.

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention size breach

2023-09-19 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15479:
-
Description: When a remote log segment contains multiple epoch, then it 
gets considered for multiple times during breach by retention 
size/time/start-offset. This will affect the deletion by remote log retention 
size as it deletes the number of segments lesser than expected. This is a 
follow-up of KAFKA-15352  (was: When a remote log segment contains multiple 
epoch, then it considered for multiple times during deletion. This will affect 
the deletion by remote log retention size. This is a follow-up of KAFKA-15352)

> Remote log segments should be considered once for retention size breach
> ---
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> When a remote log segment contains multiple epoch, then it gets considered 
> for multiple times during breach by retention size/time/start-offset. This 
> will affect the deletion by remote log retention size as it deletes the 
> number of segments lesser than expected. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #14386: KAFKA-14960: TopicMetadata request manager

2023-09-19 Thread via GitHub


junrao commented on code in PR #14386:
URL: https://github.com/apache/kafka/pull/14386#discussion_r1330455761


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * 
+ * 
+ * listTopics
+ * partitionsFor
+ * 
+ * 
+ * The manager checks the state of the {@link TopicMetadataRequestState} 
before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the 
broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are 
requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is 
removed.
+ * 
+ */
+
+public class TopicMetadataRequestManager implements RequestManager {
+private final boolean allowAutoTopicCreation;
+private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
+private final long retryBackoffMs;
+private final long retryBackoffMaxMs;
+private final Logger log;
+private final LogContext logContext;
+
+public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+logContext = context;
+log = logContext.logger(this.getClass());

Review Comment:
   Could we remove `this`?
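
   Separately from the nit above: the class Javadoc earlier in this hunk describes keying inflight metadata requests by topic name, with Optional.empty() standing for an all-topics request, and dropping the entry once the request completes. A rough standalone sketch of that bookkeeping (hypothetical names, not the actual TopicMetadataRequestManager):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class InflightByTopicSketch {
    // One pending future per key; Optional.empty() stands for an "all topics" request.
    private final Map<Optional<String>, CompletableFuture<List<String>>> inflight = new HashMap<>();

    // Returns the existing pending future for this topic, or registers a new one,
    // so duplicate callers never put a second request on the wire.
    synchronized CompletableFuture<List<String>> requestTopicMetadata(Optional<String> topic) {
        return inflight.computeIfAbsent(topic, key -> new CompletableFuture<>());
    }

    // Completes the pending request and removes the entry so a later call starts fresh.
    synchronized void complete(Optional<String> topic, List<String> partitions) {
        CompletableFuture<List<String>> future = inflight.remove(topic);
        if (future != null) {
            future.complete(partitions);
        }
    }

    public static void main(String[] args) {
        InflightByTopicSketch sketch = new InflightByTopicSketch();
        // Two callers asking for the same topic share one inflight request.
        CompletableFuture<List<String>> first = sketch.requestTopicMetadata(Optional.of("orders"));
        CompletableFuture<List<String>> second = sketch.requestTopicMetadata(Optional.of("orders"));
        System.out.println(first == second);  // true
        sketch.complete(Optional.of("orders"), List.of("orders-0", "orders-1"));
        System.out.println(first.join());     // [orders-0, orders-1]
    }
}
```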



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -91,7 +90,7 @@ public CommitRequestManager(
 }
 
 /**
- * Poll for the {@link OffsetFetchRequest} and {@link OffsetCommitRequest} 
request if there's any. The function will
+ * Poll for the {@link OffsetFetchRequest} and {@link 
OffsetCommitRequestState} request if there's any. The function will

Review Comment:
   Hmm, OffsetCommitRequestState should be OffsetCommitRequest, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import 

[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once during retention size breach

2023-09-19 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15479:
-
Issue Type: Task  (was: Improvement)

> Remote log segments should be considered once during retention size breach
> --
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> When a remote log segment contains multiple epoch, then it considered for 
> multiple times during deletion. This will affect the deletion by remote log 
> retention size. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15479) Remote log segments should be considered once for retention size breach

2023-09-19 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15479:
-
Summary: Remote log segments should be considered once for retention size 
breach  (was: Remote log segments should be considered once during retention 
size breach)

> Remote log segments should be considered once for retention size breach
> ---
>
> Key: KAFKA-15479
> URL: https://issues.apache.org/jira/browse/KAFKA-15479
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> When a remote log segment contains multiple epoch, then it considered for 
> multiple times during deletion. This will affect the deletion by remote log 
> retention size. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330460386


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.time = time;
+this.logger = logContext.logger(HeartbeatRequestManager.class);
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+long heartbeatIntervalMs = 
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

Review Comment:
   thanks, just note it here: `group.consumer.heartbeat.interval.ms is defined 
on the server side and the member is told about it in the heartbeat response.`
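
   A tiny sketch of what that looks like from the member's side: the interval is not a client config but whatever the coordinator returns in the heartbeat response (the response type below is a stand-in, not the real ConsumerGroupHeartbeatResponseData):

```java
public class HeartbeatIntervalSketch {
    // Stand-in for the heartbeat response body; the broker decides the interval
    // (group.consumer.heartbeat.interval.ms) and advertises it to the member.
    record HeartbeatResponse(int heartbeatIntervalMs) { }

    private long intervalMs;       // unknown until the first response arrives
    private long nextHeartbeatMs;

    void onHeartbeatResponse(HeartbeatResponse response, long currentTimeMs) {
        // Adopt whatever interval the coordinator advertised rather than a client-side config.
        intervalMs = response.heartbeatIntervalMs();
        nextHeartbeatMs = currentTimeMs + intervalMs;
    }

    boolean heartbeatDue(long currentTimeMs) {
        return currentTimeMs >= nextHeartbeatMs;
    }

    public static void main(String[] args) {
        HeartbeatIntervalSketch sketch = new HeartbeatIntervalSketch();
        sketch.onHeartbeatResponse(new HeartbeatResponse(5_000), 0L);
        System.out.println(sketch.heartbeatDue(4_000)); // false
        System.out.println(sketch.heartbeatDue(5_000)); // true
    }
}
```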



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15479) Remote log segments should be considered once during retention size breach

2023-09-19 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15479:


 Summary: Remote log segments should be considered once during 
retention size breach
 Key: KAFKA-15479
 URL: https://issues.apache.org/jira/browse/KAFKA-15479
 Project: Kafka
  Issue Type: Improvement
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


When a remote log segment contains multiple epoch, then it considered for 
multiple times during deletion. This will affect the deletion by remote log 
retention size. This is a follow-up of KAFKA-15352



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330458175


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+public class HeartbeatRequestManager implements RequestManager {
+private final Time time;

Review Comment:
   Will do. Sorry for completely missing this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-19 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1330451339


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -79,6 +80,9 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return String.format("Assignor selection {type:%s, name:%s}", type, 
serverAssignor);
+return "AssignorSelection{" +

Review Comment:
   I see - I think we've been consistently using { in the refactor.  Maybe we 
should change that @kirktrue @lianetm 
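
   For reference, the two styles under discussion side by side, in a throwaway example:

```java
public class ToStringStyleSketch {
    private final String type = "SERVER";
    private final String serverAssignor = "uniform";

    // Style used elsewhere in the refactor: ClassName{field=value, ...}
    @Override
    public String toString() {
        return "AssignorSelection{" +
                "type=" + type +
                ", serverAssignor='" + serverAssignor + '\'' +
                '}';
    }

    // The String.format style the diff replaces.
    public String formatted() {
        return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor);
    }

    public static void main(String[] args) {
        ToStringStyleSketch selection = new ToStringStyleSketch();
        System.out.println(selection);             // AssignorSelection{type=SERVER, serverAssignor='uniform'}
        System.out.println(selection.formatted()); // Assignor selection {type:SERVER, name:uniform}
    }
}
```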



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin

2023-09-19 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-15478:
--
Description: 
Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 

 

We already have ForwardingAdmin in MM2 so we can extend connect to do something 
similar

 KIP-981 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin]

  was:
Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 

 

We already have ForwardingAdmin in MM2 so we can extend connect to do something 
similiar


> Update connect to use ForwardingAdmin
> -
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Omnia Ibrahim
>Priority: Major
>  Labels: need-kip
>
> Connect uses AdminClients to create topics; while this simplifies the 
> implementation of Connect it has the following problems 
>  * It assumes that whoever runs Connect must have admin access to both source 
> and destination clusters. This assumption is not necessarily valid all the 
> time.
>  * It creates conflict in use-cases where centralised systems or tools manage 
> Kafka resources. 
> It would be easier if customers could provide how they want to manage Kafka 
> topics through admin client or using their centralised system or tools. 
>  
> We already have ForwardingAdmin in MM2 so we can extend connect to do 
> something similar
>  KIP-981 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-981%3A+Manage+Connect+topics+with+custom+implementation+of+Admin]
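
For context, the MM2 precedent referenced here (KIP-787's ForwardingAdmin) is essentially a delegating Admin whose subclass can intercept specific operations, such as topic creation, and route them through an external management system. A rough illustration of that pattern with hypothetical names (not the real Admin interface, which is far larger):

```java
import java.util.List;

public class ForwardingPatternSketch {
    // Hypothetical slice of the Admin surface that matters here.
    interface TopicAdmin {
        void createTopics(List<String> topics);
    }

    // Default behaviour: delegate straight to a real client (here just a stub).
    static class DelegatingTopicAdmin implements TopicAdmin {
        protected final TopicAdmin delegate;
        DelegatingTopicAdmin(TopicAdmin delegate) { this.delegate = delegate; }
        @Override public void createTopics(List<String> topics) { delegate.createTopics(topics); }
    }

    // A user-supplied subclass reroutes topic creation through a central tooling system
    // instead of calling the cluster directly, which is the kind of hook KIP-981 proposes.
    static class TicketingTopicAdmin extends DelegatingTopicAdmin {
        TicketingTopicAdmin(TopicAdmin delegate) { super(delegate); }
        @Override public void createTopics(List<String> topics) {
            System.out.println("Filing topic-creation request with central tooling: " + topics);
            // Deliberately does not call delegate.createTopics(topics).
        }
    }

    public static void main(String[] args) {
        TopicAdmin direct = topics -> System.out.println("AdminClient.createTopics " + topics);
        TopicAdmin managed = new TicketingTopicAdmin(direct);
        managed.createTopics(List.of("connect-offsets"));
    }
}
```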



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin

2023-09-19 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-15478:
--
Labels: need-kip  (was: )

> Update connect to use ForwardingAdmin
> -
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Omnia Ibrahim
>Priority: Major
>  Labels: need-kip
>
> Connect uses AdminClients to create topics; while this simplifies the 
> implementation of Connect it has the following problems 
>  * It assumes that whoever runs Connect must have admin access to both source 
> and destination clusters. This assumption is not necessarily valid all the 
> time.
>  * It creates conflict in use-cases where centralised systems or tools manage 
> Kafka resources. 
> It would be easier if customers could provide how they want to manage Kafka 
> topics through admin client or using their centralised system or tools. 
>  
> We already have ForwardingAdmin in MM2 so we can extend connect to do 
> something similar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15313) Delete remote log segments partition asynchronously when a partition is deleted.

2023-09-19 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15313:
-
Description: 
KIP-405 already covers the approach to delete remote log segments 
asynchronously through controller and RLMM layers. 

reference: https://github.com/apache/kafka/pull/13947#discussion_r1281675818

  was:KIP-405 already covers the approach to delete remote log segments 
asynchronously through controller and RLMM layers. 


> Delete remote log segments partition asynchronously when a partition is 
> deleted. 
> -
>
> Key: KAFKA-15313
> URL: https://issues.apache.org/jira/browse/KAFKA-15313
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>  Labels: KIP-405
>
> KIP-405 already covers the approach to delete remote log segments 
> asynchronously through controller and RLMM layers. 
> reference: https://github.com/apache/kafka/pull/13947#discussion_r1281675818



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin

2023-09-19 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-15478:
--
Description: 
Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 

 

We already have ForwardingAdmin in MM2 so we can extend connect to do something 
similar

  was:
Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 

 

We already have


> Update connect to use ForwardingAdmin
> -
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Omnia Ibrahim
>Priority: Major
>
> Connect uses AdminClients to create topics; while this simplifies the 
> implementation of Connect it has the following problems 
>  * It assumes that whoever runs Connect must have admin access to both source 
> and destination clusters. This assumption is not necessarily valid all the 
> time.
>  * It creates conflict in use-cases where centralised systems or tools manage 
> Kafka resources. 
> It would be easier if customers could provide how they want to manage Kafka 
> topics through admin client or using their centralised system or tools. 
>  
> We already have ForwardingAdmin in MM2 so we can extend connect to do 
> something similar



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15478) Update connect to use ForwardingAdmin

2023-09-19 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-15478:
--
Description: 
Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 

 

We already have

  was:
Connect uses AdminClients to create topics; while this simplifies the 
implementation of Connect it has the following problems 
 * It assumes that whoever runs Connect must have admin access to both source 
and destination clusters. This assumption is not necessarily valid all the time.
 * It creates conflict in use-cases where centralised systems or tools manage 
Kafka resources. 

It would be easier if customers could provide how they want to manage Kafka 
topics through admin client or using their centralised system or tools. 


> Update connect to use ForwardingAdmin
> -
>
> Key: KAFKA-15478
> URL: https://issues.apache.org/jira/browse/KAFKA-15478
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Omnia Ibrahim
>Priority: Major
>
> Connect uses AdminClients to create topics; while this simplifies the 
> implementation of Connect it has the following problems 
>  * It assumes that whoever runs Connect must have admin access to both source 
> and destination clusters. This assumption is not necessarily valid all the 
> time.
>  * It creates conflict in use-cases where centralised systems or tools manage 
> Kafka resources. 
> It would be easier if customers could provide how they want to manage Kafka 
> topics through admin client or using their centralised system or tools. 
>  
> We already have



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

