[jira] [Assigned] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phuc Hong Tran reassigned KAFKA-15160: -- Assignee: Phuc Hong Tran > Message bytes duplication in Kafka headers when compression is enabled > -- > > Key: KAFKA-15160 > URL: https://issues.apache.org/jira/browse/KAFKA-15160 > Project: Kafka > Issue Type: Bug > Components: clients, compression, consumer >Affects Versions: 3.2.3, 3.3.2 >Reporter: Vikash Mishra >Assignee: Phuc Hong Tran >Priority: Critical > Attachments: java heap dump.png, wireshark-min.png > > > I created a spring Kafka consumer using @KafkaListener. > During this, I encountered a scenario where, when data is compressed (any > compression: snappy/gzip) and consumed by the consumer, I see that in a > heap dump, there is a " byte" occupying the same amount of memory as in > Message value. > This behavior is seen only in cases when compressed data is consumed by > consumers, not in the case of uncompressed data. > Tried to capture Kafka's message through Wireshark; there it shows the proper > size of data incoming from the Kafka server & no extra bytes in headers. So, this > is definitely something in the Kafka client. Spring doesn't do anything about > compression; the whole functionality is done internally in the Kafka client > library. > Attached is the screenshot of the heap dump and Wireshark. > This seems like a critical issue as message size in memory almost gets > doubled, impacting consumer memory and performance. Somewhere it feels like > the actual message value is copied to headers? > *To Reproduce* > # Produce compressed data on any topic. > # Create a simple consumer consuming from the above-created topic. > # Capture heap dump. > *Expected behavior* > Headers should not show bytes consuming memory equivalent to value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
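The reproduction steps above start from a producer with compression enabled. A minimal sketch of that producer-side configuration, using only standard client property names (`compression.type` is the real producer setting; the broker address and serializer choices are illustrative assumptions):

```java
import java.util.Properties;

public class CompressionReproConfig {
    // Producer properties for step 1 of "To Reproduce": any of
    // gzip/snappy/lz4/zstd should trigger the reported duplication
    // on the consumer side.
    public static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("compression.type", "gzip");
        return p;
    }
}
```

A consumer built from analogous properties needs no compression setting at all (decompression is transparent in the client), which is why the extra bytes in the heap dump point at the client library rather than application code.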
[GitHub] [kafka] Phuc-Hong-Tran closed pull request #14026: KAFKA-15152: Fix incorrect format specifiers when formatting string
Phuc-Hong-Tran closed pull request #14026: KAFKA-15152: Fix incorrect format specifiers when formatting string URL: https://github.com/apache/kafka/pull/14026 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on PR #13278: URL: https://github.com/apache/kafka/pull/13278#issuecomment-1641386848 Tests failures unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1641385324 Tests failures seems unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores
A. Sophie Blee-Goldman created KAFKA-15215: -- Summary: The default.dsl.store config is not compatible with custom state stores Key: KAFKA-15215 URL: https://issues.apache.org/jira/browse/KAFKA-15215 Project: Kafka Issue Type: New Feature Components: streams Reporter: A. Sophie Blee-Goldman Assignee: Almog Gavra Sort of a bug, sort of a new/missing feature. When we added the long-awaited default.dsl.store config, it was decided to scope the initial KIP to just the two out-of-the-box state store types offered by Streams, rocksdb and in-memory. The reason being that this would address a large number of the relevant use cases, and could always be followed up with another KIP for custom state stores if/when the demand arose. Of course, since rocksdb is the default anyway, the only beneficiaries of this KIP right now are the people who specifically want only in-memory stores – yet custom state store users are probably by far the ones with the greatest need for an easier way to configure the store type across an entire application. And unfortunately, because the config currently relies on enum definitions for the known OOTB store types, there's not really any way to extend this feature as it is to work with custom implementations. I think this is a great feature, which is why I hope to see it extended to the broader user base. Most likely we'll want to introduce a new config for this, though whether it replaces the old default.dsl.store config or complements it will have to be decided during the KIP discussion -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores
[ https://issues.apache.org/jira/browse/KAFKA-15215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15215: --- Labels: needs-kip (was: ) > The default.dsl.store config is not compatible with custom state stores > --- > > Key: KAFKA-15215 > URL: https://issues.apache.org/jira/browse/KAFKA-15215 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Almog Gavra >Priority: Major > Labels: needs-kip > > Sort of a bug, sort of a new/missing feature. When we added the long-awaited > default.dsl.store config, it was decided to scope the initial KIP to just the > two out-of-the-box state stores types offered by Streams, rocksdb and > in-memory. The reason being that this would address a large number of the > relevant use cases, and could always be followed up with another KIP for > custom state stores if/when the demand arose. > Of course, since rocksdb is the default anyways, the only beneficiaries of > this KIP right now are the people who specifically want only in-memory stores > – yet custom state stores users are probably by far the ones with the > greatest need for an easier way to configure the store type across an entire > application. And unfortunately, because the config currently relies on enum > definitions for the known OOTB store types, there's not really any way to > extend this feature as it is to work with custom implementations. > I think this is a great feature, which is why I hope to see it extended to > the broader user base. Most likely we'll want to introduce a new config for > this, though whether it replaces the old default.dsl.store config or > complements it will have to be decided during the KIP discussion -- This message was sent by Atlassian Jira (v8.20.10#820010)
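Because the config is enum-backed, anything other than the two built-in values is rejected. A hypothetical JDK-only sketch of that limitation, assuming only the documented config key `default.dsl.store` and its two accepted values (the helper class and the other property values are illustrative, not Streams API):

```java
import java.util.Properties;

public class DslStoreConfig {
    // The two values the enum-backed config accepts today (KIP-591).
    static final String ROCKS_DB = "rocksDB";
    static final String IN_MEMORY = "in_memory";

    public static Properties streamsProps(String storeType) {
        if (!ROCKS_DB.equals(storeType) && !IN_MEMORY.equals(storeType)) {
            // This is the gap for custom state store users described above:
            // there is no value they can supply to select their own implementation.
            throw new IllegalArgumentException("Unknown default.dsl.store: " + storeType);
        }
        Properties p = new Properties();
        p.put("application.id", "example-app");       // illustrative
        p.put("bootstrap.servers", "localhost:9092"); // illustrative
        p.put("default.dsl.store", storeType);
        return p;
    }
}
```

Any follow-up KIP would essentially have to replace the enum check with something open-ended, e.g. a class name, which is why a new config is on the table.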
[GitHub] [kafka] jeffkbkim commented on pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on PR #13870: URL: https://github.com/apache/kafka/pull/13870#issuecomment-1641304041 the test failures are unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1267462476

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
             group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} for generation {}.",
+                        membersWithMissingAssignment, groupId, group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were awaiting this callback,
+                    // so we must ensure we are still in the CompletingRebalance state and the same generation
+                    // when it gets invoked. If we have transitioned to another state, then do nothing.
+                    if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment " +
+                                "during SyncGroup (member: " + memberId + ").");
+                        } else {
+                            // Members' assignments were already updated. Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1267441961

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
             group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });

Review Comment: I thought it would be safe because `resetAndPropagateAssignmentWithError(group, error);` resets all members to an empty assignment state if the commit fails. Let's say that another request comes in before the commit fails. The only place we respond with the member assignment to the client is when we receive a sync group request in the Stable state. So, we never expose the updated assignment until the write succeeds and transitions the state to Stable. Do you think this is too brittle? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
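The fill-and-reset step under discussion reduces to plain collections. A self-contained sketch of that logic (names are simplified stand-ins, not the actual GroupMetadataManager API):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AssignmentFill {
    // Give each member the assignment the leader sent for it; members absent
    // from the leader's map are reset to an empty assignment, and their ids
    // are returned so the caller can log a warning, mirroring the hunk above.
    public static Set<String> apply(Map<String, byte[]> leaderAssignments,
                                    Map<String, byte[]> members) {
        Set<String> missing = new HashSet<>();
        for (Map.Entry<String, byte[]> member : members.entrySet()) {
            byte[] assignment = leaderAssignments.get(member.getKey());
            if (assignment != null) {
                member.setValue(assignment);
            } else {
                missing.add(member.getKey());
                member.setValue(new byte[0]);
            }
        }
        return missing;
    }
}
```

The safety argument in the review comment hinges on these in-place updates never being visible to clients until the write commits and the group reaches Stable.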
[GitHub] [kafka] gharris1727 commented on pull request #14029: KAFKA-10579: Upgrade reflections from 0.9.12 to 0.10.2
gharris1727 commented on PR #14029: URL: https://github.com/apache/kafka/pull/14029#issuecomment-1641250195 > Would it be too much to ask to run system tests? I ran the connect system tests on a spare machine and got 181/181 passing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1267427788

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
             group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));

Review Comment: As genericGroupJoin catches the exception, I will repeat the behavior in genericGroupSync to keep it consistent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15194) Rename local tiered storage segment with offset as prefix for easy navigation
[ https://issues.apache.org/jira/browse/KAFKA-15194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744384#comment-17744384 ] Lan Ding commented on KAFKA-15194: -- [~divijvaidya] Sorry, just saw the message, [~owen-leung] has already picked up this issue. > Rename local tiered storage segment with offset as prefix for easy navigation > - > > Key: KAFKA-15194 > URL: https://issues.apache.org/jira/browse/KAFKA-15194 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Owen C.H. Leung >Priority: Minor > Labels: newbie > > In LocalTieredStorage which is an implementation of RemoteStorageManager, > segments are saved with random UUID. This makes navigating to a particular > segment harder. To navigate a given segment by offset, prepend the offset > information to the segment filename. > https://github.com/apache/kafka/pull/13837#discussion_r1258896009 -- This message was sent by Atlassian Jira (v8.20.10#820010)
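Prepending a fixed-width, zero-padded offset keeps a plain directory listing sorted by offset while preserving the existing UUID. A hypothetical naming helper along those lines (the format string and `.segment` suffix are assumptions, not the actual LocalTieredStorage scheme):

```java
import java.util.UUID;

public class SegmentFileName {
    // 20 digits covers any non-negative long, so lexicographic order of the
    // file names matches numeric order of the base offsets.
    public static String name(long baseOffset, UUID segmentId) {
        return String.format("%020d-%s.segment", baseOffset, segmentId);
    }
}
```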
[GitHub] [kafka] ijuma commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
ijuma commented on PR #13676: URL: https://github.com/apache/kafka/pull/13676#issuecomment-1641123937 If I'm reading this right, it seems very restrictive. It's very common for other companies to build AK for their own purposes. If they're not allowed to use gradle enterprise, then it should be disabled by default and enabled via a parameter in the Apache CI build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15205) Race condition in ShutdownableThread causes InterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-15205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744377#comment-17744377 ] Ismael Juma commented on KAFKA-15205: - Have you seen this behavior in practice? We should definitely fix our error handling if it's not compliant, but it's not clear to me if this is more of a theoretical issue. > Race condition in ShutdownableThread causes InterruptedException > > > Key: KAFKA-15205 > URL: https://issues.apache.org/jira/browse/KAFKA-15205 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1 >Reporter: Divij Vaidya >Priority: Major > Fix For: 3.6.0 > > > In ShutdownableThread, during close, we call: > initiateShutdown() -> which may interrupt the thread if > isInterruptible is set to true during construction. > After that, we wait for proper shutdown using > awaitShutdown(), which in turn calls CountdownLatch#await(). On interruption, > which could be caused by initiateShutdown() earlier, await() throws an > InterruptedException. Hence, awaitShutdown() is going to exit by throwing an > interrupted exception. > The sequence to reproduce this will be as follows: > App-thread: Name of application thread which spawns and closes Shutdownable > thread > Shut-thread: Name of the shutdownable thread. > 1. App-thread calls ShutThread.initiateShutdown() > 2. ShutThread.interrupt() is called. It informs the VM to interrupt but the > actual interrupt will be async. initiateShutdown() from step 1 returns. > 3. App-thread calls ShutThread.awaitShutdown() > 4. App-thread waits on shutdownComplete.await() i.e. on CountdownLatch#await > 5. VM decides to interrupt App-thread and there is a race condition now.
> Race condition: > Condition 1: Shut-thread.doWork() gets interrupted exception, and decrements > the CountdownLatch > Condition 2: App-thread waiting on Shut-thread.shutdownComplete.await() > throws an InterruptedException as per the contract > [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await--] > *Solution* > > In ShutdownableThread#awaitShutdown(), when calling await() we should catch > InterruptedException and eat it up (do nothing), if the thread has > isInterruptible set to true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
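The proposed fix can be sketched with only a CountDownLatch: swallow the InterruptedException when the shutdown path itself issued the interrupt, keep waiting, and restore the interrupt flag for callers. This is a simplified model, not the actual ShutdownableThread code:

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownAwait {
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);
    private final boolean isInterruptible;

    public ShutdownAwait(boolean isInterruptible) {
        this.isInterruptible = isInterruptible;
    }

    // Called by the shutdownable thread once doWork() has exited.
    public void markComplete() {
        shutdownComplete.countDown();
    }

    // If isInterruptible, an InterruptedException here is expected (we
    // interrupted the thread ourselves in initiateShutdown), so eat it and
    // keep waiting; restore the interrupt status afterwards so callers
    // higher up still see it.
    public void awaitShutdown() throws InterruptedException {
        boolean interrupted = false;
        while (true) {
            try {
                shutdownComplete.await();
                break;
            } catch (InterruptedException e) {
                if (!isInterruptible) throw e; // unexpected interrupt: propagate
                interrupted = true;            // self-inflicted: retry the await
            }
        }
        if (interrupted) Thread.currentThread().interrupt();
    }
}
```

Note that the retry loop must not re-set the interrupt flag before calling `await()` again: `CountDownLatch.await()` throws immediately when the flag is set on entry, so restoring it inside the loop would spin forever.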
[jira] [Closed] (KAFKA-10775) DOAP has incorrect category
[ https://issues.apache.org/jira/browse/KAFKA-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebb closed KAFKA-10775. > DOAP has incorrect category > --- > > Key: KAFKA-10775 > URL: https://issues.apache.org/jira/browse/KAFKA-10775 > Project: Kafka > Issue Type: Bug >Reporter: Sebb >Assignee: Sebb >Priority: Major > Fix For: 3.1.0 > > > https://github.com/apache/kafka/blob/0df461582c78449fd39e35b241a77a7acf5735e2/doap_Kafka.rdf#L36 > reads: > rdf:resource="https://projects.apache.org/projects.html?category#big-data" /> > This should be > "http://projects.apache.org/category/big-data" /> > c.f. > http://svn.apache.org/repos/asf/bigtop/site/trunk/content/resources/bigtop.rdf -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lixin Yao updated KAFKA-15214: -- Parent: KAFKA-7739 Issue Type: Sub-task (was: Improvement) > Add metrics for OffsetOutOfRangeException when tiered storage is enabled > > > Key: KAFKA-15214 > URL: https://issues.apache.org/jira/browse/KAFKA-15214 > Project: Kafka > Issue Type: Sub-task > Components: metrics >Affects Versions: 3.6.0 >Reporter: Lixin Yao >Priority: Minor > Labels: KIP-405 > Fix For: 3.6.0 > > > In the current metrics RemoteReadErrorsPerSec, the exception type > OffsetOutOfRangeException is not included. > In our testing with the tiered storage feature (at Apple), we noticed several > cases where remote download is affected and stuck due to repeated > OffsetOutOfRangeException in some particular broker or topic partitions. The > root cause could be various, but currently without a metric it's very hard to > catch this issue and debug in a timely fashion. It's understandable that the > exception itself may not be the root cause, but this exception metric could > be a good metric for us to alert on and investigate. > Related discussion > [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] > I am happy to contribute to this if the request is agreed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
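A counter keyed by exception class name would make such failures visible without enumerating every error type up front. A hypothetical JDK-only sketch (the real RemoteReadErrorsPerSec is a Kafka broker metric; this stand-in only illustrates the per-type counting idea, and the test below uses a JDK exception as a stand-in for OffsetOutOfRangeException):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RemoteReadErrorCounters {
    // One counter per exception type, so OffsetOutOfRangeException (currently
    // excluded from RemoteReadErrorsPerSec) would show up like any other error.
    private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();

    public void record(Throwable t) {
        counts.computeIfAbsent(t.getClass().getSimpleName(), k -> new AtomicLong())
              .incrementAndGet();
    }

    public long count(String exceptionType) {
        AtomicLong c = counts.get(exceptionType);
        return c == null ? 0L : c.get();
    }
}
```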
[jira] [Commented] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core
[ https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744366#comment-17744366 ] Said BOUDJELDA commented on KAFKA-15000: I can take this Jira > High vulnerability PRISMA-2023-0067 reported in jackson-core > > > Key: KAFKA-15000 > URL: https://issues.apache.org/jira/browse/KAFKA-15000 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.3.2, 3.5.1 >Reporter: Arushi Rai >Assignee: Said BOUDJELDA >Priority: Critical > Fix For: 3.6.0 > > > Kafka is using jackson-core version 2.13.4 which has high vulnerability > reported [PRISMA-2023-0067. > |https://github.com/FasterXML/jackson-core/pull/827] > This vulnerability is fixed in Jackson-core 2.15.0 and Kafka should upgrade to > the same. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core
[ https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Said BOUDJELDA reassigned KAFKA-15000: -- Assignee: Said BOUDJELDA > High vulnerability PRISMA-2023-0067 reported in jackson-core > > > Key: KAFKA-15000 > URL: https://issues.apache.org/jira/browse/KAFKA-15000 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.3.2, 3.5.1 >Reporter: Arushi Rai >Assignee: Said BOUDJELDA >Priority: Critical > Fix For: 3.6.0 > > > Kafka is using jackson-core version 2.13.4 which has high vulnerability > reported [PRISMA-2023-0067. > |https://github.com/FasterXML/jackson-core/pull/827] > This vulnerability is fixed in Jackson-core 2.15.0 and Kafka should upgrade to > the same. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] muralibasani commented on pull request #13417: KAFKA-14585: Moving StorageTool from core to tools module
muralibasani commented on PR #13417: URL: https://github.com/apache/kafka/pull/13417#issuecomment-1641043256 @fvaleri Managed to remove the core dependency from tools. If I am in the right direction, will add tests, fix conflicts, and several other minor things. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #14031: Docs: Include ZK Deprecation notice and information
mjsax commented on PR #14031: URL: https://github.com/apache/kafka/pull/14031#issuecomment-1641032657 Merged to `trunk` and cherry-picked to `3.5` branch. Will take care to get the changes into `kafka-site`, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #14031: Docs: Include ZK Deprecation notice and information
mjsax merged PR #14031: URL: https://github.com/apache/kafka/pull/14031 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267289475 ## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java: ## @@ -176,17 +210,53 @@ public long dualWriteOffset() { } public void incrementTimedOutHeartbeats() { -timedOutHeartbeats.addAndGet(1); +timedOutHeartbeats.incrementAndGet(); } -public void setTimedOutHeartbeats(long heartbeats) { -timedOutHeartbeats.set(heartbeats); +public void setTimedOutHeartbeats(long value) { +timedOutHeartbeats.set(value); } Review Comment: Yeah, I guess we can remove these -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15091: -- Fix Version/s: 3.3.3 > Javadocs for SourceTask::commit are incorrect > - > > Key: KAFKA-15091 > URL: https://issues.apache.org/jira/browse/KAFKA-15091 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2 > > > The Javadocs for {{SourceTask::commit}} state that the method should: > {quote}Commit the offsets, up to the offsets that have been returned by > [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()]. > {quote} > However, this is obviously incorrect given how the Connect runtime (when not > configured with exactly-once support for source connectors) performs polling > and offset commits on separate threads. There's also some extensive > discussion on the semantics of that method in KAFKA-5716 where it's made > clear that altering the behavior of the runtime to align with the documented > semantics of that method is not a viable option. > We should update the Javadocs for this method to state that it does not have > anything to do with the offsets returned from {{SourceTask::poll}} and is > instead just a general, periodically-invoked hook to let the task know that > an offset commit has taken place (but with no guarantees as to which offsets > have been committed and which ones correspond to still-in-flight records). -- This message was sent by Atlassian Jira (v8.20.10#820010)
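The corrected semantics are easiest to see in a toy runtime: commit() takes no offset arguments and fires on its own cadence, so a task cannot tie it back to specific polled records. These are hypothetical mini-interfaces, not the actual Connect API:

```java
import java.util.Collections;
import java.util.List;

public class CommitHookSketch {
    // Minimal stand-in for a source task: poll() produces records on one
    // cadence, commit() is a bare notification that some offset commit
    // happened, with no indication of which records it covered.
    interface Task {
        List<String> poll();
        void commit();
    }

    // The "runtime": several polls may occur between commits, and records
    // from the most recent polls may still be in flight when commit() fires.
    static void runCycle(Task task, int pollsPerCommit) {
        for (int i = 0; i < pollsPerCommit; i++) {
            task.poll();
        }
        task.commit();
    }
}
```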
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267289020

## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:

@@ -113,7 +124,30 @@ public Long value() {
             return time.milliseconds() - lastAppliedRecordTimestamp();
         }
     }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();

Review Comment: added a test that should catch this

## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:

@@ -113,7 +124,30 @@ public Long value() {
             return time.milliseconds() - lastAppliedRecordTimestamp();
         }
     }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();

Review Comment: added a test that should catch this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
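The copy-paste hazard the review flags (several gauges all reading timedOutHeartbeats()) can be guarded by wiring each gauge name to its own supplier and asserting on each one. A JDK-only sketch; the gauge names mirror the constants in the diff, but the structure is a simplification, not the actual QuorumControllerMetrics:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class ControllerMetricsSketch {
    private final AtomicLong timedOutHeartbeats = new AtomicLong();
    private final AtomicLong operationsStarted = new AtomicLong();
    private final AtomicLong operationsTimedOut = new AtomicLong();
    private final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

    public ControllerMetricsSketch() {
        // Each gauge reads its own counter; the bug in the quoted hunk was
        // several gauges accidentally returning timedOutHeartbeats().
        gauges.put("TimedOutBrokerHeartbeatCount", timedOutHeartbeats::get);
        gauges.put("EventQueueOperationsStartedCount", operationsStarted::get);
        gauges.put("EventQueueOperationsTimedOutCount", operationsTimedOut::get);
    }

    public void incrementTimedOutHeartbeats() { timedOutHeartbeats.incrementAndGet(); }
    public void incrementOperationsStarted() { operationsStarted.incrementAndGet(); }

    public long read(String gaugeName) { return gauges.get(gaugeName).getAsLong(); }
}
```

A test that bumps each counter independently and reads back every gauge catches exactly this class of mistake, which is presumably what the added test in the PR does.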
[jira] [Updated] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15091: -- Fix Version/s: 3.4.2 > Javadocs for SourceTask::commit are incorrect > - > > Key: KAFKA-15091 > URL: https://issues.apache.org/jira/browse/KAFKA-15091 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0, 3.4.2, 3.5.2
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267287803 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ## @@ -418,24 +415,30 @@ LogDeltaManifest loadLogDelta( public void handleLoadSnapshot(SnapshotReader reader) { eventQueue.append(() -> { try { +long numLoaded = metrics.incrementHandleLoadSnapshotCount(); +String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId()); +log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.", +snapshotName, numLoaded); MetadataDelta delta = new MetadataDelta.Builder(). setImage(image). build(); SnapshotManifest manifest = loadSnapshot(delta, reader); -log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " + -"in {} us.", manifest.provenance().lastContainedOffset(), +log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " + +"and this snapshot in {} us.", snapshotName, +image.provenance().lastContainedOffset(), NANOSECONDS.toMicros(manifest.elapsedNs())); try { image = delta.apply(manifest.provenance()); } catch (Throwable e) { faultHandler.handleFault("Error generating new metadata image from " + -"snapshot at offset " + reader.lastContainedLogOffset(), e); +"snapshot " + snapshotName, e); Review Comment: the offset is included in the snapshot name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15091: -- Fix Version/s: 3.5.2 > Javadocs for SourceTask::commit are incorrect > - > > Key: KAFKA-15091 > URL: https://issues.apache.org/jira/browse/KAFKA-15091 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0, 3.5.2
[GitHub] [kafka] C0urante merged pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit
C0urante merged PR #13948: URL: https://github.com/apache/kafka/pull/13948
[GitHub] [kafka] C0urante commented on a diff in pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit
C0urante commented on code in PR #13948: URL: https://github.com/apache/kafka/pull/13948#discussion_r1267283474 ## connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java: ## @@ -105,9 +105,11 @@ public void initialize(SourceTaskContext context) { public abstract List poll() throws InterruptedException; /** - * - * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This - * method should block until the commit is complete. + * This method is invoked periodically when offsets are committed for this source task. Note that the offsets + * being committed won't necessarily correspond to the latest offsets returned by this source task via + * {@link #poll()}. When exactly-once support is disabled, offsets are committed periodically and asynchronously + * (i.e. on a separate thread from the one which calls {@link #poll()}). When exactly-once support is enabled, + * offsets are committed on transaction commits (also see {@link TransactionBoundary}). * * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets * automatically. This hook is provided for systems that also need to store offsets internally Review Comment: The "store offsets internally" language isn't great, but I'd rather leave it for now and explore it further if/when we start discussing deprecating this method. Connector developers might theoretically use this for acknowledging JMS records, for example, which in a very loose sense is storing offsets (or at least, some JMS-specific equivalent of them) in that system. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit
C0urante commented on code in PR #13948: URL: https://github.com/apache/kafka/pull/13948#discussion_r1267278752 ## connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java: ## @@ -105,9 +105,11 @@ public void initialize(SourceTaskContext context) { public abstract List poll() throws InterruptedException; /** - * - * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This - * method should block until the commit is complete. + * This method is invoked periodically when offsets are committed for this source task. Note that the offsets + * being committed won't necessarily correspond to the latest offsets returned by this source task via + * {@link #poll()}. When exactly-once support is disabled, offsets are committed periodically and asynchronously + * (i.e. on a separate thread from the one which calls {@link #poll()}). When exactly-once support is enabled, + * offsets are committed on transaction commits (also see {@link TransactionBoundary}). Review Comment: Looks great, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
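The revised Javadoc's key point — that the offsets committed when `commit()` fires need not match the latest offsets returned by `poll()` — can be illustrated with a toy, single-threaded model. This is a hedged sketch with invented names (`CommitSemanticsSketch`, `commitHook`), not the Connect API: the runtime snapshots offsets at some point, keeps polling, and only later commits the snapshot, so the hook always sees a commit that may trail the latest poll.

```java
// Toy model of the (non-exactly-once) Connect runtime behavior described
// above; all names are illustrative and deliberately not the real API.
public class CommitSemanticsSketch {
    long polledOffset = 0;      // advanced by the "poll" loop
    long committedOffset = 0;   // advanced by the offset-commit path

    public void poll(int records) { polledOffset += records; }

    // The runtime snapshots offsets at some earlier point, then commits them.
    public void commitSnapshot(long snapshot) {
        committedOffset = snapshot;
        commitHook();
    }

    // Analogue of SourceTask::commit: a general "a commit happened" signal.
    void commitHook() {
        // The hook cannot assume committedOffset == polledOffset; in-flight
        // records polled after the snapshot are not yet covered.
        assert committedOffset <= polledOffset;
    }

    public static void main(String[] args) {
        CommitSemanticsSketch t = new CommitSemanticsSketch();
        t.poll(10);
        long snapshot = t.polledOffset; // runtime snapshots offsets...
        t.poll(5);                      // ...while polling continues
        t.commitSnapshot(snapshot);     // the commit lands behind the latest poll
        System.out.println(t.committedOffset + " committed, " + t.polledOffset + " polled");
    }
}
```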
[jira] [Assigned] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
[ https://issues.apache.org/jira/browse/KAFKA-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-12842: --- Assignee: Greg Harris > Failing test: > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic > -- > > Key: KAFKA-12842 > URL: https://issues.apache.org/jira/browse/KAFKA-12842 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: John Roesler >Assignee: Greg Harris >Priority: Major > > This test failed during a PR build, which means that it failed twice in a > row, due to the test-retry logic in PR builds. > > [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209] > > {noformat} > java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141) > at > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) 
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at >
[jira] [Assigned] (KAFKA-8690) Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorke
[ https://issues.apache.org/jira/browse/KAFKA-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-8690: -- Assignee: Greg Harris > Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorke > --- > > Key: KAFKA-8690 > URL: https://issues.apache.org/jira/browse/KAFKA-8690 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Greg Harris >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull] > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > > testAddAndRemoveWorker STARTED*02:56:46* > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker.test.stdout*02:56:46* > *02:56:46* org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > > testAddAndRemoveWorker FAILED*02:56:46* java.lang.AssertionError: > Condition not met within timeout 15000. Connector tasks did not start in > time.*02:56:46* at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*02:56:46* > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*02:56:46* > at > org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker(ConnectWorkerIntegrationTest.java:118) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
[ https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-10579: --- Assignee: Greg Harris > Flaky test > connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy > > > Key: KAFKA-10579 > URL: https://issues.apache.org/jira/browse/KAFKA-10579 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: A. Sophie Blee-Goldman >Assignee: Greg Harris >Priority: Major > Labels: flaky-test > > > {{java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) > at > 
org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} > https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222
[GitHub] [kafka] forlack commented on pull request #14031: Docs: Include ZK Deprecation notice and information
forlack commented on PR #14031: URL: https://github.com/apache/kafka/pull/14031#issuecomment-1640948633 I updated the sentence suggested by @ijuma
[jira] [Commented] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744347#comment-17744347 ] Lixin Yao commented on KAFKA-15214: --- Here is one of the example scenarios that motivated me to create this request. While testing the tiered storage feature, I noticed an unbalanced byte-out traffic rate across brokers. From the available metrics this was very confusing, because all the topic partitions should be balanced across the cluster without any skew. I then checked the existing error metric RemoteReadErrorsPerSec, which only reported a value of 0. This gave me the impression that there was no problem with remote downloading, so I had to dig into the logs for more information. In the end, I found that one broker was unable to fetch remote segments because this OffsetOutOfRangeException was happening consistently and repeatedly. An example error looks like this: {code:java} 2023-07-15 00:12:42,471 kafkaLogLevel="INFO" [RemoteLogReader-2]: OffsetOutOfRangeException occurred while reading the remote data for mytopic-247: org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request for offset 0 for leader epoch 0 and partition mytopic-247 which does not exist in remote tier. Try again later. kafkaLoggerClass="kafka.log.remote.RemoteLogReader" kafkaLoggerThread="RemoteLogReader-2" {code} As I said, the reason this partition keeps requesting offset 0 could be something else, e.g. corrupted metadata or other issues, but if this error were included in the RemoteReadErrorsPerSec metric, it would help me a lot in identifying the root cause and setting up alerting. Hope this makes sense to you. I am fine with including this as a tag on the existing metric; as long as I have a way to quickly identify and alert on the abnormal behavior, I am OK with it.
> Add metrics for OffsetOutOfRangeException when tiered storage is enabled > > > Key: KAFKA-15214 > URL: https://issues.apache.org/jira/browse/KAFKA-15214 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 3.6.0 >Reporter: Lixin Yao >Priority: Minor > Labels: KIP-405 > Fix For: 3.6.0 > > > In the current metrics RemoteReadErrorsPerSec, the exception type > OffsetOutOfRangeException is not included. > In our testing with tiered storage feature (at Apple), we noticed several > cases where remote download is affected and stuck due to repeatedly > OffsetOutOfRangeException in some particular broker or topic partitions. The > root cause could be various but currently without a metrics it's very hard to > catch this issue and debug in a timely fashion. It's understandable that the > exception itself could not be the root cause but this exception metric could > be a good metrics for us to alert and investigate. > Related discussion > [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] > I am happy to contribute to this if the request is agreed.
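The request above — surfacing `OffsetOutOfRangeException` in (or as a tag on) `RemoteReadErrorsPerSec` — amounts to counting errors keyed by exception type. A minimal, hypothetical sketch using only the JDK (a stdlib exception stands in for Kafka's `OffsetOutOfRangeException`; this is not the actual broker metrics code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch of an error counter tagged by exception type,
// in the spirit of the KAFKA-15214 request; names are invented.
public class ErrorsByTypeSketch {
    private final Map<String, LongAdder> errorsByType = new ConcurrentHashMap<>();

    // Record one error occurrence, bucketed by the exception's class name.
    public void recordError(Throwable t) {
        errorsByType.computeIfAbsent(t.getClass().getSimpleName(), k -> new LongAdder())
                    .increment();
    }

    public long count(String type) {
        LongAdder a = errorsByType.get(type);
        return a == null ? 0 : a.sum();
    }

    public static void main(String[] args) {
        ErrorsByTypeSketch m = new ErrorsByTypeSketch();
        // Stand-in for the repeated remote-read failure described above.
        m.recordError(new IndexOutOfBoundsException("offset 0 not in remote tier"));
        m.recordError(new IndexOutOfBoundsException("offset 0 not in remote tier"));
        System.out.println(m.count("IndexOutOfBoundsException")); // 2
    }
}
```

With per-type counts exposed, a spike in one bucket can be alerted on directly instead of being invisible inside an aggregate that excludes it.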
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267254133 ## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java: ## @@ -113,7 +124,30 @@ public Long value() { return time.milliseconds() - lastAppliedRecordTimestamp(); } })); - +registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge() { +@Override +public Long value() { +return timedOutHeartbeats(); +} +})); +registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() { +@Override +public Long value() { +return timedOutHeartbeats(); +} +})); +registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge() { +@Override +public Long value() { +return timedOutHeartbeats(); Review Comment: good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267253682 ## metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java: ## @@ -113,7 +124,30 @@ public Long value() { return time.milliseconds() - lastAppliedRecordTimestamp(); } })); - +registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge() { +@Override +public Long value() { +return timedOutHeartbeats(); +} +})); +registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() { +@Override +public Long value() { +return timedOutHeartbeats(); Review Comment: good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267252934 ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -282,15 +294,15 @@ class SharedServer( setDisabledReason(snapshotsDisabledReason). setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). build() -_raftManager.register(loader) Review Comment: Not really fixing a bug per se but it's probably slightly better for performance this way. Mainly, I did not want the `get()` call to be blocked behind another event.
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267251882 ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -282,15 +294,15 @@ class SharedServer( setDisabledReason(snapshotsDisabledReason). setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). build() -_raftManager.register(loader) try { - loader.installPublishers(Collections.singletonList(snapshotGenerator)) + loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get() Review Comment: yes, although I think it's a bug that's very unlikely to cause harm in practice.
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267251209 ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -259,15 +262,24 @@ class SharedServer( raftManager = _raftManager _raftManager.startup() +metadataLoaderMetrics = if (brokerMetrics != null) { + new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()), +elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs), +batchSize => brokerMetrics.updateBatchSize(batchSize), +brokerMetrics.lastAppliedImageProvenance) +} else { + new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()), +_ => {}, +_ => {}, +new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY)) Review Comment: There are a few broker-specific metrics hanging out in `BrokerServerMetrics.scala` and this is a way to connect `MetadataLoaderMetrics` to that class. So that the metadata loader only needs to interact with `MetadataLoaderMetrics` and not the broker-specific code. Long-term, we probably want to move all the loader metrics into `MetadataLoaderMetrics`, and make them all accessible on the controller as well as broker. But that's out of scope for this change (and would need a KIP anyway) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
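The design cmccabe describes — `MetadataLoaderMetrics` taking callbacks so the loader only interacts with `MetadataLoaderMetrics` and never with broker-specific code — is plain constructor injection of consumers; on the controller the sinks are no-ops. A hedged, stdlib-only sketch (illustrative names, not the real classes):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

// Sketch of decoupling a loader from broker-specific metrics via injected
// callbacks, in the spirit of the SharedServer change; names are invented.
public class LoaderMetricsSketch {
    private final LongConsumer batchProcessingTimeNs;
    private final IntConsumer batchSize;

    public LoaderMetricsSketch(LongConsumer batchProcessingTimeNs, IntConsumer batchSize) {
        this.batchProcessingTimeNs = batchProcessingTimeNs;
        this.batchSize = batchSize;
    }

    // The loader reports through the callbacks without knowing who listens.
    public void updateBatch(long elapsedNs, int size) {
        batchProcessingTimeNs.accept(elapsedNs);
        batchSize.accept(size);
    }

    public static void main(String[] args) {
        // "Broker" side wires real sinks; a controller would pass no-ops
        // (`n -> {}`), matching the else-branch in the diff above.
        AtomicLong lastElapsed = new AtomicLong();
        LoaderMetricsSketch metrics = new LoaderMetricsSketch(lastElapsed::set, s -> { });
        metrics.updateBatch(42_000L, 7);
        System.out.println(lastElapsed.get()); // 42000
    }
}
```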
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267248743 ## metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java: ## @@ -782,7 +735,7 @@ public void testTimeouts() throws Throwable { build() ) { QuorumController controller = controlEnv.activeController(); -CountDownLatch countDownLatch = controller.pause(); +CountDownLatch countDownLatch = pause(controller); Review Comment: yeah, we can remove this and just use the test function.
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267249264 ## metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java: ## @@ -140,39 +142,6 @@ public class QuorumControllerTest { static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata. fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap"); -static class MockControllerMetrics extends QuorumControllerMetrics { -final AtomicBoolean closed = new AtomicBoolean(false); - -MockControllerMetrics() { -super(Optional.empty(), Time.SYSTEM, false); -} - -@Override -public void close() { -super.close(); -closed.set(true); -} -} - -/** - * Test creating a new QuorumController and closing it. - */ -@Test -public void testCreateAndClose() throws Throwable { -MockControllerMetrics metrics = new MockControllerMetrics(); -try ( -LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). -build(); -QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). -setControllerBuilderInitializer(controllerBuilder -> { -controllerBuilder.setMetrics(metrics); -}). -build() -) { -} -assertTrue(metrics.closed.get(), "metrics were not closed"); -} Review Comment: This was moved to `QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267248169 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ## @@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) { } @SuppressWarnings({"rawtypes", "unchecked"}) -protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { -return new PluginDesc(plugin, version, loader); +protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) { +return new PluginDesc(plugin, version, source.loader()); } @SuppressWarnings("unchecked") -protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { +protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) { SortedSet> result = new TreeSet<>(); -ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); -for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { -try (LoaderSwap loaderSwap = withClassLoader(loader)) { +ServiceLoader serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader())); Review Comment: I've removed these usages, so the error propagates normally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
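The scanner being reviewed builds on `java.util.ServiceLoader`, and the linkage-error discussion above is about whether one broken provider should fail the whole scan. A hedged, stdlib-only sketch of defensive `ServiceLoader` iteration (illustrative only, not Kafka's `PluginScanner`; note `hasNext()` can also throw `ServiceConfigurationError`, which this simple form does not guard):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

// Minimal ServiceLoader-based discovery that skips unloadable providers
// rather than aborting the scan; names and structure are illustrative.
public class ServiceScanSketch {
    public static <T> List<T> scan(Class<T> iface, ClassLoader loader) {
        List<T> found = new ArrayList<>();
        Iterator<T> it = ServiceLoader.load(iface, loader).iterator();
        while (it.hasNext()) {
            try {
                found.add(it.next());
            } catch (ServiceConfigurationError e) {
                // A provider that fails to link or instantiate is logged
                // and skipped; the remaining providers are still scanned.
                System.err.println("Skipping unloadable provider: " + e.getMessage());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // With no META-INF/services entries on the classpath, the scan is empty.
        List<Runnable> plugins = scan(Runnable.class, ServiceScanSketch.class.getClassLoader());
        System.out.println(plugins.size()); // 0
    }
}
```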
[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics
cmccabe commented on code in PR #14010: URL: https://github.com/apache/kafka/pull/14010#discussion_r1267246622 ## metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java: ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image.loader.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils; +import org.apache.kafka.image.MetadataProvenance; +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +public class MetadataLoaderMetricsTest { +private static class FakeMetadataLoaderMetrics implements AutoCloseable { +final AtomicLong batchProcessingTimeNs = new AtomicLong(0L); +final AtomicInteger batchSize = new AtomicInteger(0); +final AtomicReference provenance = +new AtomicReference<>(MetadataProvenance.EMPTY); +final MetadataLoaderMetrics metrics; + +FakeMetadataLoaderMetrics() { +this(Optional.empty()); +} + +FakeMetadataLoaderMetrics(MetricsRegistry registry) { +this(Optional.of(registry)); +} + +FakeMetadataLoaderMetrics(Optional registry) { +metrics = new MetadataLoaderMetrics( +registry, +n -> batchProcessingTimeNs.set(n), +n -> batchSize.set(n), +provenance); +} + +@Override +public void close() { +metrics.close(); +} +} + +@Test +public void testMetricNames() { +MetricsRegistry registry = new MetricsRegistry(); +try { +try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { +ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server", +new HashSet<>(Arrays.asList( + "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion", + "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount" +))); +} 
+ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server", +Collections.emptySet()); +} finally { +registry.shutdown(); +} +} + +@Test +public void testUpdateBatchProcessingTimeNs() { +MetricsRegistry registry = new MetricsRegistry(); +try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { +fakeMetrics.metrics.updateBatchProcessingTimeNs(123L); +assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get()); +} +} + +@Test +public void testUpdateBatchSize() { +MetricsRegistry registry = new MetricsRegistry(); +try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { +fakeMetrics.metrics.updateBatchSize(50); +assertEquals(50, fakeMetrics.batchSize.get()); +} +} + +@Test +public void testUpdateLastAppliedImageProvenance() { +MetricsRegistry registry = new MetricsRegistry(); +try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { +fakeMetrics.metrics.updateLastAppliedImageProvenance(new MetadataProvenance(1L, 2, 3L)); +assertEquals(new MetadataProvenance(1L, 2, 3L), fakeMetrics.provenance.get()); Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records
C0urante commented on code in PR #14024: URL: https://github.com/apache/kafka/pull/14024#discussion_r1267245540

## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java: ##

@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+            originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment: A separate ticket/PR sounds great, thanks! Feel free to ping me on the PR if you implement the fix.
[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
C0urante commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267237169

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ##

@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }

     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }

     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                        klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);

Review Comment: Gotcha! The latest patch works, thanks.
[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
C0urante commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267238592

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ##

@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }

     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }

     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                        klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
-                        pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
+                        pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
             }
         }
         return result;
     }

+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param <T> Type being iterated over by the ServiceLoader
+     * @param <U> Return value of the passed-in function
+     */
+    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
+        // It's difficult to know for sure if the iterator was able to advance past the first broken
+        // plugin class, or if it will continue to fail on that broken class for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
+        // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        ||
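The bounded-retry idea in the quoted helper (retry a `LinkageError`-prone `ServiceLoader` operation up to a fixed cap, suppressing duplicate consecutive error logs) can be sketched generically. All names here are illustrative, and plain `System.err` stands in for the real logger; this is not Kafka's actual `PluginScanner.handleLinkageError`:

```java
import java.util.Objects;
import java.util.function.Supplier;

public class LinkageErrorRetry {
    // Cap chosen, as in the quoted comment, to exceed the typical number of
    // plugins in one location while bounding log spam.
    static final int MAX_ATTEMPTS = 100;

    public static <U> U callWithRetries(Supplier<U> operation) {
        LinkageError lastError = null;
        for (int i = 0; i < MAX_ATTEMPTS; i++) {
            try {
                return operation.get();
            } catch (LinkageError e) {
                // Only log when the error differs from the previous one,
                // to avoid repeating identical messages from a stuck iterator.
                boolean looksNew = lastError == null
                        || !Objects.equals(lastError.getClass(), e.getClass())
                        || !Objects.equals(lastError.getMessage(), e.getMessage());
                if (looksNew) {
                    System.err.println("Linkage error while scanning plugins: " + e);
                }
                lastError = e;
            }
        }
        // Give up after the cap and surface the most recent failure.
        throw lastError;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String value = callWithRetries(() -> {
            // Simulate an operation that fails once, then succeeds.
            if (attempts[0]++ < 1) throw new NoClassDefFoundError("transient");
            return "loaded";
        });
        System.out.println(value); // prints "loaded" after one retried failure
    }
}
```

The key design point under discussion is visible here: if the supplier never succeeds, the loop terminates after `MAX_ATTEMPTS` and rethrows instead of spinning forever on a broken plugin.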
[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
C0urante commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267236920

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ##

@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }

     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }

     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment: I think that given the low likelihood of ever hitting this error when invoking `ServiceLoader::load` or `ServiceLoader::iterator` (neither is annotated with any kind of `throws` declaration, the iterator is specifically called out as being lazy-loading, and I'm not aware of any JDK bug reports that would lead to this kind of behavior), it's better to throw errors to the caller.
[jira] [Updated] (KAFKA-14669) Include MirrorMaker connector configurations in docs
[ https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14669: -- Priority: Blocker (was: Major) > Include MirrorMaker connector configurations in docs > > > Key: KAFKA-14669 > URL: https://issues.apache.org/jira/browse/KAFKA-14669 > Project: Kafka > Issue Type: Improvement > Components: docs > Reporter: Mickael Maison > Assignee: Gantigmaa Selenge > Priority: Blocker > Fix For: 3.6.0 > > > In the https://kafka.apache.org/documentation/#georeplication-flow-configure > section we list some of the MirrorMaker connector configurations. These are > hardcoded in the docs: > https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788 > Instead we should use the generated docs (added as part of > https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78) > like we do for the file connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14669) Include MirrorMaker connector configurations in docs
[ https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744337#comment-17744337 ] Chris Egerton commented on KAFKA-14669: --- Reopening and marking as a blocker with fix version 3.6.0 so that we can merge [https://github.com/apache/kafka/pull/14041] (or another fix for the same issue) before this makes it into a release. > Include MirrorMaker connector configurations in docs > > > Key: KAFKA-14669 > URL: https://issues.apache.org/jira/browse/KAFKA-14669 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Blocker > Fix For: 3.6.0 > > > In the https://kafka.apache.org/documentation/#georeplication-flow-configure > section we list some of the MirrorMaker connectors configurations. These are > hardcoded in the docs: > https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788 > Instead we should used the generated docs (added as part of > https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78) > like we do for the file connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-14669) Include MirrorMaker connector configurations in docs
[ https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reopened KAFKA-14669: --- > Include MirrorMaker connector configurations in docs > > > Key: KAFKA-14669 > URL: https://issues.apache.org/jira/browse/KAFKA-14669 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.6.0 > > > In the https://kafka.apache.org/documentation/#georeplication-flow-configure > section we list some of the MirrorMaker connectors configurations. These are > hardcoded in the docs: > https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788 > Instead we should used the generated docs (added as part of > https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78) > like we do for the file connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #14041: KAFKA-14469: Add MirrorMaker 2 configs to table of contents in docs page
C0urante commented on PR #14041: URL: https://github.com/apache/kafka/pull/14041#issuecomment-1640847533 CC @tinaselenge; sorry for missing this during review! Just wanted to give you a heads-up in case you make or review docs changes in the future. @gharris1727 @mimaison would appreciate it if either of you could take a look at this quick fix. Thanks!
[GitHub] [kafka] C0urante opened a new pull request, #14041: KAFKA-14469: Add MirrorMaker 2 configs to table of contents in docs page
C0urante opened a new pull request, #14041: URL: https://github.com/apache/kafka/pull/14041 Follow-up for https://github.com/apache/kafka/pull/13658, where we forgot to update the table of contents to include the new 3.8 section for MirrorMaker 2. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744334#comment-17744334 ] Divij Vaidya commented on KAFKA-15214: -- Hi Lixin Can you help us understand more about the motivation (perhaps by explaining an example scenario where this metric would be useful)? Also, may want to consider adding specific exception type as a "tag" to the existing error metric. > Add metrics for OffsetOutOfRangeException when tiered storage is enabled > > > Key: KAFKA-15214 > URL: https://issues.apache.org/jira/browse/KAFKA-15214 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 3.6.0 >Reporter: Lixin Yao >Priority: Minor > Labels: KIP-405 > Fix For: 3.6.0 > > > In the current metrics RemoteReadErrorsPerSec, the exception type > OffsetOutOfRangeException is not included. > In our testing with tiered storage feature (at Apple), we noticed several > cases where remote download is affected and stuck due to repeatedly > OffsetOutOfRangeException in some particular broker or topic partitions. The > root cause could be various but currently without a metrics it's very hard to > catch this issue and debug in a timely fashion. It's understandable that the > exception itself could not be the root cause but this exception metric could > be a good metrics for us to alert and investigate. > Related discussion > [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] > I am happy to contribute to this if the request is agreed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
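The suggestion above, distinguishing errors by exception type rather than counting them under one undifferentiated meter, can be illustrated with a minimal, framework-free sketch. This is not Kafka's metrics API (broker metrics go through Yammer meters via `BrokerTopicStats`); it only demonstrates the tagging idea, keyed on exception class name:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

public class ErrorsByType {
    // One counter per exception type, created lazily on first occurrence.
    private final ConcurrentMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Record an error under its exception-type "tag".
    public void record(Throwable t) {
        counts.computeIfAbsent(t.getClass().getSimpleName(), k -> new LongAdder())
              .increment();
    }

    // Read back the count for one exception type (0 if never seen).
    public long count(String exceptionType) {
        LongAdder adder = counts.get(exceptionType);
        return adder == null ? 0L : adder.sum();
    }
}
```

With a structure like this, an operator could alert specifically on the `OffsetOutOfRangeException` count described in the ticket, instead of on an aggregate error rate that hides the failure mode.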
[GitHub] [kafka] bmscomp commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
bmscomp commented on PR #14032: URL: https://github.com/apache/kafka/pull/14032#issuecomment-1640794180 @divijvaidya It's a rebase issue. Now that I am using the latest version of `.asf.yaml`, the issue is fixed and the build execution is going on.
[GitHub] [kafka] divijvaidya commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
divijvaidya commented on PR #14032: URL: https://github.com/apache/kafka/pull/14032#issuecomment-1640773987

Tests are failing with a related error.

```
* What went wrong:
Gradle could not start your build.
> Cannot create service of type BuildSessionActionExecutor using method LauncherServices$ToolingBuildSessionScopeServices.createActionExecutor() as there is a problem with parameter #21 of type FileSystemWatchingInformation.
   > Cannot create service of type BuildLifecycleAwareVirtualFileSystem using method VirtualFileSystemServices$GradleUserHomeServices.createVirtualFileSystem() as there is a problem with parameter #7 of type GlobalCacheLocations.
      > Cannot create service of type GlobalCacheLocations using method GradleUserHomeScopeServices.createGlobalCacheLocations() as there is a problem with parameter #1 of type List.
         > Could not create service of type FileAccessTimeJournal using GradleUserHomeScopeServices.createFileAccessTimeJournal().
            > Timeout waiting to lock journal cache (/home/jenkins/.gradle/caches/journal-1). It is currently in use by another Gradle instance.
              Owner PID: 4026719
              Our PID: 163366
              Owner Operation:
              Our operation:
              Lock file: /home/jenkins/.gradle/caches/journal-1/journal-1.lock
```

@bmscomp did you try running `./gradlew build` locally? Please ensure that the build is successful for you.
[GitHub] [kafka] stevenbooke commented on pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh
stevenbooke commented on PR #13842: URL: https://github.com/apache/kafka/pull/13842#issuecomment-1640762194 As you have recommended, the script will create a new branch for the changes, commit the changes to the new branch, and open a pull request with the updated `.asf.yaml` file. The updated `.asf.yaml` file will keep the same format as before (it will retain the comments and update only the `github_whitelist` list and the `collaborators` list).
[GitHub] [kafka] Staniel commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics
Staniel commented on code in PR #13944: URL: https://github.com/apache/kafka/pull/13944#discussion_r1267173038

## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##

@@ -54,10 +60,14 @@ public Void call() {
     logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
     FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+    brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
+    brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
     result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
 } catch (OffsetOutOfRangeException e) {
     result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));

Review Comment: https://issues.apache.org/jira/browse/KAFKA-15214
[GitHub] [kafka] stevenbooke commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh
stevenbooke commented on code in PR #13842: URL: https://github.com/apache/kafka/pull/13842#discussion_r1267168635 ## refresh-collaborators.py: ## @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +from bs4 import BeautifulSoup +from github import Github +import yaml +from datetime import datetime, timedelta + +### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ### +github_token = os.environ.get('GITHUB_TOKEN') +g = Github(github_token) +repo = g.get_repo("apache/kafka-site") +contents = repo.get_contents("committers.html") +content = contents.decoded_content +soup = BeautifulSoup(content, "html.parser") +committer_logins = [login.text for login in soup.find_all('div', class_='github_login')] + +### GET THE CONTRIBUTORS AND THEIR COMMIT VOLUME OVER THE LAST YEAR TO THE apache/kafka REPO ### +n = 10 +contributors_login_to_commit_volume = {} +end_date = datetime.now() +start_date = end_date - timedelta(days=365) +for commit in repo.get_commits(since=start_date, until=end_date): +login = commit.author.login Review Comment: Thank you for pointing that out. I will make the appropriate changes. -- This is an automated message from the Apache Git Service. 
[jira] [Updated] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lixin Yao updated KAFKA-15214: -- Description: In the current metrics RemoteReadErrorsPerSec, the exception type OffsetOutOfRangeException is not included. In our testing with tiered storage feature (at Apple), we noticed several cases where remote download is affected and stuck due to repeatedly OffsetOutOfRangeException in some particular broker or topic partitions. The root cause could be various but currently without a metrics it's very hard to catch this issue and debug in a timely fashion. It's understandable that the exception itself could not be the root cause but this exception metric could be a good metrics for us to alert and investigate. Related discussion [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] I am happy to contribute to this if the request is agreed. was: In the current metrics RemoteReadErrorsPerSec, the exception type OffsetOutOfRangeException is not included. In our testing with tiered storage feature, we noticed several cases where remote download is affected and stuck due to repeatedly OffsetOutOfRangeException in some particular broker or topic partitions. The root cause could be various but currently without a metrics it's very hard to catch this issue and debug in a timely fashion. It's understandable that the exception itself could not be the root cause but this exception metric could be a good metrics for us to alert and investigate. Related discussion [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] I am happy to contribute to this if the request is agreed. 
> Add metrics for OffsetOutOfRangeException when tiered storage is enabled > > > Key: KAFKA-15214 > URL: https://issues.apache.org/jira/browse/KAFKA-15214 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 3.6.0 >Reporter: Lixin Yao >Priority: Minor > Labels: KIP-405 > Fix For: 3.6.0 > > > In the current metrics RemoteReadErrorsPerSec, the exception type > OffsetOutOfRangeException is not included. > In our testing with tiered storage feature (at Apple), we noticed several > cases where remote download is affected and stuck due to repeatedly > OffsetOutOfRangeException in some particular broker or topic partitions. The > root cause could be various but currently without a metrics it's very hard to > catch this issue and debug in a timely fashion. It's understandable that the > exception itself could not be the root cause but this exception metric could > be a good metrics for us to alert and investigate. > Related discussion > [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] > I am happy to contribute to this if the request is agreed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] tanay27 opened a new pull request, #14040: KAFKA-15212: Delete Classgraph-MIT license
tanay27 opened a new pull request, #14040: URL: https://github.com/apache/kafka/pull/14040

Deleted the classgraph MIT license file.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
Lixin Yao created KAFKA-15214: - Summary: Add metrics for OffsetOutOfRangeException when tiered storage is enabled Key: KAFKA-15214 URL: https://issues.apache.org/jira/browse/KAFKA-15214 Project: Kafka Issue Type: Improvement Components: metrics Affects Versions: 3.6.0 Reporter: Lixin Yao Fix For: 3.6.0 In the current metrics RemoteReadErrorsPerSec, the exception type OffsetOutOfRangeException is not included. In our testing with tiered storage feature, we noticed several cases where remote download is affected and stuck due to repeatedly OffsetOutOfRangeException in some particular broker or topic partitions. The root cause could be various but currently without a metrics it's very hard to catch this issue and debug in a timely fashion. It's understandable that the exception itself could not be the root cause but this exception metric could be a good metrics for us to alert and investigate. Related discussion [https://github.com/apache/kafka/pull/13944#discussion_r1266243006] I am happy to contribute to this if the request is agreed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15212) Remove unneeded classgraph license file
[ https://issues.apache.org/jira/browse/KAFKA-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanay Karmarkar reassigned KAFKA-15212: --- Assignee: Tanay Karmarkar > Remove unneeded classgraph license file > --- > > Key: KAFKA-15212 > URL: https://issues.apache.org/jira/browse/KAFKA-15212 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Tanay Karmarkar >Priority: Major > Labels: newbie > Fix For: 3.6.0 > > > The license file for classgraph can be completely removed from here: > [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it > is not a dependency of Kafka any more. > The associated package was removed from license at > [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] bmscomp commented on a diff in pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
bmscomp commented on code in PR #14032: URL: https://github.com/apache/kafka/pull/14032#discussion_r1267159224 ## gradle/wrapper/gradle-wrapper.properties: ## @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=5625a0ae20fe000d9225d000b36909c7a0e0e8dda61c19b12da769add847c975 -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-all.zip +distributionSha256Sum=03ec176d388f2aa99defcadc3ac6adf8dd2bce5145a129659537c0874dea5ad1 +distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip Review Comment: @divijvaidya Updated the pull request to use the all distribution -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288 ] Matthias J. Sax edited comment on KAFKA-15190 at 7/18/23 6:22 PM: -- One more thing: the `process.id` is actually only used as part of the `client.id` iff no `client.id` config is set. – Hence, setting the `client.id` should avoid the issue of task shuffling (and the rebalance in itself should not be an issue, as it's cheap)? was (Author: mjsax): One more thing: the `process.id` is actually only used as part of the `client.id` iff no `client.id` config is set. – Hence, setting the `client.id` should avoid the issue of rebalancing (and task shuffling)? > Allow configuring a streams process ID > -- > > Key: KAFKA-15190 > URL: https://issues.apache.org/jira/browse/KAFKA-15190 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Joe Wreschnig >Priority: Major > Labels: needs-kip > > We run our Kafka Streams applications in containers with no persistent > storage, and therefore the mitigation of persisting the process ID in the state > directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during > restarts. > However, we do have a persistent container ID (from a Kubernetes > StatefulSet). Would it be possible to expose a configuration option to let us > set the streams process ID ourselves? > We are already using this ID as our group.instance.id - would it make sense > to have the process ID be automatically derived from this (plus > application/client IDs) if it's set? The two IDs seem to have overlapping > goals of identifying "this consumer" across restarts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
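The workaround discussed in this comment can be sketched with plain properties. This is a hedged illustration only: the application name "orders-app" and the StatefulSet ordinal are assumed values, and `java.util.Properties` stands in for a full `StreamsConfig`.

```java
import java.util.Properties;

// Sketch: set client.id explicitly so the randomly generated process.id no
// longer appears in the thread-level client IDs, keeping assignments sticky
// across container restarts. Values below are illustrative.
public class StreamsStableIds {
    static Properties streamsProps(String podOrdinal) {
        Properties props = new Properties();
        props.put("application.id", "orders-app");
        // Stable per-pod identity derived from the Kubernetes StatefulSet name
        props.put("client.id", "orders-app-" + podOrdinal);
        // Static membership ID, as the reporter already uses
        props.put("group.instance.id", "orders-app-" + podOrdinal);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps("0").getProperty("client.id")); // prints orders-app-0
    }
}
```

Per the comment above, with `client.id` pinned this way, the `process.id` stops influencing the client IDs, so whether it survives a restart matters less.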
[GitHub] [kafka] gharris1727 opened a new pull request, #14039: KAFKA-15211: Mock InvalidParameterException in DistributedConfigTest
gharris1727 opened a new pull request, #14039: URL: https://github.com/apache/kafka/pull/14039 This exception doesn't get thrown by the BouncyCastleProvider's implementation of HmacSHA256, so simulate the error just in case BouncyCastle is loaded before the test is run. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Staniel commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics
Staniel commented on code in PR #13944: URL: https://github.com/apache/kafka/pull/13944#discussion_r1267143752 ## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ## @@ -54,10 +60,14 @@ public Void call() { logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition); FetchDataInfo fetchDataInfo = rlm.read(fetchInfo); + brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes()); + brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes()); result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty()); } catch (OffsetOutOfRangeException e) { result = new RemoteLogReadResult(Optional.empty(), Optional.of(e)); Review Comment: Thank you. I will create a JIRA to track that discussion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1267142346 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -89,52 +91,12 @@ public BatchAccumulator( this.appendLock = new ReentrantLock(); } -/** - * Append a list of records into as many batches as necessary. - * - * The order of the elements in the records argument will match the order in the batches. - * This method will use as many batches as necessary to serialize all of the records. Since - * this method can split the records into multiple batches it is possible that some of the - * records will get committed while other will not when the leader fails. - * - * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException} - * will be thrown - * @param records the list of records to include in the batches - * @return the expected offset of the last record - * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum - * batch size; if this exception is throw some of the elements in records may have - * been committed - * @throws NotLeaderException if the epoch is less than the leader epoch - * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch) - * @throws BufferAllocationException if we failed to allocate memory for the records - * @throws IllegalStateException if we tried to append new records after the batch has been built - */ -public long append(int epoch, List records) { -return append(epoch, records, false); -} - -/** - * Append a list of records into an atomic batch. We guarantee all records are included in the - * same underlying record batch so that either all of the records become committed or none of - * them do. - * - * @param epoch the expected leader epoch. 
If this does not match, then {@link NotLeaderException} - * will be thrown - * @param records the list of records to include in a batch - * @return the expected offset of the last record - * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum - * batch size; if this exception is throw none of the elements in records were - * committed - * @throws NotLeaderException if the epoch is less than the leader epoch - * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch) - * @throws BufferAllocationException if we failed to allocate memory for the records - * @throws IllegalStateException if we tried to append new records after the batch has been built - */ -public long appendAtomic(int epoch, List records) { -return append(epoch, records, true); -} - -private long append(int epoch, List records, boolean isAtomic) { +public long append( +int epoch, +List records, +OptionalLong requiredEndOffset, +boolean isAtomic +) { Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1267137597 ## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ## @@ -172,15 +176,17 @@ default void beginShutdown() {} * uncommitted entries after observing an epoch change. * * @param epoch the current leader epoch + * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive). Review Comment: ok. we can use `requiredBaseOffset`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1267130598 ## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ## @@ -80,8 +81,11 @@ interface Listener { * epoch. * * @param leader the current leader and epoch + * @param endOffset the current log end offset (exclusive). This is useful for nodes that + * are claiming leadership, because it lets them know what log offset they + * should attempt to write to next. Review Comment: > What problem are you trying so solve? We have to know the offset of records that we apply. But we apply records before we submit them to Raft. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-8466) Remove 'jackson-module-scala' dependency (and replace it with some code)
[ https://issues.apache.org/jira/browse/KAFKA-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-8466: Labels: newbie (was: ) > Remove 'jackson-module-scala' dependency (and replace it with some code) > > > Key: KAFKA-8466 > URL: https://issues.apache.org/jira/browse/KAFKA-8466 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Dejan Stojadinović >Priority: Minor > Labels: newbie > > *Prologue:* > * [https://github.com/apache/kafka/pull/5454#issuecomment-497323889] > * [https://github.com/apache/kafka/pull/5726/files#r289078080] > *Rationale:* one dependency less is always a good thing. > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15213) Provide the exact offset to QuorumController.replay
Colin McCabe created KAFKA-15213: Summary: Provide the exact offset to QuorumController.replay Key: KAFKA-15213 URL: https://issues.apache.org/jira/browse/KAFKA-15213 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Provide the exact offset to QuorumController.replay so that we can implement metadata transactions. We need this so that we can know the offset where the records will be applied before we apply them in QuorumControllers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-8466) Remove 'jackson-module-scala' dependency (and replace it with some code)
[ https://issues.apache.org/jira/browse/KAFKA-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744315#comment-17744315 ] Divij Vaidya commented on KAFKA-8466: - Based on the comment above, marking as Unassigned so that someone from the community can pick this up. > Remove 'jackson-module-scala' dependency (and replace it with some code) > > > Key: KAFKA-8466 > URL: https://issues.apache.org/jira/browse/KAFKA-8466 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Dejan Stojadinović >Priority: Minor > > *Prologue:* > * [https://github.com/apache/kafka/pull/5454#issuecomment-497323889] > * [https://github.com/apache/kafka/pull/5726/files#r289078080] > *Rationale:* one dependency less is always a good thing. > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-8466) Remove 'jackson-module-scala' dependency (and replace it with some code)
[ https://issues.apache.org/jira/browse/KAFKA-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya reassigned KAFKA-8466: --- Assignee: (was: Dejan Stojadinović) > Remove 'jackson-module-scala' dependency (and replace it with some code) > > > Key: KAFKA-8466 > URL: https://issues.apache.org/jira/browse/KAFKA-8466 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Dejan Stojadinović >Priority: Minor > > *Prologue:* > * [https://github.com/apache/kafka/pull/5454#issuecomment-497323889] > * [https://github.com/apache/kafka/pull/5726/files#r289078080] > *Rationale:* one dependency less is always a good thing. > > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core
[ https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15000: - Affects Version/s: 3.5.1 > High vulnerability PRISMA-2023-0067 reported in jackson-core > > > Key: KAFKA-15000 > URL: https://issues.apache.org/jira/browse/KAFKA-15000 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.3.2, 3.5.1 >Reporter: Arushi Rai >Priority: Critical > Fix For: 3.6.0 > > > Kafka is using jackson-core version 2.13.4, which has a high-severity vulnerability > reported as [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827]. > This vulnerability is fixed in jackson-core 2.15.0 and Kafka should upgrade to > the same. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core
[ https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15000: - Fix Version/s: 3.6.0 > High vulnerability PRISMA-2023-0067 reported in jackson-core > > > Key: KAFKA-15000 > URL: https://issues.apache.org/jira/browse/KAFKA-15000 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.0, 3.3.2 >Reporter: Arushi Rai >Priority: Critical > Fix For: 3.6.0 > > > Kafka is using jackson-core version 2.13.4, which has a high-severity vulnerability > reported as [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827]. > This vulnerability is fixed in jackson-core 2.15.0 and Kafka should upgrade to > the same. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15212) Remove unneeded classgraph license file
[ https://issues.apache.org/jira/browse/KAFKA-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15212: - Summary: Remove unneeded classgraph license file (was: Remove classgraph license) > Remove unneeded classgraph license file > --- > > Key: KAFKA-15212 > URL: https://issues.apache.org/jira/browse/KAFKA-15212 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Priority: Major > Labels: newbie > Fix For: 3.6.0 > > > The license file for classgraph can be completely removed from here: > [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it > is not a dependency of Kafka any more. > The associated package was removed from license at > [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15212) Remove classgraph license
Divij Vaidya created KAFKA-15212: Summary: Remove classgraph license Key: KAFKA-15212 URL: https://issues.apache.org/jira/browse/KAFKA-15212 Project: Kafka Issue Type: Bug Reporter: Divij Vaidya Fix For: 3.6.0 The license file for classgraph can be completely removed from here: [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it is not a dependency of Kafka any more. The associated package was removed from license at [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15211) DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after TestSslUtils#generate
Greg Harris created KAFKA-15211: --- Summary: DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after TestSslUtils#generate Key: KAFKA-15211 URL: https://issues.apache.org/jira/browse/KAFKA-15211 Project: Kafka Issue Type: Test Components: clients, KafkaConnect Reporter: Greg Harris Assignee: Greg Harris The DistributedConfigTest#shouldFailWithInvalidKeySize attempts to configure a hashing algorithm with a key size of 0. When run alone, this test passes, as the default Java hashing algorithm used rejects the key size. However, when TestSslUtils#generate runs first, such as via the RestForwardingIntegrationTest, the BouncyCastleProvider is loaded, which provides an alternative hashing algorithm. This implementation does _not_ reject the key size, causing the test to fail. We should either prevent TestSslUtils#generate from leaving the BouncyCastleProvider loaded after use, or adjust the test to pass when the BouncyCastleProvider is loaded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
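The provider-ordering hazard described above can be probed with a small stand-alone check. This is a sketch, not the actual test: it assumes the stock SunJCE `KeyGenerator` for HmacSHA256 rejects a zero key size with `InvalidParameterException`, while a provider registered earlier (e.g. BouncyCastle) may accept it.

```java
import java.security.InvalidParameterException;
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;

// Sketch: whichever provider is registered first for the algorithm decides
// whether a zero key size is rejected, which is what flips the test result.
public class ZeroKeySizeProbe {
    static boolean rejectsZeroKeySize(String algorithm) throws NoSuchAlgorithmException {
        try {
            // Picks the first registered provider offering this algorithm
            KeyGenerator.getInstance(algorithm).init(0);
            return false; // this provider accepted a zero-length key
        } catch (InvalidParameterException e) {
            return true;  // default JDK behavior: key size rejected up front
        }
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // On a stock JDK with no extra providers, this is expected to print true
        System.out.println(rejectsZeroKeySize("HmacSHA256"));
    }
}
```

The mitigation in PR #14039 takes the other route mentioned above: simulating the `InvalidParameterException` in the test so the result no longer depends on which provider happens to be loaded.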
[jira] [Assigned] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.2
[ https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya reassigned KAFKA-15208: Assignee: Said BOUDJELDA > Upgrade Jackson dependencies to version 2.15.2 > -- > > Key: KAFKA-15208 > URL: https://issues.apache.org/jira/browse/KAFKA-15208 > Project: Kafka > Issue Type: Improvement > Components: connect, kraft >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Major > Labels: dependencies > Fix For: 3.6.0 > > > Upgrading the Jackson dependencies to the latest stable version, > 2.15.2, can bring many bug fixes, security fixes, and performance > improvements. > > Check the release notes going back to the current version: > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.2
[ https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15208: - Fix Version/s: 3.6.0 > Upgrade Jackson dependencies to version 2.15.2 > -- > > Key: KAFKA-15208 > URL: https://issues.apache.org/jira/browse/KAFKA-15208 > Project: Kafka > Issue Type: Improvement > Components: connect, kraft >Reporter: Said BOUDJELDA >Priority: Major > Labels: dependencies > Fix For: 3.6.0 > > > Upgrading the Jackson dependencies to the latest stable version, > 2.15.2, can bring many bug fixes, security fixes, and performance > improvements. > > Check the release notes going back to the current version: > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-10775) DOAP has incorrect category
[ https://issues.apache.org/jira/browse/KAFKA-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-10775. Fix Version/s: 3.1.0 Assignee: Sebb Resolution: Fixed > DOAP has incorrect category > --- > > Key: KAFKA-10775 > URL: https://issues.apache.org/jira/browse/KAFKA-10775 > Project: Kafka > Issue Type: Bug >Reporter: Sebb >Assignee: Sebb >Priority: Major > Fix For: 3.1.0 > > > https://github.com/apache/kafka/blob/0df461582c78449fd39e35b241a77a7acf5735e2/doap_Kafka.rdf#L36 > reads: > <category rdf:resource="https://projects.apache.org/projects.html?category#big-data" /> > This should be > <category rdf:resource="http://projects.apache.org/category/big-data" /> > c.f. > http://svn.apache.org/repos/asf/bigtop/site/trunk/content/resources/bigtop.rdf -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on pull request #11423: Update doap_Kafka.rdf
mimaison commented on PR #11423: URL: https://github.com/apache/kafka/pull/11423#issuecomment-1640685154 Right, I wasn't aware of the associated ticket. I closed it now. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
divijvaidya commented on code in PR #14032: URL: https://github.com/apache/kafka/pull/14032#discussion_r1267117269 ## gradle/wrapper/gradle-wrapper.properties: ## @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=5625a0ae20fe000d9225d000b36909c7a0e0e8dda61c19b12da769add847c975 -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-all.zip +distributionSha256Sum=03ec176d388f2aa99defcadc3ac6adf8dd2bce5145a129659537c0874dea5ad1 +distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip Review Comment: we changed from "all" package to "bin". Is there a specific reason for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15205) Race condition in ShutdownableThread causes InterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-15205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanay Karmarkar reassigned KAFKA-15205: --- Assignee: (was: Tanay Karmarkar) > Race condition in ShutdownableThread causes InterruptedException > > > Key: KAFKA-15205 > URL: https://issues.apache.org/jira/browse/KAFKA-15205 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1 >Reporter: Divij Vaidya >Priority: Major > Fix For: 3.6.0 > > > In ShutdownableThread, during close, we call: > initiateShutdown(), which may interrupt the thread if > isInterruptible is set to true during construction. > After that, we wait for proper shutdown using > awaitShutdown(), which in turn calls CountDownLatch#await(). On interruption, > which could be caused by initiateShutdown() earlier, await() throws an > InterruptedException. Hence, awaitShutdown() is going to exit by throwing an > InterruptedException. > The sequence to reproduce this is as follows: > App-thread: name of the application thread which spawns and closes the Shutdownable > thread > Shut-thread: name of the shutdownable thread. > 1. App-thread calls ShutThread.initiateShutdown() > 2. ShutThread.interrupt() is called. It informs the VM to interrupt, but the > actual interrupt will be async. initiateShutdown() from step 1 returns. > 3. App-thread calls ShutThread.awaitShutdown() > 4. App-thread waits on shutdownComplete.await(), i.e. on CountDownLatch#await > 5. VM decides to interrupt App-thread and there is a race condition now. 
> Race condition: > Condition 1: Shut-thread.doWork() gets an InterruptedException and decrements > the CountDownLatch > Condition 2: App-thread waiting on Shut-thread.shutdownComplete.await() > throws an InterruptedException as per the contract > [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await--] > *Solution* > > In ShutdownableThread#awaitShutdown(), when calling await() we should catch > InterruptedException and swallow it (do nothing) if the thread has > isInterruptible set to true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15205) Race condition in ShutdownableThread causes InterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-15205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanay Karmarkar reassigned KAFKA-15205: --- Assignee: Tanay Karmarkar > Race condition in ShutdownableThread causes InterruptedException > > > Key: KAFKA-15205 > URL: https://issues.apache.org/jira/browse/KAFKA-15205 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1 >Reporter: Divij Vaidya >Assignee: Tanay Karmarkar >Priority: Major > Fix For: 3.6.0 > > > In ShutdownableThread, during close, we call: > initiateShutdown(), which may interrupt the thread if > isInterruptible is set to true during construction. > After that, we wait for proper shutdown using > awaitShutdown(), which in turn calls CountDownLatch#await(). On interruption, > which could be caused by initiateShutdown() earlier, await() throws an > InterruptedException. Hence, awaitShutdown() is going to exit by throwing an > InterruptedException. > The sequence to reproduce this is as follows: > App-thread: name of the application thread which spawns and closes the Shutdownable > thread > Shut-thread: name of the shutdownable thread. > 1. App-thread calls ShutThread.initiateShutdown() > 2. ShutThread.interrupt() is called. It informs the VM to interrupt, but the > actual interrupt will be async. initiateShutdown() from step 1 returns. > 3. App-thread calls ShutThread.awaitShutdown() > 4. App-thread waits on shutdownComplete.await(), i.e. on CountDownLatch#await > 5. VM decides to interrupt App-thread and there is a race condition now. 
> Race condition: > Condition 1: Shut-thread.doWork() gets an InterruptedException and decrements > the CountDownLatch > Condition 2: App-thread waiting on Shut-thread.shutdownComplete.await() > throws an InterruptedException as per the contract > [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await--] > *Solution* > > In ShutdownableThread#awaitShutdown(), when calling await() we should catch > InterruptedException and swallow it (do nothing) if the thread has > isInterruptible set to true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
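The proposed fix can be sketched as a small stand-alone demonstration. This is a hedged illustration only: the class and parameter names mirror the ticket's description (`shutdownComplete`, `isInterruptible`), not Kafka's exact code.

```java
import java.util.concurrent.CountDownLatch;

// Sketch: swallow the InterruptedException that our own initiateShutdown()
// interrupt can raise while waiting on shutdownComplete, so awaitShutdown()
// does not fail spuriously on the race described above.
public class ShutdownAwaitSketch {
    static boolean awaitShutdown(CountDownLatch shutdownComplete, boolean isInterruptible) {
        try {
            shutdownComplete.await();
            return true;
        } catch (InterruptedException e) {
            if (isInterruptible) {
                // Expected: the interrupt came from our own initiateShutdown()
                Thread.currentThread().interrupt(); // preserve the flag for callers
                return true;
            }
            throw new RuntimeException(e); // unexpected interrupt: propagate
        }
    }

    public static void main(String[] args) {
        CountDownLatch shutdownComplete = new CountDownLatch(0); // shutdown already finished
        Thread.currentThread().interrupt(); // simulate the pending interrupt from step 5
        // CountDownLatch#await checks the interrupt status even when the count
        // is already zero, so without the catch this call would throw.
        System.out.println(awaitShutdown(shutdownComplete, true)); // prints true
        Thread.interrupted(); // clear the re-asserted flag before exiting
    }
}
```

Re-asserting the interrupt flag before returning is a deliberate choice here: the exception is swallowed, but callers further up can still observe that an interrupt occurred.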
[jira] [Assigned] (KAFKA-15199) remove leading and trailing spaces from user input in release.py
[ https://issues.apache.org/jira/browse/KAFKA-15199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tanay Karmarkar reassigned KAFKA-15199: --- Assignee: Tanay Karmarkar > remove leading and trailing spaces from user input in release.py > > > Key: KAFKA-15199 > URL: https://issues.apache.org/jira/browse/KAFKA-15199 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Tanay Karmarkar >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on PR #13278: URL: https://github.com/apache/kafka/pull/13278#issuecomment-164085 @mimaison Thanks for the review. It seems I addressed all your comments. Please, take a look one more time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267103768

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:

## @@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }

     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }

     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
+                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
             }
         }
         return result;
     }

+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param <T> Type being iterated over by the ServiceLoader
+     * @param <U> Return value of the passed-in function
+     */
+    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
+        // It's difficult to know for sure if the iterator was able to advance past the first broken
+        // plugin class, or if it will continue to fail on that broken class for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
+        // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        ||
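The bounded-retry pattern described in `handleLinkageError` above can be sketched independently of the Connect classes. This is a minimal sketch, not the PR's code: the class name, the `System.err` logging, and the exact suppression rule are illustrative, though the attempt limit of 100 matches the quoted comment.

```java
import java.util.Objects;
import java.util.function.Supplier;

class LinkageErrorRetry {
    // Bounded retry budget: high enough to step over every plugin in a typical
    // location, low enough to cap log-spam if the iterator never advances.
    static final int MAX_ATTEMPTS = 100;

    static <U> U withRetries(Supplier<U> function) {
        LinkageError lastError = null;
        for (int i = 0; i < MAX_ATTEMPTS; i++) {
            try {
                return function.get();
            } catch (LinkageError t) {
                // Only log when the error class changes, so an iterator stuck
                // rethrowing the same error produces a single line.
                if (lastError == null || !Objects.equals(lastError.getClass(), t.getClass())) {
                    System.err.println("plugin scan hit linkage error: " + t);
                }
                lastError = t;
            }
        }
        // Budget exhausted: surface the most recent error to the caller.
        throw lastError;
    }
}
```

A supplier that fails transiently (as a bad classpath entry might during iteration) is retried until it succeeds or the budget runs out, which is exactly the mitigation for the JDK-8196182 behavior discussed above.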
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1267096601

## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java:

## @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {
+    /**
+     * Returns a list of duplicated items
+     */
+    public static <T> Iterable<T> duplicates(Iterable<T> s) {
+        return StreamSupport.stream(s.spliterator(), false)

Review Comment: Nice proposal. Thanks. Fixed.

-- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
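The `duplicates` stream pipeline quoted above is cut off after the first line; it can be completed along these lines. This is a sketch of one possible implementation (class name and the `Set` return type follow the reviewer's suggestion), not necessarily the exact code merged in the PR:

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class CoreUtilsSketch {
    // Returns the set of items that occur more than once in the iterable.
    static <T> Set<T> duplicates(Iterable<T> s) {
        return StreamSupport.stream(s.spliterator(), false)
                // Count occurrences per distinct item.
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet().stream()
                // Keep only items seen at least twice.
                .filter(e -> e.getValue() > 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
```

Returning `Set` rather than `Iterable` (as discussed later in this thread) makes the "no duplicates of duplicates" guarantee explicit in the signature.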
[jira] [Commented] (KAFKA-10775) DOAP has incorrect category
[ https://issues.apache.org/jira/browse/KAFKA-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744305#comment-17744305 ] Piotr Zygielo commented on KAFKA-10775: --- May I ask to have it closed, please? I can only clone it, and even though the difference is just one letter, I think closing it is the much better solution. > DOAP has incorrect category > --- > > Key: KAFKA-10775 > URL: https://issues.apache.org/jira/browse/KAFKA-10775 > Project: Kafka > Issue Type: Bug >Reporter: Sebb >Priority: Major > > https://github.com/apache/kafka/blob/0df461582c78449fd39e35b241a77a7acf5735e2/doap_Kafka.rdf#L36 > reads: > rdf:resource="https://projects.apache.org/projects.html?category#big-data" /> > This should be > rdf:resource="http://projects.apache.org/category/big-data" /> > c.f. > http://svn.apache.org/repos/asf/bigtop/site/trunk/content/resources/bigtop.rdf -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267094589

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: (the same @@ -118,35 +120,80 @@ hunk quoted in the first comment above, discussed here at the lines:)

+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);

Review Comment: > Since that field (and its accessor method) are currently only used for log messages, what do you think about altering PluginSource::location to return a string, and using "System classpath" in that case?

I can't do that, because I need the Path object later in the migration script. I've used a non-null sentinel path instead, so that all of the logging call-sites are improved but the Path object is still available.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1267093982

## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java: (the same new file quoted in full in the earlier comment above, discussed here at the line:)

+    public static <T> Iterable<T> duplicates(Iterable<T> s) {

Review Comment: > I wonder if we should return Set instead of Iterable

Changed to return Set.

> so maybe we can put this into tools?

CoreUtils, CoreUtilsTest moved to tools. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on code in PR #13278: URL: https://github.com/apache/kafka/pull/13278#discussion_r1267084525

## tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:

## @@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.CoreUtils;
+import org.apache.kafka.server.util.Json;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified offset.
+ */
+public class DeleteRecordsCommand {
+    private static final int EARLIEST_VERSION = 1;
+
+    public static void main(String[] args) throws Exception {
+        execute(args, System.out);
+    }
+
+    static Collection> parseOffsetJsonStringWithoutDedup(String jsonData) {
+        try {
+            JsonNode js = Json.tryParseFull(jsonData).node();
+
+            int version = EARLIEST_VERSION;
+
+            if (js.has("version"))
+                version = js.get("version").asInt();
+
+            return parseJsonData(version, js);
+        } catch (JsonProcessingException e) {
+            throw new AdminOperationException("The input string is not a valid JSON");
+        }
+    }
+
+    private static Collection> parseJsonData(int version, JsonNode js) throws JsonMappingException {

Review Comment: Refactored to use JsonValue when possible.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
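The version-defaulting step in `parseOffsetJsonStringWithoutDedup` (fall back to `EARLIEST_VERSION` when the offset JSON carries no "version" field) can be illustrated without the Jackson dependency by operating on an already-parsed map. The class and method names here are illustrative, not part of the PR:

```java
import java.util.Map;

class OffsetJsonVersion {
    static final int EARLIEST_VERSION = 1;

    // Returns the schema version of an already-parsed offset-JSON object,
    // defaulting to the earliest version when no "version" field is present.
    static int versionOf(Map<String, ?> parsedJson) {
        Object v = parsedJson.get("version");
        return (v instanceof Number) ? ((Number) v).intValue() : EARLIEST_VERSION;
    }
}
```

The same shape as `js.has("version") ? js.get("version").asInt() : EARLIEST_VERSION` in the quoted code, which keeps old offset-JSON files (written before the field existed) parseable.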
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267082036

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: (the same @@ -118,35 +120,80 @@ hunk quoted in the first comment above, discussed here at the line:)

+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment: In the case where an implementation of ServiceLoader may eagerly evaluate a first provider and cause a LinkageError to appear, I wanted to re-use the handleLinkageError logging. I don't think that the retries for these operations would be effective, and the later retries would probably be wasteful. However, I thought re-using the same function in both places was simpler than customizing the behavior of each call. Since this code is (probably) never going to be exercised, I tried to keep it as simple as possible.

Do you think that we should log errors from load and iterator? I can catch the error or just let it propagate to the caller.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15207) ProducerIdManager#generateProducerId() should return different error code for newer clients
[ https://issues.apache.org/jira/browse/KAFKA-15207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744300#comment-17744300 ] Justine Olshan commented on KAFKA-15207: It's a bit tricky to know when we are receiving a newer client request here since the api is indirect. I suppose we could add a field to the api that specifies what the client version was, but that would require a bump to the api. Is there something I'm missing that would work better? > ProducerIdManager#generateProducerId() should return different error code for > newer clients > --- > > Key: KAFKA-15207 > URL: https://issues.apache.org/jira/browse/KAFKA-15207 > Project: Kafka > Issue Type: Task >Reporter: Jeff Kim >Priority: Major > > [https://github.com/apache/kafka/pull/13267] made changes to > ProducerIdManager that does not block request handler threads while waiting > for a new producer id block. Instead of blocking, we return > COORDINATOR_LOAD_IN_PROGRESS. > > We return this rather than REQUEST_TIMED_OUT since older clients treat the > error as fatal when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. > > For newer clients, we should return an error that is more aligned with what > the client experiences. -- This message was sent by Atlassian Jira (v8.20.10#820010)
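The idea under discussion (return a retriable code to old clients, a more descriptive one to clients new enough to understand it) might look roughly like this. Everything here is illustrative: the cutoff version is hypothetical and the "newer client error" is left as a parameter, since the ticket does not decide it; the two numeric codes do match `org.apache.kafka.common.protocol.Errors`:

```java
class ProducerIdErrorMapper {
    // These numeric values match org.apache.kafka.common.protocol.Errors.
    static final short REQUEST_TIMED_OUT = 7;            // treated as fatal by older clients
    static final short COORDINATOR_LOAD_IN_PROGRESS = 14; // retriable everywhere

    // Hypothetical cutoff: clients at or above this API version would understand
    // a more descriptive error; older clients need a code they already retry on.
    static final short NEWER_CLIENT_CUTOFF_VERSION = 5;

    static short errorWhileFetchingIdBlock(short requestApiVersion, short newerClientError) {
        return requestApiVersion >= NEWER_CLIENT_CUTOFF_VERSION
                ? newerClientError
                : COORDINATOR_LOAD_IN_PROGRESS;
    }
}
```

As Justine notes, the hard part is not this mapping but learning `requestApiVersion` at all, since `ProducerIdManager#generateProducerId()` sits behind an indirect API.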
[GitHub] [kafka] tanay27 commented on a diff in pull request #14035: KAFKA-15199: Remove Leading and Trailing Spaces
tanay27 commented on code in PR #14035: URL: https://github.com/apache/kafka/pull/14035#discussion_r1267071244 ## release.py: ## @@ -692,7 +700,7 @@ def select_gpg_key(): fail("Ok, giving up") if not user_ok("Ok to push RC tag %s (y/n)?: " % rc_tag): fail("Ok, giving up") -cmd("Pushing RC tag", "git push %s %s" % (PUSH_REMOTE_NAME, rc_tag)) +cmd("Pushing RC tag", f"git push {PUSH_REMOTE_NAME} {rc_tag}") Review Comment: got it, reverting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1267070768

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

## @@ -1245,4 +1422,1304 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String memberId) {
     public static String consumerGroupRevocationTimeoutKey(String groupId, String memberId) {
         return "revocation-timeout-" + groupId + "-" + memberId;
     }
+
+    /**
+     * Replays GroupMetadataKey/Value to update the soft state of
+     * the generic group.
+     *
+     * @param key   A GroupMetadataKey key.
+     * @param value A GroupMetadataValue record.
+     */
+    public void replay(
+        GroupMetadataKey key,
+        GroupMetadataValue value
+    ) {
+        String groupId = key.group();
+
+        if (value == null) {
+            // Tombstone. Group should be removed.
+            removeGroup(groupId);
+        } else {
+            List<GenericGroupMember> loadedMembers = new ArrayList<>();
+            for (GroupMetadataValue.MemberMetadata member : value.members()) {
+                int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+                    member.sessionTimeout() : member.rebalanceTimeout();
+
+                JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection();
+                supportedProtocols.add(new JoinGroupRequestProtocol()
+                    .setName(value.protocol())
+                    .setMetadata(member.subscription()));
+
+                GenericGroupMember loadedMember = new GenericGroupMember(
+                    member.memberId(),
+                    Optional.ofNullable(member.groupInstanceId()),
+                    member.clientId(),
+                    member.clientHost(),
+                    rebalanceTimeout,
+                    member.sessionTimeout(),
+                    value.protocolType(),
+                    supportedProtocols,
+                    member.assignment()
+                );
+
+                loadedMembers.add(loadedMember);
+            }
+
+            String protocolType = value.protocolType();
+
+            GenericGroup genericGroup = new GenericGroup(
+                this.logContext,
+                groupId,
+                loadedMembers.isEmpty() ? EMPTY : STABLE,
+                time,
+                value.generation(),
+                protocolType == null || protocolType.isEmpty() ?
+                    Optional.empty() : Optional.of(protocolType),
+                Optional.ofNullable(value.protocol()),
+                Optional.ofNullable(value.leader()),
+                value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp())
+            );
+
+            loadedMembers.forEach(member -> genericGroup.add(member, null));
+            groups.put(groupId, genericGroup);
+
+            genericGroup.setSubscribedTopics(
+                genericGroup.computeSubscribedTopics()
+            );
+        }
+    }
+
+    /**
+     * Handle a JoinGroupRequest.
+     *
+     * @param context The request context.
+     * @param request The actual JoinGroup request.
+     *
+     * @return The result that contains records to append if the join group phase completes.
+     */
+    public CoordinatorResult<Void, Record> genericGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+        } else {
+            boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+            // Group is created if it does not exist and the member id is UNKNOWN. if member
+            // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+            GenericGroup group;
+            boolean isNewGroup = !groups.containsKey(groupId);
+            try {
+                group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+            } catch (Throwable t) {
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.forException(t).code())
+                );
+                return EMPTY_RESULT;
+            }
+
+            if (!acceptJoiningMember(group, memberId)) {
+                group.remove(memberId);
+                responseFuture.complete(new JoinGroupResponseData()
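The first branch of `genericGroupJoin` above rejects out-of-range session timeouts before touching any group state. The check itself is a plain bounds test; this standalone sketch uses an illustrative class name, and the sample values in the usage below are the broker defaults for `group.min.session.timeout.ms` (6000) and `group.max.session.timeout.ms` (1800000):

```java
class SessionTimeoutCheck {
    // Mirrors the first branch of genericGroupJoin: session timeouts outside
    // the broker-configured [min, max] window are rejected with
    // INVALID_SESSION_TIMEOUT before any group state is touched.
    static boolean isValidSessionTimeout(int sessionTimeoutMs, int minMs, int maxMs) {
        return sessionTimeoutMs >= minMs && sessionTimeoutMs <= maxMs;
    }
}
```

Doing this validation first keeps a misconfigured client from creating an empty group it could never successfully join.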
[jira] [Created] (KAFKA-15210) Mention vote should be open for at atleast 72 hours
Divij Vaidya created KAFKA-15210: Summary: Mention vote should be open for at atleast 72 hours Key: KAFKA-15210 URL: https://issues.apache.org/jira/browse/KAFKA-15210 Project: Kafka Issue Type: Sub-task Reporter: Divij Vaidya The voting deadline should be at least 3 days from the time VOTE email is posted. Hence, the script should mention that the date should be at least 72 hours from now. The change needs to be done at the line below: *** Please download, test and vote by -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288 ] Matthias J. Sax commented on KAFKA-15190: - One more thing: the `process.id` is actually only used as part of the `client.id` iff no `client.id` config is set. – Hence, setting the `client.id` should avoid the issue of rebalancing (and task shuffling)? > Allow configuring a streams process ID > -- > > Key: KAFKA-15190 > URL: https://issues.apache.org/jira/browse/KAFKA-15190 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Joe Wreschnig >Priority: Major > Labels: needs-kip > > We run our Kafka Streams applications in containers with no persistent > storage, and therefore the mitigation of persisting the process ID in the state > directory (KAFKA-10716) does not help us avoid shuffling lots of tasks during > restarts. > However, we do have a persistent container ID (from a Kubernetes > StatefulSet). Would it be possible to expose a configuration option to let us > set the streams process ID ourselves? > We are already using this ID as our group.instance.id - would it make sense > to have the process ID be automatically derived from this (plus > application/client IDs) if it's set? The two IDs seem to have overlapping > goals of identifying "this consumer" across restarts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
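The fallback Matthias describes (the random `process.id` only leaks into member identity when no `client.id` is configured) can be sketched as follows. The derived format shown is illustrative, not the exact string Kafka Streams builds:

```java
class ClientIdFallback {
    // When the application sets client.id explicitly, it wins; otherwise an id
    // is derived from the application id plus the (random) process id, which is
    // what makes member identity unstable across container restarts.
    static String effectiveClientId(String configuredClientId, String applicationId, String processId) {
        if (configuredClientId != null && !configuredClientId.isEmpty()) {
            return configuredClientId;
        }
        return applicationId + "-" + processId; // illustrative format, not the exact Streams string
    }
}
```

This is why setting a stable `client.id` (e.g. from the StatefulSet ordinal) could sidestep the task-shuffling the reporter describes, without a new config.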