[jira] [Assigned] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled

2023-07-18 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reassigned KAFKA-15160:
--

Assignee: Phuc Hong Tran

> Message bytes duplication in Kafka headers when compression is enabled
> --
>
> Key: KAFKA-15160
> URL: https://issues.apache.org/jira/browse/KAFKA-15160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression, consumer
>Affects Versions: 3.2.3, 3.3.2
>Reporter: Vikash Mishra
>Assignee: Phuc Hong Tran
>Priority: Critical
> Attachments: java heap dump.png, wireshark-min.png
>
>
> I created a Spring Kafka consumer using @KafkaListener.
> I encountered a scenario where, when data is compressed (with any compression type, snappy/gzip) and consumed by the consumer, a heap dump shows a byte[] entry occupying the same amount of memory as the message value.
> This behavior is seen only when compressed data is consumed by consumers, not in the case of uncompressed data.
> I tried to capture Kafka's traffic with Wireshark; there it shows the proper size of the data coming from the Kafka server and no extra bytes in the headers. So this is definitely something in the Kafka client. Spring doesn't take any action around compression; the whole functionality is handled internally in the Kafka client library.
> Screenshots of the heap dump and the Wireshark capture are attached.
> This seems like a critical issue, as the message size in memory almost doubles, impacting consumer memory and performance. It feels like the actual message value is copied into the headers.
> *To Reproduce*
>  # Produce compressed data on any topic.
>  # Create a simple consumer consuming from the above-created topic.
>  # Capture heap dump.
> *Expected behavior*
> Headers should not retain byte arrays consuming memory equivalent to the message value.
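As a concrete illustration of the steps above, a minimal repro sketch with the plain Java client (topic name, sizes, and configs are illustrative; the reporter used Spring's @KafkaListener, but the behavior is attributed to the underlying client):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class CompressedConsumeRepro {
    public static void main(String[] args) {
        // Step 1: produce compressed data on a topic.
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // or "snappy"
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("repro-topic", new byte[1024 * 1024]));
        }

        // Step 2: consume it. Step 3: capture a heap dump while the records are
        // still reachable, and compare retained byte[] sizes in the record
        // headers against the record values.
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        c.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("repro-topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}
{code}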





[GitHub] [kafka] Phuc-Hong-Tran closed pull request #14026: KAFKA-15152: Fix incorrect format specifiers when formatting string

2023-07-18 Thread via GitHub


Phuc-Hong-Tran closed pull request #14026: KAFKA-15152: Fix incorrect format 
specifiers when formatting string
URL: https://github.com/apache/kafka/pull/14026





[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-18 Thread via GitHub


nizhikov commented on PR #13278:
URL: https://github.com/apache/kafka/pull/13278#issuecomment-1641386848

   Test failures are unrelated.





[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java

2023-07-18 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1641385324

   Test failures seem unrelated.





[jira] [Created] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-07-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-15215:
--

 Summary: The default.dsl.store config is not compatible with 
custom state stores
 Key: KAFKA-15215
 URL: https://issues.apache.org/jira/browse/KAFKA-15215
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman
Assignee: Almog Gavra


Sort of a bug, sort of a new/missing feature. When we added the long-awaited default.dsl.store config, it was decided to scope the initial KIP to just the two out-of-the-box state store types offered by Streams: rocksdb and in-memory. The reasoning was that this would address a large number of the relevant use cases and could always be followed up with another KIP for custom state stores if/when the demand arose.

Of course, since rocksdb is the default anyway, the only beneficiaries of this KIP right now are the people who specifically want only in-memory stores, yet custom state store users are probably by far the ones with the greatest need for an easier way to configure the store type across an entire application. And unfortunately, because the config currently relies on enum definitions for the known OOTB store types, there's not really any way to extend this feature as it stands to work with custom implementations.

I think this is a great feature, which is why I hope to see it extended to the broader user base. Most likely we'll want to introduce a new config for this, though whether it replaces the old default.dsl.store config or complements it will have to be decided during the KIP discussion.
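For context, a minimal sketch of how the existing config is used today (constants as defined in StreamsConfig by KIP-591; application config values here are illustrative). The gap is that neither constant can name a custom store implementation:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultDslStoreExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Only the two built-in store types are accepted:
        // StreamsConfig.ROCKS_DB ("rocksDB", the default) or
        // StreamsConfig.IN_MEMORY ("in_memory"). There is no value that can
        // name a custom StoreSupplier, which is the limitation this ticket
        // describes.
        props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
        return props;
    }
}
{code}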





[jira] [Updated] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-07-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15215:
---
Labels: needs-kip  (was: )

> The default.dsl.store config is not compatible with custom state stores
> ---
>
> Key: KAFKA-15215
> URL: https://issues.apache.org/jira/browse/KAFKA-15215
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
>
> Sort of a bug, sort of a new/missing feature. When we added the long-awaited default.dsl.store config, it was decided to scope the initial KIP to just the two out-of-the-box state store types offered by Streams: rocksdb and in-memory. The reasoning was that this would address a large number of the relevant use cases and could always be followed up with another KIP for custom state stores if/when the demand arose.
> Of course, since rocksdb is the default anyway, the only beneficiaries of this KIP right now are the people who specifically want only in-memory stores, yet custom state store users are probably by far the ones with the greatest need for an easier way to configure the store type across an entire application. And unfortunately, because the config currently relies on enum definitions for the known OOTB store types, there's not really any way to extend this feature as it stands to work with custom implementations.
> I think this is a great feature, which is why I hope to see it extended to the broader user base. Most likely we'll want to introduce a new config for this, though whether it replaces the old default.dsl.store config or complements it will have to be decided during the KIP discussion.





[GitHub] [kafka] jeffkbkim commented on pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


jeffkbkim commented on PR #13870:
URL: https://github.com/apache/kafka/pull/13870#issuecomment-1641304041

   the test failures are unrelated





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1267462476


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });
+
+                if (!membersWithMissingAssignment.isEmpty()) {
+                    log.warn("Setting empty assignments for members {} of {} for generation {}.",
+                        membersWithMissingAssignment, groupId, group.generationId());
+                }
+
+                CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    // Another member may have joined the group while we were awaiting this callback,
+                    // so we must ensure we are still in the CompletingRebalance state and the same generation
+                    // when it gets invoked. if we have transitioned to another state, then do nothing
+                    if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) {
+                        if (t != null) {
+                            Errors error = Errors.forException(t);
+                            resetAndPropagateAssignmentWithError(group, error);
+                            maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" +
+                                "during SyncGroup (member: " + memberId + ").");
+                        } else {
+                            // Members' assignments were already updated. Propagate and transition to Stable.
+                            propagateAssignment(group, Errors.NONE);
+                            group.transitionTo(STABLE);
+                        }
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion())
+                );
+                return new CoordinatorResult<>(records, appendFuture);
+            }
+
+        } else if (group.isInState(STABLE)) {

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1267441961


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));
+
+        } else if (group.isInState(EMPTY)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+        } else if (group.isInState(PREPARING_REBALANCE)) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+        } else if (group.isInState(COMPLETING_REBALANCE)) {
+            group.member(memberId).setAwaitingSyncFuture(responseFuture);
+            removePendingSyncMember(group, request.memberId());
+
+            // If this is the leader, then we can attempt to persist state and transition to stable
+            if (group.isLeader(memberId)) {
+                log.info("Assignment received from leader {} for group {} for generation {}. " +
+                    "The group has {} members, {} of which are static.",
+                    memberId, groupId, group.generationId(),
+                    group.size(), group.allStaticMemberIds().size());
+
+                // Fill all members with corresponding assignment. Reset members not specified in
+                // the assignment to empty assignments.
+                Map<String, byte[]> assignments = new HashMap<>();
+                request.assignments()
+                    .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()));
+
+                Set<String> membersWithMissingAssignment = new HashSet<>();
+                group.allMembers().forEach(member -> {
+                    byte[] assignment = assignments.get(member.memberId());
+                    if (assignment != null) {
+                        member.setAssignment(assignment);
+                    } else {
+                        membersWithMissingAssignment.add(member.memberId());
+                        member.setAssignment(new byte[0]);
+                    }
+                });

Review Comment:
   I thought it would be safe because `resetAndPropagateAssignmentWithError(group, error);` resets all members to an empty assignment state if the commit fails.
   
   Let's say another request comes in before the commit fails. The only place we respond with the member assignment to the client is when we receive a sync group request in the Stable state. So we never expose the updated assignment until the write succeeds and transitions the state to Stable.
   
   Do you think this is too brittle?






[GitHub] [kafka] gharris1727 commented on pull request #14029: KAFKA-10579: Upgrade reflections from 0.9.12 to 0.10.2

2023-07-18 Thread via GitHub


gharris1727 commented on PR #14029:
URL: https://github.com/apache/kafka/pull/14029#issuecomment-1641250195

   > Would it be too much to ask to run system tests?
   
   I ran the connect system tests on a spare machine and got 181/181 passing.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1267427788


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
                 group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
 
         }
+        return maybeCompleteJoinPhase(group);
+    }
+
+    public CoordinatorResult<Void, Record> genericGroupSync(
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+        Optional<Errors> errorOpt = validateSyncGroup(group, request);
+        if (errorOpt.isPresent()) {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(errorOpt.get().code()));

Review Comment:
   As genericGroupJoin catches the exception, I will repeat the behavior in genericGroupSync to keep it consistent.






[jira] [Commented] (KAFKA-15194) Rename local tiered storage segment with offset as prefix for easy navigation

2023-07-18 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744384#comment-17744384
 ] 

Lan Ding commented on KAFKA-15194:
--

[~divijvaidya] Sorry, I just saw the message; [~owen-leung] has already picked up this issue.

> Rename local tiered storage segment with offset as prefix for easy navigation
> -
>
> Key: KAFKA-15194
> URL: https://issues.apache.org/jira/browse/KAFKA-15194
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Owen C.H. Leung
>Priority: Minor
>  Labels: newbie
>
> In LocalTieredStorage, which is an implementation of RemoteStorageManager, segments are saved under random UUIDs. This makes navigating to a particular segment harder. To allow navigating to a given segment by offset, prepend the offset information to the segment filename, as sketched below.
> https://github.com/apache/kafka/pull/13837#discussion_r1258896009
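A small sketch of the proposed naming scheme (the helper and padding here are illustrative, not the actual change in LocalTieredStorage):

{code:java}
import java.util.UUID;

public class SegmentFileNames {
    // Illustrative helper: prefix the segment's start offset (zero-padded,
    // like local log segment files) to the random UUID so files sort and can
    // be located by offset.
    public static String segmentFileName(long startOffset, UUID segmentId) {
        return String.format("%020d-%s.segment", startOffset, segmentId);
    }
}
// e.g. segmentFileName(42L, UUID.randomUUID())
//   -> "00000000000000000042-<uuid>.segment"
{code}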





[GitHub] [kafka] ijuma commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-07-18 Thread via GitHub


ijuma commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1641123937

   If I'm reading this right, it seems very restrictive. It's very common for 
other companies to build AK for their own purposes. If they're not allowed to 
use gradle enterprise, then it should be disabled by default and enabled via a 
parameter in the Apache CI build.





[jira] [Commented] (KAFKA-15205) Race condition in ShutdownableThread causes InterruptedException

2023-07-18 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744377#comment-17744377
 ] 

Ismael Juma commented on KAFKA-15205:
-

Have you seen this behavior in practice? We should definitely fix our error 
handling if it's not compliant, but it's not clear to me if this is more of a 
theoretical issue.

> Race condition in ShutdownableThread causes InterruptedException
> 
>
> Key: KAFKA-15205
> URL: https://issues.apache.org/jira/browse/KAFKA-15205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
>
> During close of a ShutdownableThread, we call:
> initiateShutdown(), which may interrupt the thread if isInterruptible was set to true during construction.
> After that, we wait for proper shutdown using awaitShutdown(), which in turn calls CountDownLatch#await(). On interruption, which could have been caused by the earlier initiateShutdown(), await() throws an InterruptedException. Hence, awaitShutdown() is going to exit by throwing an InterruptedException.
> The sequence to reproduce this is as follows:
> App-thread: the application thread that spawns and closes the shutdownable thread.
> Shut-thread: the shutdownable thread.
> 1. App-thread calls ShutThread.initiateShutdown()
> 2. ShutThread.interrupt() is called. It informs the VM to interrupt, but the actual interrupt is asynchronous. initiateShutdown() from step 1 returns.
> 3. App-thread calls ShutThread.awaitShutdown()
> 4. App-thread waits on shutdownComplete.await(), i.e. on CountDownLatch#await
> 5. The VM decides to interrupt App-thread, and there is a race condition now.
> Race condition:
> Condition 1: Shut-thread.doWork() gets an InterruptedException and decrements the CountDownLatch
> Condition 2: App-thread waiting on Shut-thread.shutdownComplete.await() throws an InterruptedException as per the contract
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await--]
> *Solution*
> In ShutdownableThread#awaitShutdown(), when calling await() we should catch the InterruptedException and swallow it (do nothing) if the thread has isInterruptible set to true; see the sketch below.
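A minimal sketch of the proposed fix, following the description above (field and method names mirror the description, not necessarily the actual Kafka class or the eventual patch):

{code:java}
import java.util.concurrent.CountDownLatch;

// Sketch only: swallow the InterruptedException in awaitShutdown() when the
// thread was constructed as interruptible, since initiateShutdown() itself
// may have triggered the interrupt.
public class ShutdownableThreadSketch extends Thread {
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);
    private final boolean isInterruptible;

    public ShutdownableThreadSketch(String name, boolean isInterruptible) {
        super(name);
        this.isInterruptible = isInterruptible;
    }

    public void initiateShutdown() {
        if (isInterruptible) {
            interrupt(); // the interrupt is delivered asynchronously
        }
    }

    public void awaitShutdown() throws InterruptedException {
        try {
            shutdownComplete.await();
        } catch (InterruptedException e) {
            if (!isInterruptible) {
                throw e; // unexpected; propagate
            }
            // Expected: initiateShutdown() caused the interrupt; treat as a
            // normal shutdown and return.
        }
    }

    @Override
    public void run() {
        try {
            // doWork() loop would go here
        } finally {
            shutdownComplete.countDown();
        }
    }
}
{code}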





[jira] [Closed] (KAFKA-10775) DOAP has incorrect category

2023-07-18 Thread Sebb (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebb closed KAFKA-10775.


> DOAP has incorrect category
> ---
>
> Key: KAFKA-10775
> URL: https://issues.apache.org/jira/browse/KAFKA-10775
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sebb
>Assignee: Sebb
>Priority: Major
> Fix For: 3.1.0
>
>
> https://github.com/apache/kafka/blob/0df461582c78449fd39e35b241a77a7acf5735e2/doap_Kafka.rdf#L36
> reads:
> <category rdf:resource="https://projects.apache.org/projects.html?category#big-data" />
> This should be
> <category rdf:resource="http://projects.apache.org/category/big-data" />
> c.f.
> http://svn.apache.org/repos/asf/bigtop/site/trunk/content/resources/bigtop.rdf





[jira] [Updated] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2023-07-18 Thread Lixin Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lixin Yao updated KAFKA-15214:
--
Parent: KAFKA-7739
Issue Type: Sub-task  (was: Improvement)

> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> 
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
>  Issue Type: Sub-task
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> In the current metric RemoteReadErrorsPerSec, the exception type OffsetOutOfRangeException is not included.
> In our testing of the tiered storage feature (at Apple), we noticed several cases where remote download was affected and stuck due to a repeated OffsetOutOfRangeException in some particular broker or topic partitions. The root cause can vary, but without a metric it is currently very hard to catch this issue and debug it in a timely fashion. Understandably, the exception itself may not be the root cause, but a metric for it would be a good signal to alert on and investigate.
> Related discussion
> [https://github.com/apache/kafka/pull/13944#discussion_r1266243006]
> I am happy to contribute this if the request is agreed upon.





[jira] [Commented] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core

2023-07-18 Thread Said BOUDJELDA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744366#comment-17744366
 ] 

Said BOUDJELDA commented on KAFKA-15000:


I can take this Jira.

> High vulnerability PRISMA-2023-0067 reported in jackson-core
> 
>
> Key: KAFKA-15000
> URL: https://issues.apache.org/jira/browse/KAFKA-15000
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.3.2, 3.5.1
>Reporter: Arushi Rai
>Assignee: Said BOUDJELDA
>Priority: Critical
> Fix For: 3.6.0
>
>
> Kafka is using jackson-core version 2.13.4, which has a reported high-severity vulnerability, [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827].
> This vulnerability is fixed in jackson-core 2.15.0, and Kafka should upgrade to that version.





[jira] [Assigned] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core

2023-07-18 Thread Said BOUDJELDA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Said BOUDJELDA reassigned KAFKA-15000:
--

Assignee: Said BOUDJELDA

> High vulnerability PRISMA-2023-0067 reported in jackson-core
> 
>
> Key: KAFKA-15000
> URL: https://issues.apache.org/jira/browse/KAFKA-15000
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.3.2, 3.5.1
>Reporter: Arushi Rai
>Assignee: Said BOUDJELDA
>Priority: Critical
> Fix For: 3.6.0
>
>
> Kafka is using jackson-core version 2.13.4, which has a reported high-severity vulnerability, [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827].
> This vulnerability is fixed in jackson-core 2.15.0, and Kafka should upgrade to that version.





[GitHub] [kafka] muralibasani commented on pull request #13417: KAFKA-14585: Moving StorageTool from core to tools module

2023-07-18 Thread via GitHub


muralibasani commented on PR #13417:
URL: https://github.com/apache/kafka/pull/13417#issuecomment-1641043256

   @fvaleri 
   I managed to remove the core dependency from tools. 
   If I am heading in the right direction, I will add tests, fix conflicts, and handle several other minor things.





[GitHub] [kafka] mjsax commented on pull request #14031: Docs: Include ZK Deprecation notice and information

2023-07-18 Thread via GitHub


mjsax commented on PR #14031:
URL: https://github.com/apache/kafka/pull/14031#issuecomment-1641032657

   Merged to `trunk` and cherry-picked to the `3.5` branch. I will take care of getting the changes into `kafka-site`, too.





[GitHub] [kafka] mjsax merged pull request #14031: Docs: Include ZK Deprecation notice and information

2023-07-18 Thread via GitHub


mjsax merged PR #14031:
URL: https://github.com/apache/kafka/pull/14031





[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267289475


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -176,17 +210,53 @@ public long dualWriteOffset() {
 }
 
 public void incrementTimedOutHeartbeats() {
-timedOutHeartbeats.addAndGet(1);
+timedOutHeartbeats.incrementAndGet();
 }
 
-public void setTimedOutHeartbeats(long heartbeats) {
-timedOutHeartbeats.set(heartbeats);
+public void setTimedOutHeartbeats(long value) {
+timedOutHeartbeats.set(value);
 }

Review Comment:
   Yeah, I guess we can remove these






[jira] [Updated] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15091:
--
Fix Version/s: 3.3.3

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records).





[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267289020


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();

Review Comment:
   added a test that should catch this



##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();

Review Comment:
   added a test that should catch this






[jira] [Updated] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15091:
--
Fix Version/s: 3.4.2

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records).





[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267287803


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -418,24 +415,30 @@ LogDeltaManifest loadLogDelta(
     public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
         eventQueue.append(() -> {
             try {
+                long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+                String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
+                    snapshotName, numLoaded);
                 MetadataDelta delta = new MetadataDelta.Builder().
                     setImage(image).
                     build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
-                    "in {} us.", manifest.provenance().lastContainedOffset(),
+                log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
+                    "and this snapshot in {} us.", snapshotName,
+                    image.provenance().lastContainedOffset(),
                     NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
                     faultHandler.handleFault("Error generating new metadata image from " +
-                        "snapshot at offset " + reader.lastContainedLogOffset(), e);
+                        "snapshot " + snapshotName, e);

Review Comment:
   the offset is included in the snapshot name






[jira] [Updated] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15091:
--
Fix Version/s: 3.5.2

> Javadocs for SourceTask::commit are incorrect
> -
>
> Key: KAFKA-15091
> URL: https://issues.apache.org/jira/browse/KAFKA-15091
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0, 3.5.2
>
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by 
> [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not 
> configured with exactly-once support for source connectors) performs polling 
> and offset commits on separate threads. There's also some extensive 
> discussion on the semantics of that method in KAFKA-5716 where it's made 
> clear that altering the behavior of the runtime to align with the documented 
> semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records).





[GitHub] [kafka] C0urante merged pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit

2023-07-18 Thread via GitHub


C0urante merged PR #13948:
URL: https://github.com/apache/kafka/pull/13948





[GitHub] [kafka] C0urante commented on a diff in pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit

2023-07-18 Thread via GitHub


C0urante commented on code in PR #13948:
URL: https://github.com/apache/kafka/pull/13948#discussion_r1267283474


##
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:
##
@@ -105,9 +105,11 @@ public void initialize(SourceTaskContext context) {
     public abstract List<SourceRecord> poll() throws InterruptedException;
 
     /**
-     * <p>
-     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
-     * method should block until the commit is complete.
+     * This method is invoked periodically when offsets are committed for this source task. Note that the offsets
+     * being committed won't necessarily correspond to the latest offsets returned by this source task via
+     * {@link #poll()}. When exactly-once support is disabled, offsets are committed periodically and asynchronously
+     * (i.e. on a separate thread from the one which calls {@link #poll()}). When exactly-once support is enabled,
+     * offsets are committed on transaction commits (also see {@link TransactionBoundary}).
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
      * automatically. This hook is provided for systems that also need to store offsets internally

Review Comment:
   The "store offsets internally" language isn't great, but I'd rather leave it 
for now and explore it further if/when we start discussing deprecating this 
method. Connector developers might theoretically use this for acknowledging JMS 
records, for example, which in a very loose sense is storing offsets (or at 
least, some JMS-specific equivalent of them) in that system.
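   For illustration, a sketch of the kind of connector described here: a task that uses `commit()` purely as an "an offset commit happened" hook to acknowledge upstream records (the `Ack` type is a stand-in for, e.g., a JMS message to acknowledge; this is an assumed pattern, not a prescribed one):
   
   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;
   import org.apache.kafka.connect.source.SourceTask;
   
   // Sketch only. Per the corrected Javadoc, commit() must not assume the
   // committed offsets cover the latest records returned from poll(), so acks
   // are enqueued per delivered batch and drained best-effort.
   public abstract class AckingSourceTask extends SourceTask {
   
       public interface Ack {
           void ack();
       }
   
       // poll() (on the task thread) enqueues one Ack per delivered batch;
       // commit() runs on a separate thread, hence the concurrent queue.
       protected final ConcurrentLinkedQueue<Ack> pendingAcks = new ConcurrentLinkedQueue<>();
   
       @Override
       public void commit() throws InterruptedException {
           Ack ack;
           while ((ack = pendingAcks.poll()) != null) {
               ack.ack();
           }
       }
   }
   ```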






[GitHub] [kafka] C0urante commented on a diff in pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit

2023-07-18 Thread via GitHub


C0urante commented on code in PR #13948:
URL: https://github.com/apache/kafka/pull/13948#discussion_r1267278752


##
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:
##
@@ -105,9 +105,11 @@ public void initialize(SourceTaskContext context) {
     public abstract List<SourceRecord> poll() throws InterruptedException;
 
     /**
-     * <p>
-     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
-     * method should block until the commit is complete.
+     * This method is invoked periodically when offsets are committed for this source task. Note that the offsets
+     * being committed won't necessarily correspond to the latest offsets returned by this source task via
+     * {@link #poll()}. When exactly-once support is disabled, offsets are committed periodically and asynchronously
+     * (i.e. on a separate thread from the one which calls {@link #poll()}). When exactly-once support is enabled,
+     * offsets are committed on transaction commits (also see {@link TransactionBoundary}).

Review Comment:
   Looks great, thanks!






[jira] [Assigned] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic

2023-07-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-12842:
---

Assignee: Greg Harris

> Failing test: 
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
> --
>
> Key: KAFKA-12842
> URL: https://issues.apache.org/jira/browse/KAFKA-12842
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: John Roesler
>Assignee: Greg Harris
>Priority: Major
>
> This test failed during a PR build, which means that it failed twice in a 
> row, due to the test-retry logic in PR builds.
>  
> [https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209]
>  
> {noformat}
> java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141)
>   at 
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> 

[jira] [Assigned] (KAFKA-8690) Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorker

2023-07-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-8690:
--

Assignee: Greg Harris

> Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorker
> ---
>
> Key: KAFKA-8690
> URL: https://issues.apache.org/jira/browse/KAFKA-8690
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Greg Harris
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull]
> org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > testAddAndRemoveWorker STARTED
> 02:56:46 org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker.test.stdout
> 02:56:46 org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > testAddAndRemoveWorker FAILED
> 02:56:46 java.lang.AssertionError: Condition not met within timeout 15000. Connector tasks did not start in time.
> 02:56:46     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
> 02:56:46     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)
> 02:56:46     at org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker(ConnectWorkerIntegrationTest.java:118)





[jira] [Assigned] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy

2023-07-18 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-10579:
---

Assignee: Greg Harris

> Flaky test 
> connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
> 
>
> Key: KAFKA-10579
> URL: https://issues.apache.org/jira/browse/KAFKA-10579
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Assignee: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
>  
> java.lang.NullPointerException
>   at 
> java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
>   at org.reflections.Store.getAllIncluding(Store.java:82)
>   at org.reflections.Store.getAll(Store.java:93)
>   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
>   at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
>   at 
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
>   at 
> org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
>   at 
> org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
>   at 
> org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)
> https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222





[GitHub] [kafka] forlack commented on pull request #14031: Docs: Include ZK Deprecation notice and information

2023-07-18 Thread via GitHub


forlack commented on PR #14031:
URL: https://github.com/apache/kafka/pull/14031#issuecomment-1640948633

   I updated the sentence suggested by @ijuma 





[jira] [Commented] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2023-07-18 Thread Lixin Yao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744347#comment-17744347
 ] 

Lixin Yao commented on KAFKA-15214:
---

Here is one of the example scenarios that motivated me to create this request. While testing the tiered storage feature, I noticed an unbalanced byte-out traffic rate across brokers. This was very confusing given the available metrics, because all the topic partitions should be balanced across the cluster without any skew. I then checked the existing error metric RemoteReadErrorsPerSec, which only reported a value of 0. That gave me the impression that there was no problem with remote downloading, so I had to dig into the logs for more information. In the end, I found that one broker was consistently unable to fetch remote segments because of this repeatedly occurring OffsetOutOfRangeException. An example error looks like this:
{code:java}
2023-07-15 00:12:42,471 kafkaLogLevel="INFO" [RemoteLogReader-2]: OffsetOutOfRangeException occurred while reading the remote data for mytopic-247: org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request for offset 0 for leader epoch 0 and partition mytopic-247 which does not exist in remote tier. Try again later. kafkaLoggerClass="kafka.log.remote.RemoteLogReader" kafkaLoggerThread="RemoteLogReader-2"   {code}
As I said, the reason this partition repeatedly requests offset 0 could be something else, e.g. corrupted metadata or other issues, but if this error were included in the RemoteReadErrorsPerSec metric, it would help a lot in identifying the root cause and setting up alerting.

Hope this makes sense to you. I am also fine with including this as a tag on an existing metric; as long as I have a way to quickly identify and alert on the abnormal behavior, I am happy.
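To make the request concrete, a schematic of what counting this exception in the existing error meter could look like (Yammer metrics API as used by the broker; the class name, metric group, and wiring here are illustrative assumptions, not the actual patch):

{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;

// Illustrative only: count OffsetOutOfRangeException in RemoteReadErrorsPerSec
// instead of letting it bypass the meter.
public class RemoteReadMetricsSketch {
    private static final Meter REMOTE_READ_ERRORS = Metrics.newMeter(
        new MetricName("kafka.server", "RemoteLogManager", "RemoteReadErrorsPerSec"),
        "errors", TimeUnit.SECONDS);

    public void readRemote(Runnable read) {
        try {
            read.run();
        } catch (OffsetOutOfRangeException e) {
            REMOTE_READ_ERRORS.mark(); // today this exception type is not counted
            throw e;
        }
    }
}
{code}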

> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> 
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> In the current metric RemoteReadErrorsPerSec, the exception type OffsetOutOfRangeException is not included.
> In our testing of the tiered storage feature (at Apple), we noticed several cases where remote download was affected and stuck due to a repeated OffsetOutOfRangeException in some particular broker or topic partitions. The root cause can vary, but without a metric it is currently very hard to catch this issue and debug it in a timely fashion. Understandably, the exception itself may not be the root cause, but a metric for it would be a good signal to alert on and investigate.
> Related discussion
> [https://github.com/apache/kafka/pull/13944#discussion_r1266243006]
> I am happy to contribute this if the request is agreed upon.





[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267254133


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();

Review Comment:
   good catch
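   For clarity, the copy-paste issue flagged here is that all three new gauges return `timedOutHeartbeats()`; the presumable intent (accessor names are hypothetical, not taken from the patch) is one accessor per counter:
   
   ```java
   // Presumable intent, sketched: each gauge reports its own counter.
   // operationsStarted() and operationsTimedOut() are hypothetical accessors.
   registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
       @Override
       public Long value() {
           return timedOutHeartbeats();
       }
   }));
   registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
       @Override
       public Long value() {
           return operationsStarted(); // hypothetical
       }
   }));
   registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
       @Override
       public Long value() {
           return operationsTimedOut(); // hypothetical
       }
   }));
   ```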



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267253682


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();

Review Comment:
   good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267252934


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -282,15 +294,15 @@ class SharedServer(
       setDisabledReason(snapshotsDisabledReason).
       setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
       build()
-    _raftManager.register(loader)

Review Comment:
   Not really fixing a bug per se but it's probably slightly better for 
performance this way. Mainly, I did not want the `get()` call to be blocked 
behind another event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267251882


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -282,15 +294,15 @@ class SharedServer(
       setDisabledReason(snapshotsDisabledReason).
       setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
       build()
-    _raftManager.register(loader)
     try {
-      loader.installPublishers(Collections.singletonList(snapshotGenerator))
+      loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()

Review Comment:
   yes, although I think it's a bug that's very unlikely to cause harm in 
practice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267251209


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -259,15 +262,24 @@ class SharedServer(
 raftManager = _raftManager
 _raftManager.startup()
 
+    metadataLoaderMetrics = if (brokerMetrics != null) {
+      new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+        elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
+        batchSize => brokerMetrics.updateBatchSize(batchSize),
+        brokerMetrics.lastAppliedImageProvenance)
+    } else {
+      new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+        _ => {},
+        _ => {},
+        new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))

Review Comment:
   There are a few broker-specific metrics hanging out in 
`BrokerServerMetrics.scala` and this is a way to connect 
`MetadataLoaderMetrics` to that class. So that the metadata loader only needs 
to interact with `MetadataLoaderMetrics` and not the broker-specific code.
   
   Long-term, we probably want to move all the loader metrics into 
`MetadataLoaderMetrics`, and make them all accessible on the controller as well 
as broker. But that's out of scope for this change (and would need a KIP anyway)
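   
   A toy sketch of the callback pattern described here (the class and method names are illustrative, not the real ones):
   
   ```java
   import java.util.function.IntConsumer;
   import java.util.function.LongConsumer;
   
   // The loader talks only to this facade; broker-specific metric sinks are
   // injected as callbacks, keeping loader and broker code decoupled.
   class LoaderMetricsFacadeSketch {
       private final LongConsumer batchProcessingTimeSink;
       private final IntConsumer batchSizeSink;
   
       LoaderMetricsFacadeSketch(LongConsumer timeSink, IntConsumer sizeSink) {
           this.batchProcessingTimeSink = timeSink;
           this.batchSizeSink = sizeSink;
       }
   
       void updateBatchProcessingTimeNs(long ns) {
           batchProcessingTimeSink.accept(ns);
       }
   
       void updateBatchSize(int size) {
           batchSizeSink.accept(size);
       }
   }
   ```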



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267248743


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -782,7 +735,7 @@ public void testTimeouts() throws Throwable {
 build()
 ) {
 QuorumController controller = controlEnv.activeController();
-CountDownLatch countDownLatch = controller.pause();
+CountDownLatch countDownLatch = pause(controller);

Review Comment:
   yeah, we can remove this and just use the test function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267249264


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -140,39 +142,6 @@ public class QuorumControllerTest {
     static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
         fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
 
-    static class MockControllerMetrics extends QuorumControllerMetrics {
-        final AtomicBoolean closed = new AtomicBoolean(false);
-
-        MockControllerMetrics() {
-            super(Optional.empty(), Time.SYSTEM, false);
-        }
-
-        @Override
-        public void close() {
-            super.close();
-            closed.set(true);
-        }
-    }
-
-    /**
-     * Test creating a new QuorumController and closing it.
-     */
-    @Test
-    public void testCreateAndClose() throws Throwable {
-        MockControllerMetrics metrics = new MockControllerMetrics();
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setMetrics(metrics);
-                }).
-                build()
-        ) {
-        }
-        assertTrue(metrics.closed.get(), "metrics were not closed");
-    }

Review Comment:
   This was moved to 
`QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267248169


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment:
   I've removed these usages, so the error propagates normally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267246622


##
metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.apache.kafka.image.MetadataProvenance;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class MetadataLoaderMetricsTest {
+private static class FakeMetadataLoaderMetrics implements AutoCloseable {
+final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
+final AtomicInteger batchSize = new AtomicInteger(0);
+        final AtomicReference<MetadataProvenance> provenance =
+                new AtomicReference<>(MetadataProvenance.EMPTY);
+final MetadataLoaderMetrics metrics;
+
+FakeMetadataLoaderMetrics() {
+this(Optional.empty());
+}
+
+FakeMetadataLoaderMetrics(MetricsRegistry registry) {
+this(Optional.of(registry));
+}
+
+        FakeMetadataLoaderMetrics(Optional<MetricsRegistry> registry) {
+metrics = new MetadataLoaderMetrics(
+registry,
+n -> batchProcessingTimeNs.set(n),
+n -> batchSize.set(n),
+provenance);
+}
+
+@Override
+public void close() {
+metrics.close();
+}
+}
+
+@Test
+public void testMetricNames() {
+MetricsRegistry registry = new MetricsRegistry();
+try {
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+new HashSet<>(Arrays.asList(
+
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+)));
+}
+ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+Collections.emptySet());
+} finally {
+registry.shutdown();
+}
+}
+
+@Test
+public void testUpdateBatchProcessingTimeNs() {
+MetricsRegistry registry = new MetricsRegistry();
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+fakeMetrics.metrics.updateBatchProcessingTimeNs(123L);
+assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get());
+}
+}
+
+@Test
+public void testUpdateBatchSize() {
+MetricsRegistry registry = new MetricsRegistry();
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+fakeMetrics.metrics.updateBatchSize(50);
+assertEquals(50, fakeMetrics.batchSize.get());
+}
+}
+
+@Test
+public void testUpdateLastAppliedImageProvenance() {
+MetricsRegistry registry = new MetricsRegistry();
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+fakeMetrics.metrics.updateLastAppliedImageProvenance(new 
MetadataProvenance(1L, 2, 3L));
+assertEquals(new MetadataProvenance(1L, 2, 3L), 
fakeMetrics.provenance.get());

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] C0urante commented on a diff in pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records

2023-07-18 Thread via GitHub


C0urante commented on code in PR #14024:
URL: https://github.com/apache/kafka/pull/14024#discussion_r1267245540


##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java:
##
@@ -65,7 +180,8 @@ public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySche
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
                                 Long timestamp, Iterable<Header> headers) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers,
+                originalTopic(), originalKafkaPartition(), originalKafkaOffset());

Review Comment:
   A separate ticket/PR sounds great, thanks! Feel free to ping me on the PR if 
you implement the fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267237169


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   Gotcha! The latest patch works, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267238592


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
+                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
             }
         }
         return result;
     }
 
+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param <T> Type being iterated over by the ServiceLoader
+     * @param <U> Return value of the passed-in function
+     */
+    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
+        // It's difficult to know for sure if the iterator was able to advance past the first broken
+        // plugin class, or if it will continue to fail on that broken class for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
+        // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        || 

[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267236920


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <U> PluginDesc<U> pluginDesc(Class<? extends U> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment:
   I think that given the low likelihood of ever hitting this error when 
invoking `ServiceLoader::load` or `ServiceLoader::iterator` (neither is 
annotated with any kind of `throws` declaration, the iterator is specifically 
called out as being lazy-loading, and I'm not aware of any JDK bug reports that 
would lead to this kind of behavior), it's better to throw errors to the caller.
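   
   A small self-contained illustration of that laziness (not Connect code; it just shows when provider classes are actually loaded):
   
   ```java
   import java.util.ServiceLoader;
   
   public class ServiceLoaderLaziness {
       public static void main(String[] args) {
           // load() only captures the service type and loader; no provider classes
           // are touched yet, so a LinkageError from a broken provider cannot surface here.
           ServiceLoader<Runnable> loader = ServiceLoader.load(Runnable.class);
           // Provider classes are loaded one by one as the iterator advances,
           // which is where a LinkageError would actually be thrown.
           for (Runnable provider : loader) {
               provider.run();
           }
       }
   }
   ```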



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14669:
--
Priority: Blocker  (was: Major)

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Blocker
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-18 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744337#comment-17744337
 ] 

Chris Egerton commented on KAFKA-14669:
---

Reopening and marking as a blocker with fix version 3.6.0 so that we can merge 
[https://github.com/apache/kafka/pull/14041] (or another fix for the same 
issue) before this makes it into a release.

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Blocker
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reopened KAFKA-14669:
---

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #14041: KAFKA-14469: Add MirrorMaker 2 configs to table of contents in docs page

2023-07-18 Thread via GitHub


C0urante commented on PR #14041:
URL: https://github.com/apache/kafka/pull/14041#issuecomment-1640847533

   CC @tinaselenge; sorry for missing this during review! Just wanted to give 
you a heads-up in case you make or review docs changes in the future.
   
   @gharris1727 @mimaison would appreciate it if either of you could take a 
look at this quick fix. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #14041: KAFKA-14469: Add MirrorMaker 2 configs to table of contents in docs page

2023-07-18 Thread via GitHub


C0urante opened a new pull request, #14041:
URL: https://github.com/apache/kafka/pull/14041

   Follow-up for https://github.com/apache/kafka/pull/13658, where we forgot to 
update the table of contents to include the new 3.8 section for MirrorMaker 2. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2023-07-18 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744334#comment-17744334
 ] 

Divij Vaidya commented on KAFKA-15214:
--

Hi Lixin,
Can you help us understand more about the motivation (perhaps by explaining an 
example scenario where this metric would be useful)? Also, you may want to consider 
adding the specific exception type as a "tag" to the existing error metric.
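
For example, a sketch of the "tag" idea with a standalone Yammer meter, where the exception type goes into the MBean scope (the group/type/scope strings are assumptions for illustration, not the existing metric definition):

{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

import java.util.concurrent.TimeUnit;

public class TaggedRemoteReadErrorSketch {
    public static void main(String[] args) {
        // The scope string acts as the tag, e.g. error=OffsetOutOfRangeException.
        Meter meter = Metrics.newMeter(
                new MetricName("kafka.server", "BrokerTopicMetrics",
                        "RemoteReadErrorsPerSec", "error.OffsetOutOfRangeException"),
                "errors", TimeUnit.SECONDS);
        meter.mark();
    }
}
{code}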

> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> 
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> In the current metric RemoteReadErrorsPerSec, the exception type 
> OffsetOutOfRangeException is not included.
> In our testing with the tiered storage feature (at Apple), we noticed several 
> cases where remote downloads were affected and stuck due to 
> OffsetOutOfRangeException happening repeatedly in particular brokers or topic 
> partitions. The root cause can vary, but without a metric it is currently very 
> hard to catch this issue and debug it in a timely fashion. It is understandable 
> that the exception itself may not be the root cause, but this exception metric 
> could be a good signal for us to alert on and investigate.
> Related discussion
> [https://github.com/apache/kafka/pull/13944#discussion_r1266243006]
> I am happy to contribute to this if the request is agreed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-18 Thread via GitHub


bmscomp commented on PR #14032:
URL: https://github.com/apache/kafka/pull/14032#issuecomment-1640794180

   @divijvaidya It was a rebase issue. Now that I am using the latest version of 
`.asf.yaml`, in which that issue was already fixed, the execution is going on.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-18 Thread via GitHub


divijvaidya commented on PR #14032:
URL: https://github.com/apache/kafka/pull/14032#issuecomment-1640773987

   Tests are failing with a related error.
   ```
   * What went wrong:
   
   Gradle could not start your build.
   
   > Cannot create service of type BuildSessionActionExecutor using method 
LauncherServices$ToolingBuildSessionScopeServices.createActionExecutor() as 
there is a problem with parameter #21 of type FileSystemWatchingInformation.
   
  > Cannot create service of type BuildLifecycleAwareVirtualFileSystem 
using method 
VirtualFileSystemServices$GradleUserHomeServices.createVirtualFileSystem() as 
there is a problem with parameter #7 of type GlobalCacheLocations.
   
 > Cannot create service of type GlobalCacheLocations using method 
GradleUserHomeScopeServices.createGlobalCacheLocations() as there is a problem 
with parameter #1 of type List.
   
> Could not create service of type FileAccessTimeJournal using 
GradleUserHomeScopeServices.createFileAccessTimeJournal().
   
   > Timeout waiting to lock journal cache 
(/home/jenkins/.gradle/caches/journal-1). It is currently in use by another 
Gradle instance.
   
 Owner PID: 4026719
   
 Our PID: 163366
   
 Owner Operation: 
   
 Our operation: 
   
 Lock file: 
/home/jenkins/.gradle/caches/journal-1/journal-1.lock
   ```
   
   @bmscomp did you try running `./gradlew build` locally? Please ensure that 
the build is successful for you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] stevenbooke commented on pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-07-18 Thread via GitHub


stevenbooke commented on PR #13842:
URL: https://github.com/apache/kafka/pull/13842#issuecomment-1640762194

   As you have recommended, the script will create a new branch for the 
changes, commit the changes to the new branch, and open a pull request with the 
updated `.asf.yaml` file. The updated `.asf.yaml` file will keep the same format 
as before (it will retain the comments and only update the 
`github_whitelist` and `collaborators` lists).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Staniel commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-18 Thread via GitHub


Staniel commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1267173038


##
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##
@@ -54,10 +60,14 @@ public Void call() {
             logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
 
             FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
+            brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
             result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
         } catch (OffsetOutOfRangeException e) {
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-15214 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] stevenbooke commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-07-18 Thread via GitHub


stevenbooke commented on code in PR #13842:
URL: https://github.com/apache/kafka/pull/13842#discussion_r1267168635


##
refresh-collaborators.py:
##
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+from datetime import datetime, timedelta
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', class_='github_login')]
+
+### GET THE CONTRIBUTORS AND THEIR COMMIT VOLUME OVER THE LAST YEAR TO THE apache/kafka REPO ###
+n = 10
+contributors_login_to_commit_volume = {}
+end_date = datetime.now()
+start_date = end_date - timedelta(days=365)
+for commit in repo.get_commits(since=start_date, until=end_date):
+    login = commit.author.login

Review Comment:
   Thank you for pointing that out. I will make the appropriate changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2023-07-18 Thread Lixin Yao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lixin Yao updated KAFKA-15214:
--
Description: 
In the current metric RemoteReadErrorsPerSec, the exception type 
OffsetOutOfRangeException is not included.

In our testing with the tiered storage feature (at Apple), we noticed several cases 
where remote downloads were affected and stuck due to OffsetOutOfRangeException 
happening repeatedly in particular brokers or topic partitions. The root cause can 
vary, but without a metric it is currently very hard to catch this issue and debug 
it in a timely fashion. It is understandable that the exception itself may not be 
the root cause, but this exception metric could be a good signal for us to alert 
on and investigate.

Related discussion
[https://github.com/apache/kafka/pull/13944#discussion_r1266243006]

I am happy to contribute to this if the request is agreed.

  was:
In the current metric RemoteReadErrorsPerSec, the exception type 
OffsetOutOfRangeException is not included.

In our testing with the tiered storage feature, we noticed several cases where 
remote downloads were affected and stuck due to OffsetOutOfRangeException 
happening repeatedly in particular brokers or topic partitions. The root cause can 
vary, but without a metric it is currently very hard to catch this issue and debug 
it in a timely fashion. It is understandable that the exception itself may not be 
the root cause, but this exception metric could be a good signal for us to alert 
on and investigate.

Related discussion
[https://github.com/apache/kafka/pull/13944#discussion_r1266243006]

I am happy to contribute to this if the request is agreed.


> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> 
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> In the current metric RemoteReadErrorsPerSec, the exception type 
> OffsetOutOfRangeException is not included.
> In our testing with the tiered storage feature (at Apple), we noticed several 
> cases where remote downloads were affected and stuck due to 
> OffsetOutOfRangeException happening repeatedly in particular brokers or topic 
> partitions. The root cause can vary, but without a metric it is currently very 
> hard to catch this issue and debug it in a timely fashion. It is understandable 
> that the exception itself may not be the root cause, but this exception metric 
> could be a good signal for us to alert on and investigate.
> Related discussion
> [https://github.com/apache/kafka/pull/13944#discussion_r1266243006]
> I am happy to contribute to this if the request is agreed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tanay27 opened a new pull request, #14040: KAFKA-15212: Delete Classgraph-MIT license

2023-07-18 Thread via GitHub


tanay27 opened a new pull request, #14040:
URL: https://github.com/apache/kafka/pull/14040

   Deleted classgraph MIT result file.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2023-07-18 Thread Lixin Yao (Jira)
Lixin Yao created KAFKA-15214:
-

 Summary: Add metrics for OffsetOutOfRangeException when tiered 
storage is enabled
 Key: KAFKA-15214
 URL: https://issues.apache.org/jira/browse/KAFKA-15214
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Affects Versions: 3.6.0
Reporter: Lixin Yao
 Fix For: 3.6.0


In the current metric RemoteReadErrorsPerSec, the exception type 
OffsetOutOfRangeException is not included.

In our testing with the tiered storage feature, we noticed several cases where 
remote downloads were affected and stuck due to OffsetOutOfRangeException 
happening repeatedly in particular brokers or topic partitions. The root cause can 
vary, but without a metric it is currently very hard to catch this issue and debug 
it in a timely fashion. It is understandable that the exception itself may not be 
the root cause, but this exception metric could be a good signal for us to alert 
on and investigate.

Related discussion
[https://github.com/apache/kafka/pull/13944#discussion_r1266243006]

I am happy to contribute to this if the request is agreed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15212) Remove unneeded classgraph license file

2023-07-18 Thread Tanay Karmarkar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanay Karmarkar reassigned KAFKA-15212:
---

Assignee: Tanay Karmarkar

> Remove unneeded classgraph license file
> ---
>
> Key: KAFKA-15212
> URL: https://issues.apache.org/jira/browse/KAFKA-15212
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Major
>  Labels: newbie
> Fix For: 3.6.0
>
>
> The license file for classgraph can be completely removed from here: 
> [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it 
> is not a dependency of Kafka any more.
> The associated package was removed from license at 
> [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp commented on a diff in pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-18 Thread via GitHub


bmscomp commented on code in PR #14032:
URL: https://github.com/apache/kafka/pull/14032#discussion_r1267159224


##
gradle/wrapper/gradle-wrapper.properties:
##
@@ -1,7 +1,7 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionSha256Sum=5625a0ae20fe000d9225d000b36909c7a0e0e8dda61c19b12da769add847c975
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-all.zip
+distributionSha256Sum=03ec176d388f2aa99defcadc3ac6adf8dd2bce5145a129659537c0874dea5ad1
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip

Review Comment:
   @divijvaidya  Updated the pull request to use the all distribution 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15190) Allow configuring a streams process ID

2023-07-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288
 ] 

Matthias J. Sax edited comment on KAFKA-15190 at 7/18/23 6:22 PM:
--

One more thing: the `process.id` is actually only used as part of the 
`client.id` if no `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of task shuffling (and the rebalance in itself should 
not be an issue, as it's cheap)?
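
For illustration, a minimal Streams configuration sketch of that workaround (the POD_NAME environment variable is an assumption about how a StatefulSet pod name might be injected):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ClientIdWorkaroundSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Pin client.id to a stable identity (e.g. the StatefulSet pod name) so
        // the randomly generated process.id no longer shows up in client IDs.
        props.put(StreamsConfig.CLIENT_ID_CONFIG, System.getenv("POD_NAME"));
        // props would then be passed to new KafkaStreams(topology, props) as usual.
    }
}
{code}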


was (Author: mjsax):
One more thing: the `process.id` is actually only used as part of the 
`client.id` if no `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of rebalancing (and task shuffling)?

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting process ID the state 
> directly in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 opened a new pull request, #14039: KAFKA-15211: Mock InvalidParameterException in DistributedConfigTest

2023-07-18 Thread via GitHub


gharris1727 opened a new pull request, #14039:
URL: https://github.com/apache/kafka/pull/14039

   This exception doesn't get thrown by the BouncyCastleProvider's 
implementation of HmacSHA256, so we simulate the error just in case BouncyCastle 
is loaded before the test is run.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Staniel commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-18 Thread via GitHub


Staniel commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1267143752


##
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##
@@ -54,10 +60,14 @@ public Void call() {
             logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
 
             FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
+            brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
             result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
         } catch (OffsetOutOfRangeException e) {
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));

Review Comment:
   Thank you. I will create a JIRA to track that discussion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267142346


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -89,52 +91,12 @@ public BatchAccumulator(
 this.appendLock = new ReentrantLock();
 }
 
-/**
- * Append a list of records into as many batches as necessary.
- *
- * The order of the elements in the records argument will match the order 
in the batches.
- * This method will use as many batches as necessary to serialize all of 
the records. Since
- * this method can split the records into multiple batches it is possible 
that some of the
- * records will get committed while others will not when the leader fails.
- *
- * @param epoch the expected leader epoch. If this does not match, then 
{@link NotLeaderException}
- *  will be thrown
- * @param records the list of records to include in the batches
- * @return the expected offset of the last record
- * @throws RecordBatchTooLargeException if the size of one record T is 
greater than the maximum
- * batch size; if this exception is thrown, some of the elements in records may have
- * been committed
- * @throws NotLeaderException if the epoch is less than the leader epoch
- * @throws IllegalArgumentException if the epoch is invalid (greater than 
the leader epoch)
- * @throws BufferAllocationException if we failed to allocate memory for 
the records
- * @throws IllegalStateException if we tried to append new records after 
the batch has been built
- */
-    public long append(int epoch, List<T> records) {
-        return append(epoch, records, false);
-    }
-
-/**
- * Append a list of records into an atomic batch. We guarantee all records 
are included in the
- * same underlying record batch so that either all of the records become 
committed or none of
- * them do.
- *
- * @param epoch the expected leader epoch. If this does not match, then 
{@link NotLeaderException}
- *  will be thrown
- * @param records the list of records to include in a batch
- * @return the expected offset of the last record
- * @throws RecordBatchTooLargeException if the size of the records is 
greater than the maximum
- * batch size; if this exception is thrown, none of the elements in records were
- * committed
- * @throws NotLeaderException if the epoch is less than the leader epoch
- * @throws IllegalArgumentException if the epoch is invalid (greater than 
the leader epoch)
- * @throws BufferAllocationException if we failed to allocate memory for 
the records
- * @throws IllegalStateException if we tried to append new records after 
the batch has been built
- */
-    public long appendAtomic(int epoch, List<T> records) {
-        return append(epoch, records, true);
-    }
-
-    private long append(int epoch, List<T> records, boolean isAtomic) {
+    public long append(
+        int epoch,
+        List<T> records,
+        OptionalLong requiredEndOffset,
+        boolean isAtomic
+    ) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267137597


##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -172,15 +176,17 @@ default void beginShutdown() {}
  * uncommitted entries after observing an epoch change.
  *
  * @param epoch the current leader epoch
+ * @param requiredEndOffset if this is set, it is the offset we must use 
as the end offset (inclusive).

Review Comment:
   ok. we can use `requiredBaseOffset`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267130598


##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -80,8 +81,11 @@ interface Listener {
  * epoch.
  *
  * @param leader the current leader and epoch
+ * @param endOffset the current log end offset (exclusive). This is 
useful for nodes that
+ *  are claiming leadership, because it lets them know 
what log offset they
+ *  should attempt to write to next.

Review Comment:
   > What problem are you trying so solve?
   
   We have to know the offset of records that we apply. But we apply records 
before we submit them to Raft.
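   
   A toy illustration of that ordering constraint (deliberately not the QuorumController code):
   
   ```java
   import java.util.List;
   import java.util.function.ObjLongConsumer;
   
   // Records are applied to in-memory state before Raft persists them, so the
   // exact offsets they will occupy in the log must be known up front.
   class ReplayBeforeAppendSketch<T> {
       private long nextOffset; // assumed to be learned from Raft on leadership change
   
       ReplayBeforeAppendSketch(long logEndOffset) {
           this.nextOffset = logEndOffset;
       }
   
       void applyThenAppend(List<T> records, ObjLongConsumer<T> replay, Runnable raftAppend) {
           for (T record : records) {
               replay.accept(record, nextOffset++); // replay with the offset it will receive
           }
           raftAppend.run(); // only then hand the batch to the Raft layer
       }
   }
   ```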



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-8466) Remove 'jackson-module-scala' dependency (and replace it with some code)

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-8466:

Labels: newbie  (was: )

> Remove 'jackson-module-scala' dependency (and replace it with some code)
> 
>
> Key: KAFKA-8466
> URL: https://issues.apache.org/jira/browse/KAFKA-8466
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Dejan Stojadinović
>Priority: Minor
>  Labels: newbie
>
> *Prologue:* 
>  * [https://github.com/apache/kafka/pull/5454#issuecomment-497323889]
>  * [https://github.com/apache/kafka/pull/5726/files#r289078080]
> *Rationale:* one dependency less is always a good thing.
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15213) Provide the exact offset to QuorumController.replay

2023-07-18 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15213:


 Summary: Provide the exact offset to QuorumController.replay
 Key: KAFKA-15213
 URL: https://issues.apache.org/jira/browse/KAFKA-15213
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


Provide the exact offset to QuorumController.replay so that we can implement 
metadata transactions. We need this so that we can know the offset where the 
records will be applied before we apply them in QuorumControllers.





[jira] [Commented] (KAFKA-8466) Remove 'jackson-module-scala' dependency (and replace it with some code)

2023-07-18 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744315#comment-17744315
 ] 

Divij Vaidya commented on KAFKA-8466:
-

Based on the comment above, marking this as Unassigned so that someone from the 
community can pick it up.

> Remove 'jackson-module-scala' dependency (and replace it with some code)
> 
>
> Key: KAFKA-8466
> URL: https://issues.apache.org/jira/browse/KAFKA-8466
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Dejan Stojadinović
>Priority: Minor
>
> *Prologue:* 
>  * [https://github.com/apache/kafka/pull/5454#issuecomment-497323889]
>  * [https://github.com/apache/kafka/pull/5726/files#r289078080]
> *Rationale:* one dependency less is always a good thing.
>  
>  
>  
>  





[jira] [Assigned] (KAFKA-8466) Remove 'jackson-module-scala' dependency (and replace it with some code)

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-8466:
---

Assignee: (was: Dejan Stojadinović)

> Remove 'jackson-module-scala' dependency (and replace it with some code)
> 
>
> Key: KAFKA-8466
> URL: https://issues.apache.org/jira/browse/KAFKA-8466
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Dejan Stojadinović
>Priority: Minor
>
> *Prologue:* 
>  * [https://github.com/apache/kafka/pull/5454#issuecomment-497323889]
>  * [https://github.com/apache/kafka/pull/5726/files#r289078080]
> *Rationale:* one dependency less is always a good thing.
>  
>  
>  
>  





[jira] [Updated] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15000:
-
Affects Version/s: 3.5.1

> High vulnerability PRISMA-2023-0067 reported in jackson-core
> 
>
> Key: KAFKA-15000
> URL: https://issues.apache.org/jira/browse/KAFKA-15000
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.3.2, 3.5.1
>Reporter: Arushi Rai
>Priority: Critical
> Fix For: 3.6.0
>
>
> Kafka is using jackson-core version 2.13.4, which has a high vulnerability 
> reported: [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827]
> This vulnerability is fixed in jackson-core 2.15.0 and Kafka should upgrade to 
> the same. 





[jira] [Updated] (KAFKA-15000) High vulnerability PRISMA-2023-0067 reported in jackson-core

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15000:
-
Fix Version/s: 3.6.0

> High vulnerability PRISMA-2023-0067 reported in jackson-core
> 
>
> Key: KAFKA-15000
> URL: https://issues.apache.org/jira/browse/KAFKA-15000
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Arushi Rai
>Priority: Critical
> Fix For: 3.6.0
>
>
> Kafka is using jackson-core version 2.13.4, which has a high vulnerability 
> reported: [PRISMA-2023-0067|https://github.com/FasterXML/jackson-core/pull/827]
> This vulnerability is fixed in jackson-core 2.15.0 and Kafka should upgrade to 
> the same. 





[jira] [Updated] (KAFKA-15212) Remove unneeded classgraph license file

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15212:
-
Summary: Remove unneeded classgraph license file  (was: Remove classgraph 
license)

> Remove unneeded classgraph license file
> ---
>
> Key: KAFKA-15212
> URL: https://issues.apache.org/jira/browse/KAFKA-15212
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: newbie
> Fix For: 3.6.0
>
>
> The license file for classgraph can be completely removed from here: 
> [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it 
> is not a dependency of Kafka any more.
> The associated package was removed from license at 
> [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4]
>  





[jira] [Created] (KAFKA-15212) Remove classgraph license

2023-07-18 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15212:


 Summary: Remove classgraph license
 Key: KAFKA-15212
 URL: https://issues.apache.org/jira/browse/KAFKA-15212
 Project: Kafka
  Issue Type: Bug
Reporter: Divij Vaidya
 Fix For: 3.6.0


The license file for classgraph can be completely removed from here: 
[https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it 
is not a dependency of Kafka any more.

The associated package was removed from license at 
[https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4]
 





[jira] [Created] (KAFKA-15211) DistributedConfigTest#shouldFailWithInvalidKeySize fails when run after TestSslUtils#generate

2023-07-18 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15211:
---

 Summary: DistributedConfigTest#shouldFailWithInvalidKeySize fails 
when run after TestSslUtils#generate
 Key: KAFKA-15211
 URL: https://issues.apache.org/jira/browse/KAFKA-15211
 Project: Kafka
  Issue Type: Test
  Components: clients, KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


The DistributedConfigTest#shouldFailWithInvalidKeySize attempts to configure a 
hashing algorithm with a key size of 0. When run alone, this test passes, as 
the default Java hashing algorithm used rejects the key size.

However, when TestSslUtils#generate runs first, such as via the 
RestForwardingIntegrationTest, the BouncyCastleProvider is loaded, which 
provides an alternative hashing algorithm. This implementation does _not_ 
reject the key size, causing the test to fail.

We should either prevent TestSslUtils#generate from leaving the 
BouncyCastleProvider loaded after use, or adjust the test to pass when the 
BouncyCastleProvider is loaded.
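
A minimal sketch of the first option (the wrapper method is hypothetical; 
Security.addProvider/removeProvider are the standard JCA calls):

import java.security.Security;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

final class TemporaryBouncyCastle {
    static void runWithProvider(Runnable generate) {
        BouncyCastleProvider provider = new BouncyCastleProvider();
        // addProvider returns -1 if the provider was already registered
        boolean added = Security.addProvider(provider) != -1;
        try {
            generate.run();
        } finally {
            if (added) {
                Security.removeProvider(provider.getName()); // "BC"
            }
        }
    }
}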





[jira] [Assigned] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.2

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-15208:


Assignee: Said BOUDJELDA

> Upgrade Jackson dependencies to version 2.15.2
> --
>
> Key: KAFKA-15208
> URL: https://issues.apache.org/jira/browse/KAFKA-15208
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, kraft
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Major
>  Labels: dependencies
> Fix For: 3.6.0
>
>
> Upgrading the Jackson dependencies to the latest stable version 2.15.2 brings 
> many bug fixes, security fixes, and performance improvements. 
>  
> See the release notes going back to the current version: 
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]
>  
>  





[jira] [Updated] (KAFKA-15208) Upgrade Jackson dependencies to version 2.15.2

2023-07-18 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15208:
-
Fix Version/s: 3.6.0

> Upgrade Jackson dependencies to version 2.15.2
> --
>
> Key: KAFKA-15208
> URL: https://issues.apache.org/jira/browse/KAFKA-15208
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, kraft
>Reporter: Said BOUDJELDA
>Priority: Major
>  Labels: dependencies
> Fix For: 3.6.0
>
>
> Upgrading the Jackson dependencies to the latest stable version 2.15.2 brings 
> many bug fixes, security fixes, and performance improvements. 
>  
> See the release notes going back to the current version: 
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.15.2]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.14]
>  
>  





[jira] [Resolved] (KAFKA-10775) DOAP has incorrect category

2023-07-18 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10775.

Fix Version/s: 3.1.0
 Assignee: Sebb
   Resolution: Fixed

> DOAP has incorrect category
> ---
>
> Key: KAFKA-10775
> URL: https://issues.apache.org/jira/browse/KAFKA-10775
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sebb
>Assignee: Sebb
>Priority: Major
> Fix For: 3.1.0
>
>
> https://github.com/apache/kafka/blob/0df461582c78449fd39e35b241a77a7acf5735e2/doap_Kafka.rdf#L36
> reads:
> <category rdf:resource="https://projects.apache.org/projects.html?category#big-data" />
> This should be
> <category rdf:resource="http://projects.apache.org/category/big-data" />
> c.f.
> http://svn.apache.org/repos/asf/bigtop/site/trunk/content/resources/bigtop.rdf





[GitHub] [kafka] mimaison commented on pull request #11423: Update doap_Kafka.rdf

2023-07-18 Thread via GitHub


mimaison commented on PR #11423:
URL: https://github.com/apache/kafka/pull/11423#issuecomment-1640685154

   Right, I wasn't aware of the associated ticket. I closed it now. Thanks!





[GitHub] [kafka] divijvaidya commented on a diff in pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-18 Thread via GitHub


divijvaidya commented on code in PR #14032:
URL: https://github.com/apache/kafka/pull/14032#discussion_r1267117269


##
gradle/wrapper/gradle-wrapper.properties:
##
@@ -1,7 +1,7 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionSha256Sum=5625a0ae20fe000d9225d000b36909c7a0e0e8dda61c19b12da769add847c975
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-all.zip
+distributionSha256Sum=03ec176d388f2aa99defcadc3ac6adf8dd2bce5145a129659537c0874dea5ad1
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip

Review Comment:
   We changed from the "all" package to the "bin" one. Is there a specific reason for this?






[jira] [Assigned] (KAFKA-15205) Race condition in ShutdownableThread causes InterruptedException

2023-07-18 Thread Tanay Karmarkar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanay Karmarkar reassigned KAFKA-15205:
---

Assignee: (was: Tanay Karmarkar)

> Race condition in ShutdownableThread causes InterruptedException
> 
>
> Key: KAFKA-15205
> URL: https://issues.apache.org/jira/browse/KAFKA-15205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
>
> In Shutdownable thread, during close, we call:
> initiateShutdown() -> which may interrupt the thread if 
> isInterruptible is set to true during construction.
> After that, we wait for proper shutdown using 
> awaitShutdown() which in turn calls CountDownLatch#await(). On interruption, 
> which could be caused by initiateShutdown() earlier, await() throws an 
> InterruptedException. Hence, awaitShutdown() is going to exit by throwing an 
> interrupted exception.
> The sequence to reproduce this will be as follows:
> App-thread: Name of application thread which spawns and closes Shutdownable 
> thread
> Shut-thread: Name of the shutdownable thread.
> 1. App-thread calls ShutThread.initiateShutdown()
> 2. ShutThread.interrupt() is called. It informs the VM to interrupt but the 
> actual interrupt will be async. initiateShutdown() from step 1 returns.
> 3. App-thread calls ShutThread.awaitShutdown() 
> 4. App-thread waits on shutdownComplete.await() i.e. on CountdownLatch#await
> 5. VM decides to interrupt App-thread and there is a race condition now.
> Race condition:
> Condition 1: Shut-thread.doWork() gets interrupted exception, and decrements 
> the CountdownLatch
> Condition 2: App-thread waiting on Shut-thread.shutdownComplete.await() 
> throws an interruptedException as per the contract 
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await--]
> *Solution*
>  
> In ShutdownableThread#awaitShutdown(), when calling await() we should catch 
> the InterruptedException and swallow it (do nothing) if the thread has 
> isInterruptible set to true.
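
A minimal sketch of the proposed fix (hypothetical shape, not the actual 
ShutdownableThread code): the interrupt is self-inflicted by 
initiateShutdown(), so it should not surface when the thread is interruptible.

import java.util.concurrent.CountDownLatch;

void awaitShutdown(CountDownLatch shutdownComplete, boolean isInterruptible) throws InterruptedException {
    try {
        shutdownComplete.await();
    } catch (InterruptedException e) {
        if (!isInterruptible)
            throw e; // unexpected interrupt: propagate to the caller
        // expected: initiateShutdown() interrupted this thread; restore the
        // interrupt flag and treat the shutdown as complete
        Thread.currentThread().interrupt();
    }
}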





[jira] [Assigned] (KAFKA-15205) Race condition in ShutdownableThread causes InterruptedException

2023-07-18 Thread Tanay Karmarkar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanay Karmarkar reassigned KAFKA-15205:
---

Assignee: Tanay Karmarkar

> Race condition in ShutdownableThread causes InterruptedException
> 
>
> Key: KAFKA-15205
> URL: https://issues.apache.org/jira/browse/KAFKA-15205
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Major
> Fix For: 3.6.0
>
>
> In Shutdownable thread, during close, we call:
> initiateShutdown() -> which may interrupt the thread if 
> isInterruptible is set to true during construction.
> After that, we wait for proper shutdown using 
> awaitShutdown() which in turn calls CountDownLatch#await(). On interruption, 
> which could be caused by initiateShutdown() earlier, await() throws an 
> InterruptedException. Hence, awaitShutdown() is going to exit by throwing an 
> interrupted exception.
> The sequence to reproduce this will be as follows:
> App-thread: Name of application thread which spawns and closes Shutdownable 
> thread
> Shut-thread: Name of the shutdownable thread.
> 1. App-thread calls ShutThread.initiateShutdown()
> 2. ShutThread.interrupt() is called. It informs the VM to interrupt but the 
> actual interrupt will be async. initiateShutdown() from step 1 returns.
> 3. App-thread calls ShutThread.awaitShutdown() 
> 4. App-thread waits on shutdownComplete.await() i.e. on CountdownLatch#await
> 5. VM decides to interrupt App-thread and there is a race condition now.
> Race condition:
> Condition 1: Shut-thread.doWork() gets interrupted exception, and decrements 
> the CountdownLatch
> Condition 2: App-thread waiting on Shut-thread.shutdownComplete.await() 
> throws an interruptedException as per the contract 
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await--]
> *Solution*
>  
> In ShutdownableThread#awaitShutdown(), when calling await() we should catch 
> the InterruptedException and swallow it (do nothing) if the thread has 
> isInterruptible set to true.





[jira] [Assigned] (KAFKA-15199) remove leading and trailing spaces from user input in release.py

2023-07-18 Thread Tanay Karmarkar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tanay Karmarkar reassigned KAFKA-15199:
---

Assignee: Tanay Karmarkar

> remove leading and trailing spaces from user input in release.py
> 
>
> Key: KAFKA-15199
> URL: https://issues.apache.org/jira/browse/KAFKA-15199
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Minor
>






[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-18 Thread via GitHub


nizhikov commented on PR #13278:
URL: https://github.com/apache/kafka/pull/13278#issuecomment-164085

   @mimaison Thanks for the review. It seems I addressed all your comments. 
Please take a look one more time.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267103768


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
+                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
             }
         }
         return result;
     }
 
+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param <T> Type being iterated over by the ServiceLoader
+     * @param <U> Return value of the passed-in function
+     */
+    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
+        // It's difficult to know for sure if the iterator was able to advance past the first broken
+        // plugin class, or if it will continue to fail on that broken class for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
+        // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        || 

[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-18 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1267096601


##
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {
+/**
+ * Returns a list of duplicated items
+ */
+public static <T> Iterable<T> duplicates(Iterable<T> s) {
+return StreamSupport.stream(s.spliterator(), false)

Review Comment:
   Nice proposal. Thanks. Fixed.






[jira] [Commented] (KAFKA-10775) DOAP has incorrect category

2023-07-18 Thread Piotr Zygielo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744305#comment-17744305
 ] 

Piotr Zygielo commented on KAFKA-10775:
---

May I ask to have it closed, please?

I can only clone it, and even though the difference is just one letter, I think 
closing it is a much better solution.

> DOAP has incorrect category
> ---
>
> Key: KAFKA-10775
> URL: https://issues.apache.org/jira/browse/KAFKA-10775
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sebb
>Priority: Major
>
> https://github.com/apache/kafka/blob/0df461582c78449fd39e35b241a77a7acf5735e2/doap_Kafka.rdf#L36
> reads:
> <category rdf:resource="https://projects.apache.org/projects.html?category#big-data" />
> This should be
> <category rdf:resource="http://projects.apache.org/category/big-data" />
> c.f.
> http://svn.apache.org/repos/asf/bigtop/site/trunk/content/resources/bigtop.rdf





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267094589


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   > Since that field (and its accessor method) are currently only used for log 
messages, what do you think about altering PluginSource::location to return a 
string, and using "System classpath" in that case?
   
   I can't do that, because I need the Path object later in the migration 
script. I've used a non-null sentinel path instead, so that all of the logging 
call-sites are improved but the Path object is still available.






[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-18 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1267093982


##
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {
+/**
+ * Returns a list of duplicated items
+ */
+public static <T> Iterable<T> duplicates(Iterable<T> s) {

Review Comment:
   > I wonder if we should return Set instead of Iterable
   
   Changed to return Set.
   
   > so maybe we can put this into tools?
   
   CoreUtils and CoreUtilsTest moved to tools.
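   
   A minimal sketch of the agreed-upon shape (returning Set rather than 
Iterable); the helper in the PR may differ in details:
   
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Function;
   import java.util.stream.Collectors;
   import java.util.stream.StreamSupport;
   
   public static <T> Set<T> duplicates(Iterable<T> s) {
       // count occurrences, then keep only the elements seen more than once
       return StreamSupport.stream(s.spliterator(), false)
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
           .entrySet().stream()
           .filter(entry -> entry.getValue() > 1)
           .map(Map.Entry::getKey)
           .collect(Collectors.toSet());
   }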
   






[GitHub] [kafka] nizhikov commented on a diff in pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-18 Thread via GitHub


nizhikov commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1267084525


##
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.CoreUtils;
+import org.apache.kafka.server.util.Json;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command to delete records of the given partitions down to the specified offset.
+ */
+public class DeleteRecordsCommand {
+private static final int EARLIEST_VERSION = 1;
+
+public static void main(String[] args) throws Exception {
+execute(args, System.out);
+}
+
+static Collection> 
parseOffsetJsonStringWithoutDedup(String jsonData) {
+try {
+JsonNode js = Json.tryParseFull(jsonData).node();
+
+int version = EARLIEST_VERSION;
+
+if (js.has("version"))
+version = js.get("version").asInt();
+
+return parseJsonData(version, js);
+} catch (JsonProcessingException e) {
+throw new AdminOperationException("The input string is not a valid 
JSON");
+}
+}
+
+private static Collection> parseJsonData(int 
version, JsonNode js) throws JsonMappingException {

Review Comment:
   Refactored to use JsonValue when possible.
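   
   For reference, an example of the offset JSON this command parses (the 
standard kafka-delete-records.sh input format), deleting everything before 
offset 10 in partition 0 of topic "foo":
   
   String jsonData = "{\"version\": 1," +
       " \"partitions\": [{\"topic\": \"foo\", \"partition\": 0, \"offset\": 10}]}";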






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267082036


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment:
   In the case where an implementation of ServiceLoader may eagerly evaluate a 
first provider and cause a LinkageError to appear, I wanted to re-use the 
handleLinkageError logging.
   
   I don't think that the retries for these operations would be effective, and 
that the later retries would probably be wasteful. However, I thought re-using 
the same function in both places was simpler than customizing the behavior of 
each call. Since this code is (probably never) going to be exercised, I tried 
to keep it as simple as possible.
   
   Do you think that we should log errors from load and iterator? I can catch 
the error or just let it propagate to the caller.






[jira] [Commented] (KAFKA-15207) ProducerIdManager#generateProducerId() should return different error code for newer clients

2023-07-18 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744300#comment-17744300
 ] 

Justine Olshan commented on KAFKA-15207:


It's a bit tricky to know when we are receiving a newer client request here 
since the API is indirect. I suppose we could add a field to the API that 
specifies what the client version was, but that would require a bump to the API.

Is there something I'm missing that would work better?

> ProducerIdManager#generateProducerId() should return different error code for 
> newer clients
> ---
>
> Key: KAFKA-15207
> URL: https://issues.apache.org/jira/browse/KAFKA-15207
> Project: Kafka
>  Issue Type: Task
>Reporter: Jeff Kim
>Priority: Major
>
> [https://github.com/apache/kafka/pull/13267] made changes to 
> ProducerIdManager that does not block request handler threads while waiting 
> for a new producer id block. Instead of blocking, we return 
> COORDINATOR_LOAD_IN_PROGRESS.
>  
> We return this rather than REQUEST_TIMED_OUT since older clients treat the 
> error as fatal when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
>  
> For newer clients, we should return an error that is more aligned with what 
> the client experiences.
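
A minimal sketch (hypothetical helper name and version cutoff) of what a 
version-gated mapping could look like once the client version is known:

import org.apache.kafka.common.protocol.Errors;

static Errors errorWhileWaitingForIdBlock(short initProducerIdVersion, short firstVersionTreatingTimeoutAsRetriable) {
    // older clients treat REQUEST_TIMED_OUT as fatal, so keep returning the
    // retriable COORDINATOR_LOAD_IN_PROGRESS for them
    return initProducerIdVersion >= firstVersionTreatingTimeoutAsRetriable
        ? Errors.REQUEST_TIMED_OUT
        : Errors.COORDINATOR_LOAD_IN_PROGRESS;
}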



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tanay27 commented on a diff in pull request #14035: KAFKA-15199: Remove Leading and Trailing Spaces

2023-07-18 Thread via GitHub


tanay27 commented on code in PR #14035:
URL: https://github.com/apache/kafka/pull/14035#discussion_r1267071244


##
release.py:
##
@@ -692,7 +700,7 @@ def select_gpg_key():
 fail("Ok, giving up")
 if not user_ok("Ok to push RC tag %s (y/n)?: " % rc_tag):
 fail("Ok, giving up")
-cmd("Pushing RC tag", "git push %s %s" % (PUSH_REMOTE_NAME, rc_tag))
+cmd("Pushing RC tag", f"git push {PUSH_REMOTE_NAME} {rc_tag}")

Review Comment:
   got it, reverting.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-18 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1267070768


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1245,4 +1422,1304 @@ public static String consumerGroupSessionTimeoutKey(String groupId, String memberId) {
 public static String consumerGroupRevocationTimeoutKey(String groupId, 
String memberId) {
 return "revocation-timeout-" + groupId + "-" + memberId;
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+removeGroup(groupId);
+} else {
+List<GenericGroupMember> loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> genericGroup.add(member, null));
+groups.put(groupId, genericGroup);
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult<Void, Record> genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) {
+CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+if (!acceptJoiningMember(group, memberId)) {
+group.remove(memberId);
+responseFuture.complete(new JoinGroupResponseData()
+ 

[jira] [Created] (KAFKA-15210) Mention vote should be open for at least 72 hours

2023-07-18 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15210:


 Summary: Mention vote should be open for at least 72 hours
 Key: KAFKA-15210
 URL: https://issues.apache.org/jira/browse/KAFKA-15210
 Project: Kafka
  Issue Type: Sub-task
Reporter: Divij Vaidya


The voting deadline should be at least 3 days from the time the VOTE email is 
posted. Hence, the script should mention that the date should be at least 72 
hours from now. The change needs to be done at the line below: 



*** Please download, test and vote by 





[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

One more thing: the `process.id` is actually only used as part of the 
`client.id` iff no `client.id` config is set. Hence, setting the `client.id` 
should avoid the issue of rebalancing (and task shuffling)?
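
A minimal sketch of that workaround (the HOSTNAME lookup is an assumption 
about the StatefulSet deployment, not Streams API behavior):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StableClientIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        // process.id only feeds into client.id when client.id is unset, so
        // pinning client.id keeps a stable per-pod identity across restarts,
        // e.g. "my-streams-app-0" when HOSTNAME comes from a StatefulSet pod
        props.put(StreamsConfig.CLIENT_ID_CONFIG,
            "my-streams-app-" + System.getenv("HOSTNAME"));
    }
}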

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation from KAFKA-10716 of persisting the 
> process ID in the state directory does not help us avoid shuffling lots of 
> tasks during restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.




