[jira] [Resolved] (KAFKA-14495) Improve the RemoteIndexCacheTest

2023-02-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14495.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Improve the RemoteIndexCacheTest
> 
>
> Key: KAFKA-14495
> URL: https://issues.apache.org/jira/browse/KAFKA-14495
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.5.0
>
>
> https://github.com/apache/kafka/pull/11390/files#r1049392445
>  





[jira] [Resolved] (KAFKA-14128) Kafka Streams terminates on topic check

2023-02-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14128.
-
Fix Version/s: 3.5.0
   3.4.1
   Resolution: Fixed

> Kafka Streams terminates on topic check
> ---
>
> Key: KAFKA-14128
> URL: https://issues.apache.org/jira/browse/KAFKA-14128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
> Environment: Any
>Reporter: Patrik Kleindl
>Assignee: Lucia Cerchie
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> Our streams application shut down unexpectedly after some network issues that 
> should have been easily recoverable.
> Logs:
>  
> {code:java}
> 2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
> org.apache.kafka.clients.NetworkClient   : [AdminClient 
> clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting 
> from node 3 due to request timeout.
> 2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
> org.apache.kafka.clients.NetworkClient   : [AdminClient 
> clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled 
> in-flight METADATA request with correlation id 985 due to node 3 being 
> disconnected (elapsed time since creation: 60023ms, elapsed time since send: 
> 60023ms, request timeout: 3ms)
> 2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] 
> o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected 
> error during topic description for 
> L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog.
> Error message was: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, 
> nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
> 2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread     : stream-thread 
> [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
> I think the relevant code is in 
> [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]
> {code:java}
> topicFuture.getValue().get();{code}
> called without a timeout value cannot throw a TimeoutException, so the 
> AdminClient's TimeoutException is wrapped in an ExecutionException and hits 
> the last else branch, where the StreamsException is thrown.
> Possible fix:
> Use the KafkaFuture method with timeout:
> {code:java}
> public abstract T get(long timeout, TimeUnit unit) throws 
> InterruptedException, ExecutionException,
> TimeoutException;{code}
> instead of 
> {code:java}
> public abstract T get() throws InterruptedException, ExecutionException;{code}
>  
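
A minimal sketch of the proposed fix, assuming a bounded wait on the
topic-description future (the helper class, its name, and the retry handling
are illustrative, not the actual InternalTopicManager code):
{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.errors.StreamsException;

public final class TopicDescriptionWait {

    // Bounding the wait lets an AdminClient timeout surface as a retriable
    // TimeoutException instead of an ExecutionException that falls through
    // to the fatal StreamsException branch.
    public static void await(final KafkaFuture<TopicDescription> topicFuture,
                             final long timeoutMs) throws InterruptedException {
        try {
            topicFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException retriable) {
            // leave the topic to be re-validated on the next retry
        } catch (final ExecutionException fatal) {
            throw new StreamsException("Unexpected error during topic description",
                fatal.getCause());
        }
    }
}
{code}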





[GitHub] [kafka] mjsax commented on pull request #13161: Kafka 14128

2023-02-22 Thread via GitHub


mjsax commented on PR #13161:
URL: https://github.com/apache/kafka/pull/13161#issuecomment-1441323418

   Thanks for the PR. Merged to `trunk` and cherry-picked to `3.4` branch.





[GitHub] [kafka] showuon commented on a diff in pull request #13279: KAFKA-14295 FetchMessageConversionsPerSec meter not recorded

2023-02-22 Thread via GitHub


showuon commented on code in PR #13279:
URL: https://github.com/apache/kafka/pull/13279#discussion_r1115304004


##
core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala:
##
@@ -221,6 +222,8 @@ class FetchRequestDownConversionConfigTest extends BaseRequestTest {
     } else {
       assertEquals(Errors.UNSUPPORTED_VERSION, error(partitionWithDownConversionDisabled))
     }
+    TestUtils.waitUntilTrue(() => TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec) > initialFetchMessageConversionsPerSec,
+      s"init: $initialFetchMessageConversionsPerSec final: ${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}", 3000)

Review Comment:
   nit: The error message could be clearer, e.g.:
   ```
   The `FetchMessageConversionsPerSec` metric count is not incremented after 3 
seconds. init: $initialFetchMessageConversionsPerSec final: 
${TestUtils.metersCount(BrokerTopicStats.FetchMessageConversionsPerSec)}
   ```
   
   Also, the 3-second timeout could be increased to 5 seconds to avoid flaky 
failures. 






[GitHub] [kafka] mjsax merged pull request #13161: Kafka 14128

2023-02-22 Thread via GitHub


mjsax merged PR #13161:
URL: https://github.com/apache/kafka/pull/13161





[GitHub] [kafka] chia7712 commented on pull request #13290: MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch

2023-02-22 Thread via GitHub


chia7712 commented on PR #13290:
URL: https://github.com/apache/kafka/pull/13290#issuecomment-1441274910

   > But why does this PR have no CI test running?
   
   Maybe Jenkins is on vacation. Merging trunk to trigger QA again.





[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-22 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1441268058

   @ijuma changes are integrated (please remember that 
https://github.com/apache/kafka/pull/13263 needs to be merged prior to this 
one).





[GitHub] [kafka] dajac commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-22 Thread via GitHub


dajac commented on PR #13231:
URL: https://github.com/apache/kafka/pull/13231#issuecomment-1441262764

   > Took a quick look at the unstable api change. Looks like some integration 
tests built specifically for v4 fail with 
`org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled`
   > 
   > I will need to look into this.
   
   @jolshan I suppose that you have to enable unstable apis in your new 
integration tests.





[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-02-22 Thread via GitHub


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1441214746

   Thanks Michael. I added a happy-path test case.





[jira] [Commented] (KAFKA-14739) Kafka consumer reading messages out of order after a rebalance

2023-02-22 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692452#comment-17692452
 ] 

Luke Chen commented on KAFKA-14739:
---

[~colinshaw], have you tried running with the latest version of the Kafka 
client? We made some improvements in this area previously. Please check whether 
this issue still exists. Thanks.

 

> Kafka consumer reading messages out of order after a rebalance
> --
>
> Key: KAFKA-14739
> URL: https://issues.apache.org/jira/browse/KAFKA-14739
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Colin Shaw
>Priority: Major
> Attachments: image-2023-02-21-16-45-10-464.png
>
>
> We often see that when our consumers are running and a rebalance is 
> triggered, some partitions do not resume at the correct offset, causing a 
> large number of events to be skipped.
> Our understanding is that within a partition, events should be read in 
> chronological order.
>  
> These consumers run once a day, and we see that when they resume the next 
> day they do return to the missed events.
>  
> An example we investigated: 
> {quote}We see processing for partition 88 starting around Offset 14001. 
> Events were processed until Offset 14059, but we could see from our logs 
> that Offset 14060 was not processed.
> Upon investigation we could see a rebalance had occurred after offset 14059, 
> and afterwards the next event processed was offset 14190, causing about 130 
> events to be skipped on this partition.
> The subsequent day it did resume at Offset 14060.
> {quote}
>  
>  





[GitHub] [kafka] philipnee closed pull request #13289: [TESTING] (DNR)testing failing test from another PR

2023-02-22 Thread via GitHub


philipnee closed pull request #13289: [TESTING] (DNR)testing failing test from 
another PR
URL: https://github.com/apache/kafka/pull/13289





[GitHub] [kafka] showuon merged pull request #13281: [MINOR] Adjust logging with ZK log format

2023-02-22 Thread via GitHub


showuon merged PR #13281:
URL: https://github.com/apache/kafka/pull/13281





[GitHub] [kafka] showuon merged pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size

2023-02-22 Thread via GitHub


showuon merged PR #13261:
URL: https://github.com/apache/kafka/pull/13261





[GitHub] [kafka] showuon commented on pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size

2023-02-22 Thread via GitHub


showuon commented on PR #13261:
URL: https://github.com/apache/kafka/pull/13261#issuecomment-1441133511

   Failed tests are unrelated
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
   Build / JDK 8 and Scala 2.12 / 
kafka.admin.ReassignPartitionsIntegrationTest.testReassignment(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.KafkaServerKRaftRegistrationTest.[1] Type=ZK, 
Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   ```





[GitHub] [kafka] bachmanity1 commented on pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size

2023-02-22 Thread via GitHub


bachmanity1 commented on PR #13261:
URL: https://github.com/apache/kafka/pull/13261#issuecomment-1441126265

   The CI test failures look unrelated to this PR





[jira] [Assigned] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2023-02-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-5863:
--

Assignee: Greg Harris

> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ted Yu
>Assignee: Greg Harris
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be an NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}





[GitHub] [kafka] gharris1727 opened a new pull request, #13294: KAFKA-5863: Avoid NPE when calls expecting no-content receive content.

2023-02-22 Thread via GitHub


gharris1727 opened a new pull request, #13294:
URL: https://github.com/apache/kafka/pull/13294

   The RestClient accepts a TypeReference argument defining what kind of 
response to expect from the HTTP request.
   If a request was expected to result in a 204 No Content response, callers 
would previously not provide a TypeReference argument (null). If a request 
which normally resulted in a 204 returned some other 2xx code, the RestClient 
would experience an NPE when it tried to deserialize the non-empty response.
   
   Instead, we should enforce that the TypeReference is always provided to the 
RestClient. If we provide a TypeReference<Void> for the calls which do not 
expect to return content, then if they do receive content, that content will 
be silently dropped instead of causing an NPE.
   
   Adds new tests enforcing non-null arguments, and confirms the behavior of a 
TypeReference<Void> call-site receiving a non-empty response.
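   
   A minimal sketch of the idea, assuming Jackson's null-yielding handling of 
`Void` targets (the class and method names are illustrative, not the actual 
RestClient code):
   ```java
   import com.fasterxml.jackson.core.type.TypeReference;
   import com.fasterxml.jackson.databind.ObjectMapper;

   import java.io.IOException;
   import java.io.InputStream;
   import java.util.Objects;

   public final class ResponseReader {
       private static final ObjectMapper JSON_SERDE = new ObjectMapper();

       // The TypeReference is now mandatory. Callers expecting a 204 No Content
       // response pass new TypeReference<Void>() {}; if the server unexpectedly
       // returns a body, Jackson parses and discards it (yielding null) rather
       // than hitting an NPE on a null TypeReference.
       public static <T> T read(final InputStream body, final TypeReference<T> type)
               throws IOException {
           Objects.requireNonNull(type, "type must be non-null");
           return JSON_SERDE.readValue(body, type);
       }
   }
   ```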
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] RivenSun2 commented on pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-22 Thread via GitHub


RivenSun2 commented on PR #13270:
URL: https://github.com/apache/kafka/pull/13270#issuecomment-1441096550

   Hi @guozhangwang, could you give any suggestions? Thanks.





[GitHub] [kafka] jolshan commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-22 Thread via GitHub


jolshan commented on PR #13231:
URL: https://github.com/apache/kafka/pull/13231#issuecomment-1441095454

   Took a quick look at the unstable api change. Looks like some integration 
tests built specifically for v4 fail with 
`org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled` 
   
   I will need to look into this.





[GitHub] [kafka] kirktrue opened a new pull request, #13293: KAFKA-14365: Extract common logic from Fetcher into FetcherUtils

2023-02-22 Thread via GitHub


kirktrue opened a new pull request, #13293:
URL: https://github.com/apache/kafka/pull/13293

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-22 Thread via GitHub


ijuma commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1441082213

   @dejan2609 Good catch. I fixed it in the same branch. And tested with Scala 
2.12 and 2.13.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-22 Thread via GitHub


jolshan commented on PR #13231:
URL: https://github.com/apache/kafka/pull/13231#issuecomment-1441071519

   I've added the changes to the API spec:
   -- verifyOnly is now a transaction-level config
   -- a top-level error is added to the response
   
   I've added builders to the request and tried to simplify some of the methods.
   I've also addressed the verifyOnly case where some partitions are in the txn 
and others are not. 
   
   I will update the KIP to reflect some of these changes (especially with 
respect to the API spec)
   
   I still need to address the unstable API change, but that will require a 
pull from master.





[jira] [Resolved] (KAFKA-5827) Allow configuring Kafka sink connectors to start processing records from the end of topics

2023-02-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-5827.

Resolution: Duplicate

> Allow configuring Kafka sink connectors to start processing records from the 
> end of topics
> --
>
> Key: KAFKA-5827
> URL: https://issues.apache.org/jira/browse/KAFKA-5827
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Behrang Saeedzadeh
>Priority: Major
>
> As far as I can see, Kafka connectors start exporting data of a topic from 
> the beginning of its partitions. We have a topic that contains a few million 
> old records that we don't need but we would like to start exporting new 
> records that are added to the topic.
> Basically:
> * When the connector is started for the first time and it does not have a 
> current offset stored, it should start consuming data from the end of topic 
> partitions
> * When the connector is restarted and has a current offset for partitions 
> stored somewhere, it should start from those offsets





[jira] [Commented] (KAFKA-5827) Allow configuring Kafka sink connectors to start processing records from the end of topics

2023-02-22 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692413#comment-17692413
 ] 

Greg Harris commented on KAFKA-5827:


This is controllable via the Client Override feature, KIP-458 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]).
You can configure the `consumer.override.auto.offset.reset` configuration 
property in a connector configuration to have the consumer begin reading from 
the latest record in a partition. After the connector commits offsets, further 
restarts will pick up where the previous commit finished, avoiding data loss 
while not re-reading previously committed messages.
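
For illustration, a hypothetical sink connector configuration using this 
override (the connector name and class are placeholders):
{code}
{
  "name": "example-sink",
  "config": {
    "connector.class": "org.example.ExampleSinkConnector",
    "topics": "example-topic",
    "consumer.override.auto.offset.reset": "latest"
  }
}
{code}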

> Allow configuring Kafka sink connectors to start processing records from the 
> end of topics
> --
>
> Key: KAFKA-5827
> URL: https://issues.apache.org/jira/browse/KAFKA-5827
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Behrang Saeedzadeh
>Priority: Major
>
> As far as I can see, Kafka connectors start exporting data of a topic from 
> the beginning of its partitions. We have a topic that contains a few million 
> old records that we don't need but we would like to start exporting new 
> records that are added to the topic.
> Basically:
> * When the connector is started for the first time and it does not have a 
> current offset stored, it should start consuming data from the end of topic 
> partitions
> * When the connector is restarted and has a current offset for partitions 
> stored somewhere, it should start from those offsets





[GitHub] [kafka] vcrfxia opened a new pull request, #13292: KAFKA-14491: [14/N] Set changelog topic configs for versioned stores

2023-02-22 Thread via GitHub


vcrfxia opened a new pull request, #13292:
URL: https://github.com/apache/kafka/pull/13292

   (This PR is stacked on https://github.com/apache/kafka/pull/13274. Only the 
last commit needs to be reviewed separately.)
   
   This PR sets the correct topic configs for changelog topics for versioned 
stores introduced in 
[KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores).
 Changelog topics for versioned stores differ from those for non-versioned 
stores only in that `min.compaction.lag.ms` needs to be set in order to prevent 
version history from being compacted prematurely. 
   
   The value for `min.compaction.lag.ms` is equal to the store's history 
retention plus some buffer to account for the broker's use of wall-clock time 
in performing compactions. This buffer is analogous to the 
`windowstore.changelog.additional.retention.ms` value for window store 
changelog topic retention time, and uses the same default of 24 hours. In the 
future, we can propose a KIP to expose a config such as 
`versionedstore.changelog.additional.compaction.lag.ms` to allow users to tune 
this value.
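   
   A sketch of the resulting topic-config arithmetic under the assumptions 
above (constant and method names are illustrative, not the actual Streams 
internals):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public final class VersionedChangelogConfigSketch {
       // The stated default buffer, analogous to
       // windowstore.changelog.additional.retention.ms (24 hours).
       private static final long ADDITIONAL_COMPACTION_LAG_MS = 24 * 60 * 60 * 1000L;

       // Changelog topics for versioned stores are compacted, with a minimum
       // compaction lag covering the store's history retention plus the buffer,
       // so old record versions are not compacted away prematurely.
       public static Map<String, String> changelogConfig(final long historyRetentionMs) {
           final Map<String, String> config = new HashMap<>();
           config.put("cleanup.policy", "compact");
           config.put("min.compaction.lag.ms",
               String.valueOf(historyRetentionMs + ADDITIONAL_COMPACTION_LAG_MS));
           return config;
       }
   }
   ```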
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] kowshik commented on pull request #13268: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values

2023-02-22 Thread via GitHub


kowshik commented on PR #13268:
URL: https://github.com/apache/kafka/pull/13268#issuecomment-1440982486

   @junrao Thanks for the review. I have rebased the PR and brought in the 
latest commits from AK trunk.





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115068319


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of
+ * {@link Consumer} serves as a wireframe or sketch, showing the interaction
+ * between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid
+     * clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li>n/a</li>
+     *     <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li>n/a</li>
+     *     <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<String> subscription() {
+  

[jira] [Resolved] (KAFKA-4006) Kafka connect fails sometime with InvalidTopicException in distributed mode

2023-02-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-4006.

Fix Version/s: 0.11.0.0
   Resolution: Fixed

> Kafka connect fails sometime with InvalidTopicException in distributed mode
> ---
>
> Key: KAFKA-4006
> URL: https://issues.apache.org/jira/browse/KAFKA-4006
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Sumit Arrawatia
>Priority: Major
> Fix For: 0.11.0.0
>
>
> I get this when trying to spin up a 3-node distributed Connect cluster.
> Sometimes one of the workers fails to boot with the following error when auto 
> topic creation is enabled: 
> org.apache.kafka.common.errors.InvalidTopicException: Topic 'default.config' 
> is invalid 
> default.config is the topic name for the Connect config topic. 
> Also, starting the worker again fixes the issue. 





[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115056524


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of
+ * {@link Consumer} serves as a wireframe or sketch, showing the interaction
+ * between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid
+     * clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li>n/a</li>
+     *     <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   Oh, my mistake. I entirely forgot that there's no rebalancing after the 
assignment().




[GitHub] [kafka] guozhangwang commented on pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


guozhangwang commented on PR #13265:
URL: https://github.com/apache/kafka/pull/13265#issuecomment-1440933102

   Thanks @philipnee , I left a comment to just give my 2c regarding your 
question 1) above.
   
   For your question 2), I think we should prefer to stay consistent with the 
current guarantees if they're already stated in the public APIs, unless there's 
a good rationale to change them (in which case we'd need to very vocally 
communicate it as a breaking change in the release). For example:
   
   1) We documented that rebalance listener callbacks are only related to 
`subscribe` scenarios, would only be triggered within `poll()`, and that only 
at that time could the assignment be changed.
   2) We also documented that the async commit listener callbacks are only 
triggered within `poll()`.
   3) In our javadocs, the CommitFailedException would only be thrown from the 
`commitSync` functions.
   
   Putting all those together, I think it means the `poll()` call should drain 
the queues, since if there are any events requiring callbacks to be triggered, 
they should be triggered in that call.
   
   For `commitSync`, technically it would need to wait for the corresponding 
commit response event, but since there's only a single queue, we would still 
need to keep polling that queue until the event is received.
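   
   A toy, self-contained model of that "drain one queue until our response 
arrives" pattern (all names are illustrative, not the real client internals):
   ```java
   import java.time.Duration;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   final class CommitSyncSketch {
       // A background event carries a correlation id plus any user callback
       // that must run on the application thread.
       record Event(long correlationId, Runnable callback) {}

       private final BlockingQueue<Event> backgroundEvents = new LinkedBlockingQueue<>();

       // Keep draining the single queue, firing callbacks along the way, until
       // the commit response we are waiting for shows up or the timer expires.
       boolean awaitCommit(final long commitCorrelationId, final Duration timeout)
               throws InterruptedException {
           final long deadlineNs = System.nanoTime() + timeout.toNanos();
           while (System.nanoTime() < deadlineNs) {
               final Event event = backgroundEvents.poll(10, TimeUnit.MILLISECONDS);
               if (event == null) {
                   continue;
               }
               event.callback().run();
               if (event.correlationId() == commitCorrelationId) {
                   return true;
               }
           }
           return false;
       }
   }
   ```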





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115047665


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of
+ * {@link Consumer} serves as a wireframe or sketch, showing the interaction
+ * between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid
+     * clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li>n/a</li>
+     *     <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   In the `assign()` -> `assignment()` example above, there is no "rebalance" 
triggered and hence there's no rebalance listener triggered either. The 
rebalance listener is only relevant with the `subscribe()` scenarios.
   
   So just to summarize all the 

[jira] [Comment Edited] (KAFKA-14722) Make BooleanSerde public

2023-02-22 Thread Spacrocket (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692398#comment-17692398
 ] 

Spacrocket edited comment on KAFKA-14722 at 2/22/23 10:32 PM:
--

Thanks Matthias for the advice; I added the proposal yesterday and it's in the 
[DISCUSS] stage. I think I will wait until Friday and then start the VOTING 
stage.
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]


was (Author: JIRAUSER289157):
Thanks Matthias J. Sax for the advice; I added the proposal yesterday and it's 
in the [DISCUSS] stage. I think I will wait until Friday and then start the 
VOTING stage.
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, need-kip, newbie
>
> We introduced a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as an internal class. We could 
> make it public.





[jira] [Commented] (KAFKA-14722) Make BooleanSerde public

2023-02-22 Thread Spacrocket (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692398#comment-17692398
 ] 

Spacrocket commented on KAFKA-14722:


Thanks Matthias J. Sax for the advice; I added the proposal yesterday and it's 
in the [DISCUSS] stage. I think I will wait until Friday and then start the 
VOTING stage.
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, need-kip, newbie
>
> We introduced a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as an internal class. We could 
> make it public.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1115037535


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of
+ * {@link Consumer} serves as a wireframe or sketch, showing the interaction
+ * between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ *     <li>Does this method block?</li>
+ *     <li>Does this method interact with the background thread?</li>
+ *     <li>If yes, what data is passed as input to the background thread?</li>
+ *     <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid
+     * clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List<ConsumerPartitionAssignor> assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li>n/a</li>
+     *     <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     *     <li>No</li>
+     *     <li>No</li>
+     *     <li>n/a</li>
+     *     <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<String> subscription() {

[jira] [Assigned] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2023-02-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-9228:
--

Assignee: Greg Harris

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Greg Harris
>Priority: Major
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.





[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-22 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1440891478

   > @dejan2609 I added a few cleanups here: 
[ijuma@9c6ae57](https://github.com/ijuma/kafka/commit/9c6ae575276e673f39d188d42a332f9f0b07d2d0)
   > 
   > If you agree with them, please integrate into your PR.
   
   @ijuma Scala 2.13 builds are fine, but something is still missing for Scala 
2.12 (all **_./gradlew -PscalaVersion=2.12 clean jar_** builds are failing):
   - JDK 17
   ```
   > Task :core:compileScala
   '-release' does not accept multiple arguments
   bad option: '-release:8'
   
   > Task :core:compileScala FAILED
   
   > Task :streams:streams-scala:compileScala FAILED
   'strict-unsealed-patmat' is not a valid choice for '-Xlint'
   '-release' does not accept multiple arguments
   bad option: '-release:8'
   
   FAILURE: Build completed with 2 failures.
   ```
   - JDK 11
   ```
   > Task :core:compileScala FAILED
   
   > Task :streams:streams-scala:compileScala
   'strict-unsealed-patmat' is not a valid choice for '-Xlint'
   '-release' does not accept multiple arguments
   bad option: '-release:8'
   
   > Task :streams:streams-scala:compileScala FAILED
   
   FAILURE: Build completed with 2 failures.
   ```
   - JDK 8
   ```
   > Task :core:compileScala FAILED
   '-release' does not accept multiple arguments
   bad option: '-release:8'
   
   FAILURE: Build failed with an exception.
   ```
   





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-22 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114772380


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type <Bytes, byte[]>, so we use {@link Serde}s
+ * to convert from <K, ValueAndTimestamp<V>> to <Bytes, byte[]>. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(

Review Comment:
   Let me try to repeat back your suggestion to make 

[GitHub] [kafka] junrao merged pull request #13272: MINOR: Add missing unit tests for {Local|Remote}LeaderEndpoint classes

2023-02-22 Thread via GitHub


junrao merged PR #13272:
URL: https://github.com/apache/kafka/pull/13272


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-22 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1440804226

   Got it @ijuma 
   
   I will also add a jacoco minor version bump (0.8.7 -->> 0.8.8) in order to 
use the same version as Gradle 8.0.1 does:
   
https://docs.gradle.org/8.0.1/userguide/jacoco_plugin.html#sec:configuring_the_jacoco_plugin
   
   Will squash/rebase on trunk and force-push.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114928370


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or
+ * sketch, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ * <li>Does this method block?</li>
+ * <li>Does this method interact with the background thread?</li>
+ * <li>If yes, what data is passed as input to the background thread?</li>
+ * <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     * <li>No</li>
+     * <li>No</li>
+     * <li>n/a</li>
+     * <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   I feel we need an explicit synchronization barrier to ensure state changes 
happen in a deterministic sequence. I mentioned this idea in our previous 
meeting, but I just want to bring it up again for discussion (feel free to 
strike down this idea).
   
   Is it 
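
   One possible shape for such a barrier, as a minimal hedged sketch (entirely 
hypothetical; the comment above is truncated in the archive):
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   
   final class BarrierSketch {
       // The foreground thread enqueues a request paired with a future, then
       // blocks until the background thread applies the change and completes it.
       static <T> T syncCall(final BlockingQueue<CompletableFuture<T>> queue,
                             final CompletableFuture<T> request) throws Exception {
           queue.put(request);   // hand off to the background thread
           return request.get(); // barrier: wait for the acknowledgement
       }
   }
   ```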

[GitHub] [kafka] philipnee commented on pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


philipnee commented on PR #13265:
URL: https://github.com/apache/kafka/pull/13265#issuecomment-1440786040

   I feel the real challenge here is deciding when to synchronize the 
background and client state. I feel we should do that deterministically; 
otherwise, it is hard to provide a contract to the user. Anyway, here are a few 
of my notes (see the sketch after this list):
   
   1. What should the user expect? I think these are a few scenarios:
- `subscribe()` -> `subscriptions()`
- `assign()` -> `assignments()`
- `seek()` -> `position()`
- `seek()` -> `poll()` (something returned) -> `position()`
   2. Do we all agree to only drain the queue in `poll()`? There are a few 
things to consider:
- When and where to throw the `CommitFailedException` (I think now it can 
happen anywhere the user invokes APIs)
- When and where to invoke the callbacks (commit and rebalance)?
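   
   A minimal sketch of the scenario-1 contract (hypothetical helper class; the 
consumer instance and topic name are placeholders, not part of this PR):
   
   ```java
   import java.util.Collections;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   final class ContractSketch {
       // Each accessor below should read back deterministically once the
       // mutating call returns, regardless of which thread applied the change.
       static void illustrate(final Consumer<byte[], byte[]> consumer) {
           final TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic
           consumer.assign(Collections.singleton(tp));
           assert consumer.assignment().contains(tp); // assign() -> assignment()
           consumer.seek(tp, 42L);
           assert consumer.position(tp) == 42L;       // seek() -> position()
       }
   }
   ```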


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114930390


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or
+ * sketch, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ * <li>Does this method block?</li>
+ * <li>Does this method interact with the background thread?</li>
+ * <li>If yes, what data is passed as input to the background thread?</li>
+ * <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     * <li>No</li>
+     * <li>No</li>
+     * <li>n/a</li>
+     * <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {

Review Comment:
   After all, I think it is a worse user experience if the assignment call 
cannot yield deterministic results. Even though we will need a KIP for this 
behavior change, I think it is what provides the guarantee to the user.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-22 Thread via GitHub


ijuma commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1440767080

   @dejan2609 I added a few cleanups here: 
https://github.com/ijuma/kafka/commit/9c6ae575276e673f39d188d42a332f9f0b07d2d0
   
   If you agree with them, please integrate into your PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14742) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs

2023-02-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14742:

Labels: flaky-test  (was: )

> Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs
> -
>
> Key: KAFKA-14742
> URL: https://issues.apache.org/jira/browse/KAFKA-14742
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>  Labels: flaky-test
>
> The ExactlyOnceSourceIntegrationTest appears to occasionally throw the 
> following exception in my local test runs:
> {noformat}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.base/java.util.HashMap.newNode(HashMap.java:1901)
>   at java.base/java.util.HashMap.putVal(HashMap.java:629)
>   at java.base/java.util.HashMap.put(HashMap.java:610)
>   at java.base/java.util.HashSet.add(HashSet.java:221)
>   at 
> java.base/java.util.stream.Collectors$$Lambda$6/0x80011.accept(Unknown 
> Source)
>   at 
> java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.base/java.util.stream.LongPipeline$1$1.accept(LongPipeline.java:177)
>   at 
> java.base/java.util.stream.Streams$RangeLongSpliterator.forEachRemaining(Streams.java:228)
>   at 
> java.base/java.util.Spliterator$OfLong.forEachRemaining(Spliterator.java:775)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>   at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.lambda$assertSeqnos$9(ExactlyOnceSourceIntegrationTest.java:964)
>   at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$$Lambda$2500/0x0008015a1908.accept(Unknown
>  Source)
>   at java.base/java.util.HashMap.forEach(HashMap.java:1421)
>   at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertSeqnos(ExactlyOnceSourceIntegrationTest.java:961)
>   at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertExactlyOnceSeqnos(ExactlyOnceSourceIntegrationTest.java:939)
>   at 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testIntervalBoundary(ExactlyOnceSourceIntegrationTest.java:358)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100){noformat}
> It appears that the data produced by the connectors under test is too large 
> to be asserted on with the current assertions' memory overhead. We should try 
> to optimize the assertions' overhead and/or reduce the number of records 
> being asserted on.
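> A hedged sketch of a lower-allocation assertion (hypothetical names; the test 
> currently collects boxed seqnos into hash sets via streams, which is where 
> the OOM occurs):
> {code:java}
> import java.util.BitSet;
>
> final class SeqnoAssertionSketch {
>     // Mark each seqno in a BitSet instead of materializing millions of boxed
>     // longs, then scan once for duplicates and gaps (assumes seqnos run 1..max).
>     static void assertExactlyOnce(final long[] seqnos, final int expectedMax) {
>         final BitSet seen = new BitSet(expectedMax + 1);
>         for (final long s : seqnos) {
>             final int i = Math.toIntExact(s);
>             if (seen.get(i)) {
>                 throw new AssertionError("duplicate seqno " + s);
>             }
>             seen.set(i);
>         }
>         for (int i = 1; i <= expectedMax; i++) {
>             if (!seen.get(i)) {
>                 throw new AssertionError("missing seqno " + i);
>             }
>         }
>     }
> }
> {code}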



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 opened a new pull request, #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs

2023-02-22 Thread via GitHub


gharris1727 opened a new pull request, #13291:
URL: https://github.com/apache/kafka/pull/13291

   On my local machine, testIntervalBoundary is asserting on nearly 2.5 million 
records, even though the test appears to be written to need only 100-1000 
records to perform its assertions. This causes OOMEs in the test assertions, 
which iterate over the set of records and perform memory allocations.
   
   I looked into reducing the assertion's memory overhead, but it didn't seem 
practical as even the smallest allocations appeared to exceed the memory limit.
   
   Instead, I configured the pre-existing throttle mechanism inside the 
MonitorableSourceConnector, so that tests now seem to produce ~90k records on 
my machine, leaving adequate spare memory for the existing assertions to pass 
without issue.
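   
   For illustration, a hedged sketch of a records-per-second throttle in the 
same spirit (hypothetical class; the PR itself only configures the pre-existing 
throttle in MonitorableSourceConnector):
   
   ```java
   final class ThrottleSketch {
       private final long maxRecordsPerSecond;
       private long windowStartMs;
       private long producedInWindow;
   
       ThrottleSketch(final long maxRecordsPerSecond) {
           this.maxRecordsPerSecond = maxRecordsPerSecond;
       }
   
       // Returns true if one more record may be produced in the current
       // one-second window; callers back off when it returns false.
       synchronized boolean tryAcquire(final long nowMs) {
           if (nowMs - windowStartMs >= 1000) {
               windowStartMs = nowMs; // start a new window
               producedInWindow = 0;
           }
           if (producedInWindow < maxRecordsPerSecond) {
               producedInWindow++;
               return true;
           }
           return false;
       }
   }
   ```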
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14742) Flaky ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs

2023-02-22 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14742:
---

 Summary: Flaky 
ExactlyOnceSourceIntegrationTest.testConnectorBoundary OOMs
 Key: KAFKA-14742
 URL: https://issues.apache.org/jira/browse/KAFKA-14742
 Project: Kafka
  Issue Type: Improvement
Reporter: Greg Harris
Assignee: Greg Harris


The ExactlyOnceSourceIntegrationTest appears to occasionally throw the 
following exception in my local test runs:
{noformat}
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.base/java.util.HashMap.newNode(HashMap.java:1901)
at java.base/java.util.HashMap.putVal(HashMap.java:629)
at java.base/java.util.HashMap.put(HashMap.java:610)
at java.base/java.util.HashSet.add(HashSet.java:221)
at 
java.base/java.util.stream.Collectors$$Lambda$6/0x80011.accept(Unknown 
Source)
at 
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at 
java.base/java.util.stream.LongPipeline$1$1.accept(LongPipeline.java:177)
at 
java.base/java.util.stream.Streams$RangeLongSpliterator.forEachRemaining(Streams.java:228)
at 
java.base/java.util.Spliterator$OfLong.forEachRemaining(Spliterator.java:775)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.lambda$assertSeqnos$9(ExactlyOnceSourceIntegrationTest.java:964)
at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$$Lambda$2500/0x0008015a1908.accept(Unknown
 Source)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertSeqnos(ExactlyOnceSourceIntegrationTest.java:961)
at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.assertExactlyOnceSeqnos(ExactlyOnceSourceIntegrationTest.java:939)
at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testIntervalBoundary(ExactlyOnceSourceIntegrationTest.java:358)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100){noformat}
It appears that the data produced by the connectors under test is too large to 
be asserted on with the current assertions' memory overhead. We should try to 
optimize the assertions' overhead and/or reduce the number of records being 
asserted on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 opened a new pull request, #13290: MINOR: stabilize LeaderElectionTest#testLeaderElectionAndEpoch

2023-02-22 Thread via GitHub


chia7712 opened a new pull request, #13290:
URL: https://github.com/apache/kafka/pull/13290

   The leader changes from -1 to 1, so the test fails if it happens to observe 
the -1 first (see the sketch below).
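   
   A hedged sketch of the stabilization idea (hypothetical helper; the actual 
change is in this PR's diff to LeaderElectionTest):
   
   ```java
   import java.util.function.Supplier;
   
   final class AwaitLeaderSketch {
       // Poll until a real broker id is observed, instead of asserting on the
       // first observation, which may still be -1 (no leader elected yet).
       static int awaitLeader(final Supplier<Integer> leader, final long timeoutMs)
               throws InterruptedException {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           while (System.currentTimeMillis() < deadline) {
               final int current = leader.get();
               if (current != -1) {
                   return current;
               }
               Thread.sleep(50);
           }
           throw new AssertionError("no leader elected within " + timeoutMs + " ms");
       }
   }
   ```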
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


philipnee commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114851962


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or
+ * sketch, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ * <li>Does this method block?</li>
+ * <li>Does this method interact with the background thread?</li>
+ * <li>If yes, what data is passed as input to the background thread?</li>
+ * <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     * <li>No</li>
+     * <li>No</li>
+     * <li>n/a</li>
+     * <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     * <li>No</li>
+     * <li>No</li>
+     * <li>n/a</li>
+     * <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<String> subscription() {
+  

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-22 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114835088


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type <Bytes, byte[]>, so we use {@link Serde}s
+ * to convert from <K, ValueAndTimestamp<V>> to <Bytes, byte[]>. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.

Review Comment:
   Thanks for the suggestions! Incorporated this into the latest commit. I used 
`KeyValueStore` in the comments instead of `TimestampedKeyValueStore` (pending 
our other discussion about whether `MeteredVersionedKeyValueStore` is 
conceptually `MeteredTimestampedKeyValueStore` or `MeteredKeyValueStore`) and 
also modified the last line since I wasn't sure which `get()` override you were 
referring to.
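   
   A hedged sketch (hypothetical and heavily simplified) of the translation the 
javadoc describes: the versioned-store API re-expresses a (value, timestamp) 
pair as a ValueAndTimestamp so the metering inherited from MeteredKeyValueStore 
applies unchanged:
   
   ```java
   import org.apache.kafka.streams.state.TimestampedKeyValueStore;
   import org.apache.kafka.streams.state.ValueAndTimestamp;
   
   final class DelegationSketch<K, V> {
       private final TimestampedKeyValueStore<K, V> internal;
   
       DelegationSketch(final TimestampedKeyValueStore<K, V> internal) {
           this.internal = internal;
       }
   
       void put(final K key, final V value, final long timestamp) {
           // ValueAndTimestamp.make() rejects null values, so the real code needs
           // a null-tolerant representation (cf. NullableValueAndTimestampSerde)
           // to model tombstones; that detail is elided here.
           internal.put(key, ValueAndTimestamp.make(value, timestamp));
       }
   }
   ```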



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #12957: MINOR: Fix flaky testClientDisconnectionUpdatesRequestMetrics() (#11987)

2023-02-22 Thread via GitHub


jolshan merged PR #12957:
URL: https://github.com/apache/kafka/pull/12957


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-22 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114799670


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide
+ * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type <Bytes, byte[]>, so we use {@link Serde}s
+ * to convert from <K, ValueAndTimestamp<V>> to <Bytes, byte[]>. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+    extends WrappedStateStore<VersionedBytesStore, K, V>
+    implements VersionedKeyValueStore<K, V> {
+
+    private final MeteredVersionedKeyValueStoreInternal internal;
+
+    MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+                                  final String metricScope,
+                                  final Time time,
+                                  final Serde<K> keySerde,
+                                  final Serde<ValueAndTimestamp<V>> valueSerde) {
+        super(inner);
+        internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+    }
+
+    /**
+     * Private helper class which represents the functionality of a {@link VersionedKeyValueStore}
+     * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be
+     * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of
+     * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this
+     * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the
+     * {@link VersionedKeyValueStore} interface itself.
+     */
+    private class MeteredVersionedKeyValueStoreInternal
+        extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private final VersionedBytesStore inner;
+
+        MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+                                              final String metricScope,
+                                              final Time time,
+                                              final Serde<K> keySerde,
+                                              final Serde<ValueAndTimestamp<V>> valueSerde) {
+            super(inner, metricScope, time, keySerde, valueSerde);
+            this.inner = inner;
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            super.put(
+                key,
+                // versioned stores require a 

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13265: Prototype consumer stubs

2023-02-22 Thread via GitHub


guozhangwang commented on code in PR #13265:
URL: https://github.com/apache/kafka/pull/13265#discussion_r1114721314


##
clients/src/main/java/org/apache/kafka/clients/consumer/StubbedAsyncKafkaConsumer.java:
##
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.Fetcher;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignPartitionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitAsyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitSyncEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchRecordsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.RequestRebalanceEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscribePatternEvent;
+import org.apache.kafka.clients.consumer.internals.events.SubscribeTopicsEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * This {@link StubbedAsyncKafkaConsumer stub} implementation of {@link Consumer} serves as a wireframe or
+ * sketch, showing the interaction between the foreground and background threads. Each of the main API methods
+ * will need to answer the following questions:
+ *
+ * <ul>
+ * <li>Does this method block?</li>
+ * <li>Does this method interact with the background thread?</li>
+ * <li>If yes, what data is passed as input to the background thread?</li>
+ * <li>If yes, what data is returned as output from the background thread?</li>
+ * </ul>
+ *
+ * @param <K> Key
+ * @param <V> Value
+ * @see ApplicationEventProcessor for the logic of the background event handler
+ */
+public class StubbedAsyncKafkaConsumer<K, V> implements Consumer<K, V> {
+
+    /**
+     * These instance variables are intentionally left unassigned, to avoid clutter...
+     */
+    private Time time;
+
+    private EventHandler eventHandler;
+
+    private SubscriptionState subscriptions;
+
+    private Deserializer<K> keyDeserializer;
+
+    private Deserializer<V> valueDeserializer;
+
+    private long defaultApiTimeoutMs;
+
+    private List assignors;
+
+    private Optional<String> groupId;
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     * <li>No</li>
+     * <li>No</li>
+     * <li>n/a</li>
+     * <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<TopicPartition> assignment() {
+        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+    }
+
+    /**
+     * Answers to the above questions:
+     *
+     * <ul>
+     * <li>No</li>
+     * <li>No</li>
+     * <li>n/a</li>
+     * <li>n/a</li>
+     * </ul>
+     */
+    @Override
+    public Set<String> subscription() {

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-22 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114772380


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording 
operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation 
does not need to provide
+ * its own metrics collecting functionality. The inner {@code 
VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type Bytes,byte[], so we use {@link 
Serde}s
+ * to convert from K,ValueAndTimestampV to 
Bytes,byte[]. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to 
a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param  The key type
+ * @param  The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore
+extends WrappedStateStore
+implements VersionedKeyValueStore {
+
+private final MeteredVersionedKeyValueStoreInternal internal;
+
+MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde<K> keySerde,
+  final Serde<ValueAndTimestamp<V>> valueSerde) {
+super(inner);
+internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+}
+
+/**
+ * Private helper class which represents the functionality of a {@link 
VersionedKeyValueStore}
+ * as a {@link TimestampedKeyValueStore} so that the bulk of the metering 
logic may be
+ * inherited from {@link MeteredKeyValueStore}. As a result, the 
implementation of
+ * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate 
from this
+ * {@link TimestampedKeyValueStore} representation of a versioned 
key-value store into the
+ * {@link VersionedKeyValueStore} interface itself.
+ */
+private class MeteredVersionedKeyValueStoreInternal
+extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+implements TimestampedKeyValueStore<K, V> {
+
+private final VersionedBytesStore inner;
+
+MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde<K> keySerde,
+  final Serde<ValueAndTimestamp<V>> valueSerde) {
+super(inner, metricScope, time, keySerde, valueSerde);
+this.inner = inner;
+}
+
+@Override
+public void put(final K key, final ValueAndTimestamp<V> value) {
+super.put(

Review Comment:
   Let me try to repeat back your suggestion to make 

[GitHub] [kafka] philipnee opened a new pull request, #13289: [TESTING] testing failing test from another PR

2023-02-22 Thread via GitHub


philipnee opened a new pull request, #13289:
URL: https://github.com/apache/kafka/pull/13289

   Don't review this PR! I'm just testing why a bunch of irrelevant tests are 
failing on the other branch!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-22 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1114727896


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording 
operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation 
does not need to provide
+ * its own metrics collecting functionality. The inner {@code 
VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type <Bytes, byte[]>, so we use {@link Serde}s
+ * to convert from <K, ValueAndTimestamp<V>> to <Bytes, byte[]>. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param <K> The key type
+ * @param <V> The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore<K, V>
+extends WrappedStateStore<VersionedBytesStore, K, V>
+implements VersionedKeyValueStore<K, V> {
+
+private final MeteredVersionedKeyValueStoreInternal internal;
+
+MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde<K> keySerde,
+  final Serde<ValueAndTimestamp<V>> valueSerde) {
+super(inner);
+internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde);
+}
+
+/**
+ * Private helper class which represents the functionality of a {@link 
VersionedKeyValueStore}
+ * as a {@link TimestampedKeyValueStore} so that the bulk of the metering 
logic may be
+ * inherited from {@link MeteredKeyValueStore}. As a result, the 
implementation of
+ * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate 
from this
+ * {@link TimestampedKeyValueStore} representation of a versioned 
key-value store into the
+ * {@link VersionedKeyValueStore} interface itself.
+ */
+private class MeteredVersionedKeyValueStoreInternal
+extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+implements TimestampedKeyValueStore<K, V> {

Review Comment:
   `MeteredVersionedKeyValueStoreInternal` and `MeteredTimestampedKeyValueStore` 
use different serdes -- `MeteredTimestampedKeyValueStore` uses 
`ValueAndTimestampSerde` while `MeteredVersionedKeyValueStoreInternal` uses 
`NullableValueAndTimestampSerde`. 
   
   If you look at the code for `MeteredTimestampedKeyValueStore`, the only method 
it overrides from `MeteredKeyValueStore` is `prepareValueSerdeForStore()`. We 
could have `MeteredVersionedKeyValueStoreInternal` extend 
`MeteredTimestampedKeyValueStore` instead of `MeteredKeyValueStore` if we want, 
but `MeteredVersionedKeyValueStoreInternal` needs to override 
`prepareValueSerdeForStore()` with its own serde anyway, so 
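   A minimal sketch of that override pattern (the shape is assumed from the 
discussion above; the exact `prepareValueSerdeForStore()` signature in the 
final code may differ):
   
   ```
   // Inside MeteredVersionedKeyValueStoreInternal: swap in the nullable serde
   // so that tombstones (a null value paired with a timestamp) can be serialized.
   @SuppressWarnings("unchecked")
   @Override
   protected Serde<ValueAndTimestamp<V>> prepareValueSerdeForStore(final Serde<ValueAndTimestamp<V>> valueSerde,
                                                                   final SerdeGetter getter) {
       return valueSerde == null
           ? new NullableValueAndTimestampSerde<>((Serde<V>) getter.valueSerde())
           : super.prepareValueSerdeForStore(valueSerde, getter);
   }
   ```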

[GitHub] [kafka] philipnee commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-22 Thread via GitHub


philipnee commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1440514286

   Hmm, strangely, this branch seems to trigger a bunch of initialization error 
failures. And I can't seem to reproduce them locally...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14714) Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-22 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692294#comment-17692294
 ] 

Satish Duggana commented on KAFKA-14714:


https://github.com/apache/kafka/pull/13255

> Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
> -
>
> Key: KAFKA-14714
> URL: https://issues.apache.org/jira/browse/KAFKA-14714
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd merged pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-22 Thread via GitHub


satishd merged PR #13255:
URL: https://github.com/apache/kafka/pull/13255


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-22 Thread via GitHub


guozhangwang commented on PR #13190:
URL: https://github.com/apache/kafka/pull/13190#issuecomment-1440489302

   Yeah I think it's okay to make the rule consistent, i.e. to honor the 
timeout even under those four exceptions: if the timer has elapsed, then we 
should still return from the loop in
   
   ```
   client.poll(future, timer);
   if (!future.isDone()) {
   // we ran out of time
   return false;
   }
   ```
   
   even if the response yet to be returned would contain any of these four 
exceptions. So I think we should still obey this rule, i.e. even if a response 
has been returned and we know it's going to be one of these four exceptions, if 
the timer has elapsed, we still exit the loop.
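   A minimal sketch of that rule (illustrative only: `ensureReady` and 
`sendRequest` are hypothetical names, while `RequestFuture`, `Timer`, and 
`poll` follow the consumer client APIs quoted above):
   
   ```
   boolean ensureReady(final Timer timer) {
       do {
           final RequestFuture<Void> future = sendRequest();  // hypothetical helper
           client.poll(future, timer);
           if (!future.isDone()) {
               return false;               // we ran out of time
           }
           if (future.succeeded()) {
               return true;
           }
           if (!future.isRetriable()) {
               throw future.exception();   // fatal error: surface it immediately
           }
           timer.update();                 // refresh elapsed time before looping
       } while (timer.notExpired());       // honor the timeout even for retriable errors
       return false;
   }
   ```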


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-22 Thread via GitHub


rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114692109


##
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##
@@ -38,15 +40,21 @@
  */
 public final class TopicsImage {
 public static final TopicsImage EMPTY =
-new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+new TopicsImage(map(), map());
+
+final ImMap<Uuid, TopicImage> topicsById;
+final ImMap<String, TopicImage> topicsByName;
 
-private final Map<Uuid, TopicImage> topicsById;
-private final Map<String, TopicImage> topicsByName;
+public TopicsImage(ImMap<Uuid, TopicImage> topicsById,
+   ImMap<String, TopicImage> topicsByName) {
+this.topicsById = topicsById;
+this.topicsByName = topicsByName;
+}
 
-public TopicsImage(Map<Uuid, TopicImage> topicsById,
-   Map<String, TopicImage> topicsByName) {
-this.topicsById = Collections.unmodifiableMap(topicsById);
-this.topicsByName = Collections.unmodifiableMap(topicsByName);
+public TopicsImage including(TopicImage topic) {

Review Comment:
   It is used from a test class in the `core` module so it needs to be public 
:-(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-22 Thread via GitHub


rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114679640


##
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
 }
 
 public void write(ImageWriter writer, ImageWriterOptions options) {
-for (TopicImage topicImage : topicsById.values()) {
-topicImage.write(writer, options);
+for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+entry.getValue().write(writer, options);

Review Comment:
   We could, but it is marked deprecated in the library because there is no way 
to provide a reasonable `.equals()` method.  I actually checked, and indeed it 
is true:
   ```
   @Test
   public void testMapValuesEquality() {
   Map<String, String> m = new HashMap<>();
   m.put("a", "a");
   m.put("b", "b");
   assertEquals(m.keySet(), new HashSet<>(Arrays.asList("a", "b")));
   assertEquals(m.keySet(), new HashSet<>(Arrays.asList("b", "a")));
   // note that these all assert inequality
   assertNotEquals(m.values(), new HashSet<>(Arrays.asList("a", "b")));
   assertNotEquals(m.values(), Arrays.asList("a", "b"));
   assertNotEquals(m.values(), new HashSet<>(Arrays.asList("b", "a")));
   assertNotEquals(m.values(), Arrays.asList("b", "a"));
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-22 Thread via GitHub


rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114673066


##
metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java:
##
@@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) {
 }
 
 public TopicsImage apply() {
-Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
-Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
-for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-Uuid id = entry.getKey();
-TopicImage prevTopicImage = entry.getValue();
-TopicDelta delta = changedTopics.get(id);
-if (delta == null) {
-if (!deletedTopicIds.contains(id)) {
-newTopicsById.put(id, prevTopicImage);
-newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-}
+ImMap<Uuid, TopicImage> newTopicsById = image.topicsById;
+ImMap<String, TopicImage> newTopicsByName = image.topicsByName;
+// apply all the deletes
+for (Uuid topicId : deletedTopicIds) {
+// it was deleted, so we have to remove it from the maps
+TopicImage originalTopicToBeDeleted = image.topicsById.get(topicId);
+if (originalTopicToBeDeleted == null) {
+throw new IllegalStateException("Missing topic id " + topicId);
 } else {
-TopicImage newTopicImage = delta.apply();
-newTopicsById.put(id, newTopicImage);
-newTopicsByName.put(delta.name(), newTopicImage);
+newTopicsById = newTopicsById.without(topicId);
+newTopicsByName = newTopicsByName.without(originalTopicToBeDeleted.name());
 }
 }
-for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-if (!newTopicsById.containsKey(entry.getKey())) {
-TopicImage newTopicImage = entry.getValue().apply();
-newTopicsById.put(newTopicImage.id(), newTopicImage);
-newTopicsByName.put(newTopicImage.name(), newTopicImage);
-}
+// apply all the updates/additions
+for (Map.Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
+Uuid topicId = entry.getKey();
+TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+// put new information into the maps
+String topicName = newTopicToBeAddedOrUpdated.name();
+newTopicsById = newTopicsById.assoc(topicId, newTopicToBeAddedOrUpdated);
+newTopicsByName = newTopicsByName.assoc(topicName, newTopicToBeAddedOrUpdated);

Review Comment:
   Persistent data structures are immutable, so they always create and return a 
new data structure.
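   For illustration, a minimal sketch of that behavior using Paguro's `ImMap` 
(the library used in this PR; `map()` is `StaticImports.map()`):
   
   ```
   import static org.organicdesign.fp.StaticImports.map;
   
   import org.organicdesign.fp.collections.ImMap;
   
   public class PersistentMapDemo {
       public static void main(String[] args) {
           // Each assoc/without returns a new map; the receiver is untouched.
           ImMap<String, Integer> m1 = map();              // {}
           ImMap<String, Integer> m2 = m1.assoc("a", 1);   // {a=1}; m1 is still {}
           ImMap<String, Integer> m3 = m2.without("a");    // {}; m2 still holds {a=1}
           System.out.println(m1 + " " + m2 + " " + m3);
       }
   }
   ```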



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

2023-02-22 Thread via GitHub


guozhangwang commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114660400


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -151,9 +202,18 @@ private void resumeTasks() {
 }
 }
 
-private void restoreTasks() {
+private void pauseTasks() {
+for (final Task task : updatingTasks.values()) {

Review Comment:
   I think the perf impact should be small since pause/resume are not commonly 
used, and if the named topologies are not paused, then checking the status is 
just a few cpu cycles.
   
   Another motivation is that if we remove named topologies, then pausing / 
resuming would always impact all tasks, in which case we could have a 
simpler check (e.g. just check a single flag) which would be even cheaper. So 
it's probably better to maintain the code layout in this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13212: MINOR: Remove accidental unnecessary code; fix comment references

2023-02-22 Thread via GitHub


clolov commented on PR #13212:
URL: https://github.com/apache/kafka/pull/13212#issuecomment-1440432643

   This has been rebased on the latest trunk, tests have been run locally, and I 
would be very grateful for a review when you get the time @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13288: MINOR: fix rerun-tests for unit test

2023-02-22 Thread via GitHub


chia7712 opened a new pull request, #13288:
URL: https://github.com/apache/kafka/pull/13288

   Related to #11926: currently we don't process `rerun-tests` for unit tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

2023-02-22 Thread via GitHub


guozhangwang commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114660189


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -598,6 +685,12 @@ public Set getUpdatingStandbyTasks() {
 : Collections.emptySet();
 }
 
+public Set getUpdatingActiveTasks() {
+return stateUpdaterThread != null

Review Comment:
   That's a good point, will update.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -151,9 +202,18 @@ private void resumeTasks() {
 }
 }
 
-private void restoreTasks() {
+private void pauseTasks() {
+for (final Task task : updatingTasks.values()) {

Review Comment:
   I think the perf impact should be small since pause/resume are not commonly 
used, and if the named topologies are not paused, then checking the status is 
just a few cpu cycles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on a diff in pull request #11926: KAFKA-13714: Fix cache flush position

2023-02-22 Thread via GitHub


chia7712 commented on code in PR #11926:
URL: https://github.com/apache/kafka/pull/11926#discussion_r1114653978


##
build.gradle:
##
@@ -435,6 +435,12 @@ subprojects {
   maxRetries = userMaxTestRetries
   maxFailures = userMaxTestRetryFailures
 }
+
+// Allows devs to run tests in a loop to debug flaky tests
+// Eg: I=0; while ./gradlew :streams:test -Prerun-tests --tests 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest --fail-fast; do 
(( I=$I+1 )); echo
+if (project.hasProperty("rerun-tests")) {
+  outputs.upToDateWhen { false }
+}

Review Comment:
   > which causes us to recompile the tests on every iteration.
   
   Just curious: on my local machine, `cleanTest` does not invoke a recompile. The 
following console output is produced by the command `./gradlew cleanTest 
clients:test --tests RequestResponseTest --info`
   ```
   > Task :clients:compileTestJava UP-TO-DATE
   Caching disabled for task ':clients:compileTestJava' because:
 Build cache is disabled
   Skipping task ':clients:compileTestJava' as it is up-to-date.
   Resolve mutations for :clients:testClasses (Thread[Execution worker Thread 
6,5,main]) started.
   :clients:testClasses (Thread[Execution worker Thread 6,5,main]) started.
   
   > Task :clients:testClasses UP-TO-DATE
   Skipping task ':clients:testClasses' as it has no actions.
   Resolve mutations for :clients:checkstyleTest (Thread[Execution worker 
Thread 6,5,main]) started.
   :clients:checkstyleTest (Thread[Execution worker Thread 6,5,main]) started.
   ```
   Could you share with me the command which causes the recompile? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-22 Thread via GitHub


satishd commented on PR #13255:
URL: https://github.com/apache/kafka/pull/13255#issuecomment-1440419578

   A couple of test failures are not related to this change, will merge the 
changes to trunk. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13259: MINOR: Simplify JUnit assertions in src; remove accidental unnecessary code in src

2023-02-22 Thread via GitHub


mimaison commented on PR #13259:
URL: https://github.com/apache/kafka/pull/13259#issuecomment-1440413003

   I checked the clients and connect changes and they look good. I think some 
of the streams changes could be debatable (not sure if they want to keep some 
of the comments) so I'll let someone working on streams review them. 
@ableegoldman @guozhangwang can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14740) Missing source tag on MirrorSource metrics

2023-02-22 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-14740:
--

Assignee: Mickael Maison

> Missing source tag on MirrorSource metrics
> --
>
> Key: KAFKA-14740
> URL: https://issues.apache.org/jira/browse/KAFKA-14740
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> The metrics defined in MirrorSourceMetrics have the following tags "target", 
> "topic", "partition". It would be good to also have a "source" tag with the 
> source cluster alias.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14741) Add description field to connector configs

2023-02-22 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14741:
--

 Summary: Add description field to connector configs
 Key: KAFKA-14741
 URL: https://issues.apache.org/jira/browse/KAFKA-14741
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Mickael Maison
Assignee: Mickael Maison


Connectors are identified by their name. In many cases it would be useful to 
attach a description/comment to connectors to provide some context. This would 
be especially useful on Connect clusters running several connectors and/or 
shared by multiple teams. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-22 Thread via GitHub


Schm1tz1 commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1114586256


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map<String, String> envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map<String, ?> configs) {
+}

Review Comment:
   @viktorsomogyi sure! The feature is already implemented and unit tests 
added, just not committed yet, as I am not 100% sure about the global config 
integration (static Strings for the properties, where to reference, additional 
interfaces to use...). The other provider examples do not have any config 
methods implemented. Happy to learn how to include that if there are any 
references / best practices you can share.
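   For reference, a minimal usage sketch of the provider as proposed (hedged: 
it assumes `get("")` returns all environment variables, since env vars have no 
path component; the final KIP-887 behavior may differ):
   
   ```
   import java.util.Collections;
   
   import org.apache.kafka.common.config.ConfigData;
   import org.apache.kafka.common.config.provider.EnvVarConfigProvider;
   
   public class EnvVarProviderDemo {
       public static void main(String[] args) throws Exception {
           // ConfigProvider extends Closeable, so try-with-resources applies.
           try (EnvVarConfigProvider provider = new EnvVarConfigProvider()) {
               ConfigData all = provider.get("");                        // every env var
               System.out.println("HOME = " + all.data().get("HOME"));
               ConfigData one = provider.get("", Collections.singleton("PATH"));
               System.out.println(one.data());                           // just PATH
           }
       }
   }
   ```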



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14740) Missing source tag on MirrorSource metrics

2023-02-22 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692262#comment-17692262
 ] 

Mickael Maison commented on KAFKA-14740:


Thanks for the quick reply. I think it would make dealing with metrics a little 
bit easier when you have a bunch of mirroring routes between multiple clusters. 
I'll draft a small KIP.

> Missing source tag on MirrorSource metrics
> --
>
> Key: KAFKA-14740
> URL: https://issues.apache.org/jira/browse/KAFKA-14740
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>
> The metrics defined in MirrorSourceMetrics have the following tags "target", 
> "topic", "partition". It would be good to also have a "source" tag with the 
> source cluster alias.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13259: MINOR: Simplify JUnit assertions in src; remove accidental unnecessary code in src

2023-02-22 Thread via GitHub


clolov commented on PR #13259:
URL: https://github.com/apache/kafka/pull/13259#issuecomment-1440295776

   Hello @mimaison, would you have the time to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14740) Missing source tag on MirrorSource metrics

2023-02-22 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692252#comment-17692252
 ] 

Ryanne Dolan commented on KAFKA-14740:
--

[~mimaison] the topic name usually includes the source cluster already, so I 
figured it was redundant. With identity replication you don't get that, but you 
presumably know what the source is in such cases. I don't have any objections 
to adding it tho.

> Missing source tag on MirrorSource metrics
> --
>
> Key: KAFKA-14740
> URL: https://issues.apache.org/jira/browse/KAFKA-14740
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>
> The metrics defined in MirrorSourceMetrics have the following tags "target", 
> "topic", "partition". It would be good to also have a "source" tag with the 
> source cluster alias.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14740) Missing source tag on MirrorSource metrics

2023-02-22 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692248#comment-17692248
 ] 

Mickael Maison commented on KAFKA-14740:


[~ryannedolan] Do you remember if there was a reason not to include the source 
tag on these metrics? MirrorCheckpointMetrics have the source tag.

> Missing source tag on MirrorSource metrics
> --
>
> Key: KAFKA-14740
> URL: https://issues.apache.org/jira/browse/KAFKA-14740
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>
> The metrics defined in MirrorSourceMetrics have the following tags "target", 
> "topic", "partition". It would be good to also have a "source" tag with the 
> source cluster alias.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14740) Missing source tag on MirrorSource metrics

2023-02-22 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14740:
--

 Summary: Missing source tag on MirrorSource metrics
 Key: KAFKA-14740
 URL: https://issues.apache.org/jira/browse/KAFKA-14740
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Mickael Maison


The metrics defined in MirrorSourceMetrics have the following tags "target", 
"topic", "partition". It would be good to also have a "source" tag with the 
source cluster alias.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-22 Thread via GitHub


rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1440262838

   @showuon Thanks for the review.  Yes, I noticed the lack of Java 8 support 
as well.  I tried compiling the latest version of Paguro with Java 8 and it did 
not work (e.g. `[ERROR]   reason: '<>' with anonymous inner classes is not 
supported in -source 8`).
   
   
   I will change the PRs to use the Java-8 compatible Paguro 3.1.2 version 
instead of Paguro 3.10.3 for the moment while doing research for a replacement 
library that supports Java 8 and that extends the standard `java.util` 
interfaces.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14738) Topic disappears from kafka_topic.sh --list after modifying it with kafka_acl.sh

2023-02-22 Thread Gabriel Lukacs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692235#comment-17692235
 ] 

Gabriel Lukacs commented on KAFKA-14738:


Ok, thanks for the clarification. My fault, I was not familiar with ACL/JAAS, but 
now it is clear.

Sorry for the inconvenience, please close this bug.

> Topic disappears from kafka_topic.sh --list after modifying it with 
> kafka_acl.sh
> 
>
> Key: KAFKA-14738
> URL: https://issues.apache.org/jira/browse/KAFKA-14738
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.2.3
>Reporter: Gabriel Lukacs
>Priority: Major
>
> Topic is not listed via kafka-topics.sh --list after modifying it with 
> kafka-acls.sh (-add --allow-principal User:CN=test --operation Read):
> $ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 
> --topic test2 --replication-factor 1 --partitions 50
> Created topic test2.
> $ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --topic 
> test2
> test2
> $ /opt/kafka/bin/kafka-acls.sh --bootstrap-server kafka:9092 --topic test2 
> --add --allow-principal User:CN=test --operation Read
> Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, 
> patternType=LITERAL)`:
>         (principal=User:CN=test, host=*, operation=READ, permissionType=ALLOW)
> Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, 
> patternType=LITERAL)`:
>         (principal=User:CN=test, host=*, operation=READ, permissionType=ALLOW)
> $ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --topic 
> test2                                   
> $ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 
> --topic test2
> Error while executing topic command : Topic 'test2' already exists.
> [2023-02-21 16:37:39,185] ERROR 
> org.apache.kafka.common.errors.TopicExistsException: Topic 'test2' already 
> exists.
>  (kafka.admin.TopicCommand$)
> $ /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka:9092 
> --topic test2
> Error while executing topic command : Topic 'test2' does not exist as expected
> [2023-02-21 16:37:49,485] ERROR java.lang.IllegalArgumentException: Topic 
> 'test2' does not exist as expected
>         at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401)
>         at 
> kafka.admin.TopicCommand$TopicService.deleteTopic(TopicCommand.scala:361)
>         at kafka.admin.TopicCommand$.main(TopicCommand.scala:63)
>         at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> $ /opt/kafka/bin/kafka-topics.sh --version
> 3.2.3 (Commit:50029d3ed8ba576f)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-22 Thread via GitHub


viktorsomogyi commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1114431334


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map<String, String> envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map<String, ?> configs) {
+}

Review Comment:
   @Schm1tz1 did you manage to look at @OneCricketeer's comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13228: KAFKA-10199: Add task updater metrics, part 1

2023-02-22 Thread via GitHub


lucasbru commented on code in PR #13228:
URL: https://github.com/apache/kafka/pull/13228#discussion_r1114354933


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -151,9 +202,18 @@ private void resumeTasks() {
 }
 }
 
-private void restoreTasks() {
+private void pauseTasks() {
+for (final Task task : updatingTasks.values()) {

Review Comment:
   Not sure, but is there any performance concern around running this loop in 
every single iteration of `runOnce`?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -598,6 +685,12 @@ public Set getUpdatingStandbyTasks() {
 : Collections.emptySet();
 }
 
+public Set getUpdatingActiveTasks() {
+return stateUpdaterThread != null

Review Comment:
   As I understand it, this function will be called quite frequently to export 
metrics. We only need the size of the collection. It could make sense to avoid 
the allocations here and just implement a `getNumberOfUpdatingActiveTasks` as a 
non-essential but free optimization. Similar for `getPausedStandbyTasks` etc.


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -399,31 +459,56 @@ private void addToRestoredTasks(final StreamTask task) {
 }
 }
 
-private void checkAllUpdatingTaskStates(final long now) {
+private void maybeCheckpointTasks(final long now) {
 final long elapsedMsSinceLastCommit = now - lastCommitMs;
 if (elapsedMsSinceLastCommit > commitIntervalMs) {
 if (log.isDebugEnabled()) {
 log.debug("Checking all restoring task states since {}ms 
has elapsed (commit interval is {}ms)",

Review Comment:
   Update the log message as well. This function isn't really checking task 
states anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tinaselenge closed pull request #5545: MINOR: Fixed couple of warnings

2023-02-22 Thread via GitHub


tinaselenge closed pull request #5545: MINOR: Fixed couple of warnings
URL: https://github.com/apache/kafka/pull/5545


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tinaselenge commented on pull request #5545: MINOR: Fixed couple of warnings

2023-02-22 Thread via GitHub


tinaselenge commented on PR #5545:
URL: https://github.com/apache/kafka/pull/5545#issuecomment-1439953834

   This tool is being migrated as part of 
https://issues.apache.org/jira/browse/KAFKA-14525. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-02-22 Thread via GitHub


cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1114238733


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -255,6 +255,7 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {

Review Comment:
   https://github.com/apache/kafka/blob/98c2f88e1c605195ccfac19c49a83216e26146a1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L492



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13216: Remove unused ZooKeeper log level configuration from `connect-log4j.properties`

2023-02-22 Thread via GitHub


mimaison merged PR #13216:
URL: https://github.com/apache/kafka/pull/13216


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14304) ZooKeeper to KRaft Migration

2023-02-22 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692106#comment-17692106
 ] 

Luke Chen commented on KAFKA-14304:
---

[~mumrah] , also, the fixed version of this epic is set to v3.4.1, is that 
correct? Shouldn't it be v3.5.0?

cc 3.5 release manager [~mimaison] 

 

 

> ZooKeeper to KRaft Migration
> 
>
> Key: KAFKA-14304
> URL: https://issues.apache.org/jira/browse/KAFKA-14304
> Project: Kafka
>  Issue Type: New Feature
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.4.1
>
>
> Top-level JIRA for 
> [KIP-866|https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-22 Thread via GitHub


showuon commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1439780222

   @rondagostino , also, it looks like this library doesn't support JDK 8? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2023-02-22 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun reassigned KAFKA-13771:


Assignee: RivenSun

> Support to explicitly delete delegationTokens that have expired but have not 
> been automatically cleaned up
> --
>
> Key: KAFKA-13771
> URL: https://issues.apache.org/jira/browse/KAFKA-13771
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> Quoting the official documentation
> {quote}
> Tokens can also be cancelled explicitly. If a token is not renewed by the 
> token’s expiration time or if token is beyond the max life time, it will be 
> deleted from all broker caches as well as from zookeeper.
> {quote}
> 1. The first point above means that after the `AdminClient` initiates the 
> EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() 
> method on the KafkaServer side, if the user passes in expireLifeTimeMs less 
> than 0, KafkaServer will delete the corresponding delegationToken directly.
> 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, 
> which is responsible for regularly cleaning up expired tokens. The execution 
> interval is `delegation.token.expiry.check.interval.ms`, and the default 
> value is one hour.
> But if you carefully analyze the code logic in 
> DelegationTokenManager.expireToken(), *Kafka currently does not allow users 
> to delete an expired delegationToken that they no longer use/renew. If the 
> user tries to do this, they will receive a DelegationTokenExpiredException.*
> In the worst case, an expired delegationToken can still be used normally 
> within {*}an hour{*}, even if the broker shortens the 
> delegation.token.expiry.check.interval.ms configuration as much as possible.
> The solution is very simple: adjust the `if` order of 
> DelegationTokenManager.expireToken().
> {code:java}
> if (!allowedToRenew(principal, tokenInfo)) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
> } else if (expireLifeTimeMs < 0) { //expire immediately
>   removeToken(tokenInfo.tokenId)
>   info(s"Token expired for token: ${tokenInfo.tokenId} for owner: 
> ${tokenInfo.owner}")
>   expireResponseCallback(Errors.NONE, now)
> } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
> } else {
>   //set expiry time stamp
>  ..
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bachmanity1 commented on a diff in pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size

2023-02-22 Thread via GitHub


bachmanity1 commented on code in PR #13261:
URL: https://github.com/apache/kafka/pull/13261#discussion_r1114028635


##
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java:
##
@@ -688,8 +688,10 @@ public Object read(ByteBuffer buffer) {
 if (size > buffer.remaining())
 throw new SchemaException("Error reading bytes of size " + size + ", only " + buffer.remaining() + " bytes available");
 
+int limit = buffer.limit();
+buffer.limit(buffer.position() + size);

Review Comment:
   I introduced a new variable `newPosition`
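   For context, a standalone sketch of the bounded-slice pattern this fix uses 
(variable names follow the discussion; the final `Type.BYTES` code may differ 
slightly):
   
   ```
   import java.nio.ByteBuffer;
   
   public class BoundedSlice {
       // Return a slice of exactly `size` bytes: shrink the limit so slice()
       // cannot see past the value, then restore it and advance the position.
       static ByteBuffer readSizedSlice(ByteBuffer buffer, int size) {
           int limit = buffer.limit();                 // remember the original limit
           int newPosition = buffer.position() + size;
           buffer.limit(newPosition);                  // bound the visible region
           ByteBuffer value = buffer.slice();          // remaining() == size
           buffer.limit(limit);                        // restore the original limit
           buffer.position(newPosition);               // consume the value bytes
           return value;
       }
   }
   ```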



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org