[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-05 Thread hzh0425 (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803768#comment-17803768 ]

hzh0425 commented on KAFKA-16073:
-

I think we can solve this issue by changing the order of the following code:


{code:java}
localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)

incrementStartOffset(localLog.segments.firstSegmentBaseOffset.getAsLong,
  LogStartOffsetIncrementReason.SegmentDeletion)
{code}
to
{code:java}
val newLocalLogStartOffset =
  localLog.segments.higherSegment(deletable.last.baseOffset()).get().baseOffset()

incrementStartOffset(newLocalLogStartOffset,
  LogStartOffsetIncrementReason.SegmentDeletion)

localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
{code}
cc [~ckamal] [~satish.duggana] 

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1, 3.8.0
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks whether a consumer's 
> fetch offset is less than the {{{}localLogStartOffset{}}}; if it is greater, 
> Kafka erroneously sends an {{OffsetOutOfRangeException}} to the consumer 
> instead of serving the fetch from remote storage.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.
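
For readers trying to visualize the window: below is a self-contained toy
(hypothetical names, not broker code) that mimics the interleaving described
above. The deletion thread removes segments before advancing the start offset,
so a fetch at offset2 is classified against the stale offset1 and takes the
error branch; reordering the offset update before the removal, as proposed in
the comment, closes this window.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class StaleOffsetRace {
    // offset1 = 100; the deletion thread eventually advances this to offset3 = 300
    static final AtomicLong localLogStartOffset = new AtomicLong(100);
    static volatile boolean segmentsDeleted = false;

    // Mirrors the decision in handleOffsetOutOfRangeError: below the local log
    // start offset the fetch is served from the remote tier, otherwise the
    // local log is consulted (and here the data is already gone).
    static String classifyFetch(long fetchOffset) {
        if (fetchOffset < localLogStartOffset.get()) {
            return "serve from remote storage";
        }
        return segmentsDeleted
                ? "BUG: OffsetOutOfRangeException for a still-valid offset"
                : "serve from local log";
    }

    public static void main(String[] args) throws Exception {
        ExecutorService deleter = Executors.newSingleThreadExecutor();
        deleter.submit(() -> {
            segmentsDeleted = true;       // removeAndDeleteSegments(...) runs first
            try {
                Thread.sleep(50);         // the window before incrementStartOffset(...)
            } catch (InterruptedException ignored) { }
            localLogStartOffset.set(300); // offset1 -> offset3, too late
        });
        Thread.sleep(10);                            // fetch lands inside the window
        System.out.println(classifyFetch(200));      // offset2: prints the BUG branch
        deleter.shutdown();
        deleter.awaitTermination(1, TimeUnit.SECONDS);
    }
}
{code}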



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13949) Connect /connectors endpoint should support querying the active topics and the task configs

2024-01-05 Thread Sudesh Wasnik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sudesh Wasnik reassigned KAFKA-13949:
-

Assignee: Sudesh Wasnik  (was: Viktor Somogyi-Vass)

> Connect /connectors endpoint should support querying the active topics and 
> the task configs
> ---
>
> Key: KAFKA-13949
> URL: https://issues.apache.org/jira/browse/KAFKA-13949
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 3.2.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Sudesh Wasnik
>Priority: Major
>
> The /connectors endpoint supports the "expand" query parameter, which acts as 
> a set of queried categories, currently supporting info (config) and status 
> (monitoring status).
> The endpoint should also support adding the active topics of a connector, and 
> adding the separate task configs, too.
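
For reference, a minimal sketch of how the existing expand categories are
queried against a worker's REST API (the worker address is an assumption); the
proposed active-topics and task-config categories would slot into the same
query parameter:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorsExpandQuery {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // "info" and "status" are the categories that exist today.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors?expand=info&expand=status"))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
{code}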



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13949) Connect /connectors endpoint should support querying the active topics and the task configs

2024-01-05 Thread Sudesh Wasnik (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803765#comment-17803765 ]

Sudesh Wasnik commented on KAFKA-13949:
---

Hi [~viktorsomogyi]! I would like to pick up this old ticket and am assigning 
it to myself; please let me know if you'd like to re-assign it to yourself.

> Connect /connectors endpoint should support querying the active topics and 
> the task configs
> ---
>
> Key: KAFKA-13949
> URL: https://issues.apache.org/jira/browse/KAFKA-13949
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 3.2.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> The /connectors endpoint supports the "expand" query parameter, which acts as 
> a set of queried categories, currently supporting info (config) and status 
> (monitoring status).
> The endpoint should also support adding the active topics of a connector, and 
> adding the separate task configs, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-01-05 Thread Phuc Hong Tran (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803749#comment-17803749 ]

Phuc Hong Tran commented on KAFKA-15682:


[~ckamal] is the retention rate of __remote_log_metadata defined by us or the 
user?

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Phuc Hong Tran
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic 
> {{__remote_log_metadata}} to store the metadata about the remote log 
> segments. Unlike other internal topics, which are compacted, this topic is 
> not compacted and its retention is set to unlimited. Keeping this internal 
> topic's retention unlimited is not practical in real-world use cases, where 
> the topic's local disk usage footprint grows huge over a period of time.
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption; we need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant uploaded user-topic remote-log-segments, otherwise 
> there will be dangling remote-log-segments which won't be cleared once all 
> the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126
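
On the question of who defines the retention: per the description above, it
defaults to unlimited and is expected to be tuned by the user. A hedged sketch
of the relevant broker property (the "rlmm.config." prefix and the property
name are assumptions to verify against your broker version):

{code:java}
import java.util.Properties;

public class RemoteLogMetadataTopicRetention {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        // Retention for __remote_log_metadata should exceed the max retention
        // of all user topics with tiered storage enabled ("max + X").
        long thirtyDaysMs = 30L * 24 * 60 * 60 * 1000;
        brokerProps.put("rlmm.config.remote.log.metadata.topic.retention.ms",
                String.valueOf(thirtyDaysMs));
        System.out.println(brokerProps);
    }
}
{code}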



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15134:Enrich the prompt reason in CommitFailedException [kafka]

2024-01-05 Thread via GitHub


github-actions[bot] commented on PR #13930:
URL: https://github.com/apache/kafka/pull/13930#issuecomment-1879522834

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Remove consumer rejoin to the group's synchronized lock [kafka]

2024-01-05 Thread via GitHub


github-actions[bot] commented on PR #14241:
URL: https://github.com/apache/kafka/pull/14241#issuecomment-1879522799

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-05 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1443569935


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set<TopicPartition> deletes;
-private final Map<TopicPartition, PartitionInfo> leaders;
+private final Map<TopicPartition, PartitionInfo> electedLeaders;
+private final Map<TopicPartition, PartitionInfo> updatedLeaders;

Review Comment:
   I guess nothing too complicated 
https://github.com/apache/kafka/pull/14489#discussion_r1350505733



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-05 Thread via GitHub


jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1443569042


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set<TopicPartition> deletes;
-private final Map<TopicPartition, PartitionInfo> leaders;
+private final Map<TopicPartition, PartitionInfo> electedLeaders;
+private final Map<TopicPartition, PartitionInfo> updatedLeaders;

Review Comment:
   There was a lot of discussion on the naming in the previous pr. I'm happy to 
change, but want to revisit that conversation so we don't retread the same 
ground.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-05 Thread via GitHub


artemlivshits commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1443564382


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -27,21 +27,24 @@
 
 public final class LocalReplicaChanges {
 private final Set<TopicPartition> deletes;
-private final Map<TopicPartition, PartitionInfo> leaders;
+private final Map<TopicPartition, PartitionInfo> electedLeaders;
+private final Map<TopicPartition, PartitionInfo> updatedLeaders;

Review Comment:
   `updatedLeaders` holds info whose semantics are similar to `followers` 
(something updated in a partition where the broker is the leader or a 
follower), so the names should probably be similar. E.g. both should be 
`updated*`, or both should have no prefix (in which case `electedLeaders` 
should probably be `newLeaders` or `newlyElectedLeaders`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-05 Thread via GitHub


jolshan commented on PR #15130:
URL: https://github.com/apache/kafka/pull/15130#issuecomment-1879485562

   Don't worry about `**/build/test-results/**/TEST-*.xml` (it has not worked 
for as long as I can remember).
   
   I always just go to the tests tab: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15130/2/tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-05 Thread via GitHub


jolshan commented on PR #15132:
URL: https://github.com/apache/kafka/pull/15132#issuecomment-1879484649

   > but there are more...
   
   
   
   In all seriousness, thanks for taking care of this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-05 Thread via GitHub


jolshan commented on PR #14489:
URL: https://github.com/apache/kafka/pull/14489#issuecomment-1879475298

   Part 1 PR here: https://github.com/apache/kafka/pull/15139


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-05 Thread via GitHub


jolshan opened a new pull request, #15139:
URL: https://github.com/apache/kafka/pull/15139

   This originally was https://github.com/apache/kafka/pull/14489 which covered 
2 aspects -- reloading on partition epoch changes where leader epoch did not 
change and reloading when leader epoch changed but we were already the leader. 
   
   I've cut out the second part of the change since the first part is much 
simpler.
   
   This PR redefines the TopicDelta fields to better distinguish when a leader 
is elected (leader epoch bump) vs. when a leader has ISR/replica changes 
(partition epoch bump). There are some cases where we bump the partition epoch 
but not the leader epoch; in those cases we do not need to perform operations 
that only care about the leader epoch bump (i.e. onElect callbacks).
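   
   A toy illustration of the distinction (hypothetical types, not the real 
`TopicDelta`/`LocalReplicaChanges` API):
   
   ```java
   public class LeaderChangeClassifier {
       enum Change { ELECTED_LEADER, UPDATED_LEADER }
   
       // A leader-epoch bump means a (re-)election and should trigger onElect
       // callbacks such as coordinator loading; a partition-epoch bump alone
       // (ISR/replica changes) should not.
       static Change classify(int oldLeaderEpoch, int newLeaderEpoch) {
           return newLeaderEpoch > oldLeaderEpoch ? Change.ELECTED_LEADER : Change.UPDATED_LEADER;
       }
   
       public static void main(String[] args) {
           System.out.println(classify(5, 6)); // ELECTED_LEADER: run onElect callbacks
           System.out.println(classify(5, 5)); // UPDATED_LEADER: skip coordinator reload
       }
   }
   ```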
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-05 Thread via GitHub


jolshan commented on PR #14489:
URL: https://github.com/apache/kafka/pull/14489#issuecomment-1879456828

   I've been asked to split these changes into the two parts I mentioned. Will 
follow up with that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-05 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1443537258


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   This is updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16087) Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race

2024-01-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16087:

Affects Version/s: 3.2.0
   2.6.0

> Tasks dropping incorrect records when errors.tolerance=all and errors 
> reported asynchronously due to data race
> --
>
> Key: KAFKA-16087
> URL: https://issues.apache.org/jira/browse/KAFKA-16087
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0, 3.2.0, 3.7.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors 
> to push records to the connector DLQ topic. The implementation of this 
> reporter interacts with the ProcessingContext within the per-task 
> RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
> the current operation, such as what error has occurred or what record is 
> being operated on.
> The ProcessingContext and RetryWithToleranceOperator are also used by the 
> converter and transformation pipeline of the connector for similar reasons. 
> When the ErrantRecordReporter#report function is called from SinkTask#put, 
> there is no contention over the mutable state, as the thread used for 
> SinkTask#put is also responsible for converting and transforming the record. 
> However, if ErrantRecordReporter#report is called by an extra thread within 
> the SinkTask, there is thread contention on the single mutable 
> ProcessingContext.
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
> synchronized keyword was added to all methods of RetryWithToleranceOperator 
> which interact with the ProcessingContext. However, this solution still 
> allows the RWTO methods to interleave, and produce unintended data races. 
> Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 queues some other thread 2 with some delay to call 
> ErrantRecordReporter#report(A).
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation. 
> For example, [converting 
> headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
> 6. The operation succeeds, and the ProcessingContext is left with error == 
> null, or equivalently failed() == false.
> 7. Thread 2 has its delay expire, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
> [RWTO 
> executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
>  and returns.
> 9. The operation leaves ProcessingContext with error != null, or equivalently 
> failed() == true.
> 10. Thread 1 then resumes execution, and calls [RWTO 
> failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
>  which evaluates to true.
> 11. Thread 1 then drops Record B, even though the header conversion succeeded 
> without error.
> 12. Record B is never delivered to the Sink Task, and never delivered to the 
> error reporter for processing, despite having produced no error during 
> processing.
> This per-method synchronization for returning nulls and errors separately is 
> insufficient, and either the data sharing should be avoided or a different 
> locking mechanism should be used.
> A similar flaw exists in source connectors and asynchronous errors reported 
> by the producer, and was introduced in KIP-779 (3.2.0)
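
A self-contained toy (not Connect code) showing why per-method synchronization
on shared mutable state is insufficient here: each method is atomic on its own,
but another thread's write can land between two calls made by the pipeline
thread, so failed() observes the reporter's error instead of the pipeline's own
result.

{code:java}
public class ProcessingContextRace {
    private Throwable error; // shared mutable state, like ProcessingContext

    public synchronized void executeSucceeded() { error = null; }
    public synchronized void executeFailed(Throwable t) { error = t; }
    public synchronized boolean failed() { return error != null; }

    public static void main(String[] args) throws Exception {
        ProcessingContextRace ctx = new ProcessingContextRace();
        ctx.executeSucceeded(); // Thread 1: record B's conversion succeeds
        Thread reporter = new Thread(() ->
                // Thread 2: asynchronous report() for record A lands in between
                ctx.executeFailed(new RuntimeException("record A is errant")));
        reporter.start();
        reporter.join();
        // Thread 1 now consults failed() for record B and sees record A's error
        System.out.println("record B wrongly dropped? " + ctx.failed()); // true
    }
}
{code}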



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16087) Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race

2024-01-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16087:

Description: 
The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors 
to push records to the connector DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.

The ProcessingContext and RetryWithToleranceOperator are also used by the 
converter and transformation pipeline of the connector for similar reasons. 
When the ErrantRecordReporter#report function is called from SinkTask#put, 
there is no contention over the mutable state, as the thread used for 
SinkTask#put is also responsible for converting and transforming the record. 
However, if ErrantRecordReporter#report is called by an extra thread within the 
SinkTask, there is thread contention on the single mutable ProcessingContext.

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
synchronized keyword was added to all methods of RetryWithToleranceOperator 
which interact with the ProcessingContext. However, this solution still allows 
the RWTO methods to interleave, and produce unintended data races. Consider the 
following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 queues some other thread 2 with some delay to call 
ErrantRecordReporter#report(A).
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation. For 
example, [converting 
headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
6. The operation succeeds, and the ProcessingContext is left with error == 
null, or equivalently failed() == false.
7. Thread 2 has its delay expire, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
[RWTO 
executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
 and returns.
9. The operation leaves ProcessingContext with error != null, or equivalently 
failed() == true.
10. Thread 1 then resumes execution, and calls [RWTO 
failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
 which evaluates to true.
11. Thread 1 then drops Record B, even though the header conversion succeeded 
without error.
12. Record B is never delivered to the Sink Task, and never delivered to the 
error reporter for processing, despite having produced no error during 
processing.

This per-method synchronization for returning nulls and errors separately is 
insufficient, and either the data sharing should be avoided or a different 
locking mechanism should be used.

A similar flaw exists in source connectors and asynchronous errors reported by 
the producer, and was introduced in KIP-779 (3.2.0)

  was:
The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
records to the connector DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.

The ProcessingContext and RetryWithToleranceOperator are also used by the 
converter and transformation pipeline of the connector for similar reasons. 
When the ErrantRecordReporter#report function is called from SinkTask#put, 
there is no contention over the mutable state, as the thread used for 
SinkTask#put is also responsible for converting and transforming the record. 
However, if ErrantRecordReporter#report is called by an extra thread within the 
SinkTask, there is thread contention on the single mutable ProcessingContext.

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
synchronized keyword was added to all methods of RetryWithToleranceOperator 
which interact with the ProcessingContext. However, this solution still allows 
the RWTO methods to interleave, and produce unintended data races. Consider the 
following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 queues some other thread 2 with some delay to call 
ErrantRecordReporter#report(A).
4. Thread 1 returns 

[jira] [Updated] (KAFKA-16087) Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race

2024-01-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16087:

Summary: Tasks dropping incorrect records when errors.tolerance=all and 
errors reported asynchronously due to data race  (was: Sink connector dropping 
incorrect record when ErrantRecordReporter used asynchronously)

> Tasks dropping incorrect records when errors.tolerance=all and errors 
> reported asynchronously due to data race
> --
>
> Key: KAFKA-16087
> URL: https://issues.apache.org/jira/browse/KAFKA-16087
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
> records to the connector DLQ topic. The implementation of this reporter 
> interacts with the ProcessingContext within the per-task 
> RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
> the current operation, such as what error has occurred or what record is 
> being operated on.
> The ProcessingContext and RetryWithToleranceOperator are also used by the 
> converter and transformation pipeline of the connector for similar reasons. 
> When the ErrantRecordReporter#report function is called from SinkTask#put, 
> there is no contention over the mutable state, as the thread used for 
> SinkTask#put is also responsible for converting and transforming the record. 
> However, if ErrantRecordReporter#report is called by an extra thread within 
> the SinkTask, there is thread contention on the single mutable 
> ProcessingContext.
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
> synchronized keyword was added to all methods of RetryWithToleranceOperator 
> which interact with the ProcessingContext. However, this solution still 
> allows the RWTO methods to interleave, and produce unintended data races. 
> Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 queues some other thread 2 with some delay to call 
> ErrantRecordReporter#report(A).
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation. 
> For example, [converting 
> headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
> 6. The operation succeeds, and the ProcessingContext is left with error == 
> null, or equivalently failed() == false.
> 7. Thread 2 has its delay expire, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
> [RWTO 
> executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
>  and returns.
> 9. The operation leaves ProcessingContext with error != null, or equivalently 
> failed() == true.
> 10. Thread 1 then resumes execution, and calls [RWTO 
> failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
>  which evaluates to true.
> 11. Thread 1 then drops Record B, even though the header conversion succeeded 
> without error.
> 12. Record B is never delivered to the Sink Task, and never delivered to the 
> error reporter for processing, despite having produced no error during 
> processing.
> This per-method synchronization for returning nulls and errors separately is 
> insufficient, and either the data sharing should be avoided or a different 
> locking mechanism should be used.
> A similar flaw exists in source connectors and asynchronous errors reported 
> by the producer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16087) Sink connector dropping incorrect record when ErrantRecordReporter used asynchronously

2024-01-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16087:

Description: 
The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
records to the connector DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.

The ProcessingContext and RetryWithToleranceOperator are also used by the 
converter and transformation pipeline of the connector for similar reasons. 
When the ErrantRecordReporter#report function is called from SinkTask#put, 
there is no contention over the mutable state, as the thread used for 
SinkTask#put is also responsible for converting and transforming the record. 
However, if ErrantRecordReporter#report is called by an extra thread within the 
SinkTask, there is thread contention on the single mutable ProcessingContext.

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
synchronized keyword was added to all methods of RetryWithToleranceOperator 
which interact with the ProcessingContext. However, this solution still allows 
the RWTO methods to interleave, and produce unintended data races. Consider the 
following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 queues some other thread 2 with some delay to call 
ErrantRecordReporter#report(A).
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation. For 
example, [converting 
headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
6. The operation succeeds, and the ProcessingContext is left with error == 
null, or equivalently failed() == false.
7. Thread 2 has its delay expire, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
[RWTO 
executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
 and returns.
9. The operation leaves ProcessingContext with error != null, or equivalently 
failed() == true.
10. Thread 1 then resumes execution, and calls [RWTO 
failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
 which evaluates to true.
11. Thread 1 then drops Record B, even though the header conversion succeeded 
without error.
12. Record B is never delivered to the Sink Task, and never delivered to the 
error reporter for processing, despite having produced no error during 
processing.

This per-method synchronization for returning nulls and errors separately is 
insufficient, and either the data sharing should be avoided or a different 
locking mechanism should be used.

A similar flaw exists in source connectors and asynchronous errors reported by 
the producer.

  was:
The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
records to the connector DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.

The ProcessingContext and RetryWithToleranceOperator are also used by the 
converter and transformation pipeline of the connector for similar reasons. 
When the ErrantRecordReporter#report function is called from SinkTask#put, 
there is no contention over the mutable state, as the thread used for 
SinkTask#put is also responsible for converting and transforming the record. 
However, if ErrantRecordReporter#report is called by an extra thread within the 
SinkTask, there is thread contention on the single mutable ProcessingContext.

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
synchronized keyword was added to all methods of RetryWithToleranceOperator 
which interact with the ProcessingContext. However, this solution still allows 
the RWTO methods to interleave, and produce unintended data races. Consider the 
following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 queues some other thread 2 with some delay to call 
ErrantRecordReporter#report(A).
4. Thread 1 returns from SinkTask#put and polls record B from the 

Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-05 Thread via GitHub


ditac commented on PR #15130:
URL: https://github.com/apache/kafka/pull/15130#issuecomment-1879371419

   > Some `ZkMigrationIntegrationTest` tests failed. Can we run a rebuild?
   
   The tests seem to have passed in the re-run. The step 
`**/build/test-results/**/TEST-*.xml` seems to have issues, but it doesn't seem 
like a blocker.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16087) Sink connector dropping incorrect record when ErrantRecordReporter used asynchronously

2024-01-05 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16087:
---

 Summary: Sink connector dropping incorrect record when 
ErrantRecordReporter used asynchronously
 Key: KAFKA-16087
 URL: https://issues.apache.org/jira/browse/KAFKA-16087
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Greg Harris
Assignee: Greg Harris


The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push 
records to the connector DLQ topic. The implementation of this reporter 
interacts with the ProcessingContext within the per-task 
RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
the current operation, such as what error has occurred or what record is being 
operated on.

The ProcessingContext and RetryWithToleranceOperator are also used by the 
converter and transformation pipeline of the connector for similar reasons. 
When the ErrantRecordReporter#report function is called from SinkTask#put, 
there is no contention over the mutable state, as the thread used for 
SinkTask#put is also responsible for converting and transforming the record. 
However, if ErrantRecordReporter#report is called by an extra thread within the 
SinkTask, there is thread contention on the single mutable ProcessingContext.

This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
synchronized keyword was added to all methods of RetryWithToleranceOperator 
which interact with the ProcessingContext. However, this solution still allows 
the RWTO methods to interleave, and produce unintended data races. Consider the 
following interleaving:

1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 queues some other thread 2 with some delay to call 
ErrantRecordReporter#report(A).
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation. For 
example, [converting 
headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
6. The operation succeeds, and the ProcessingContext is left with error == 
null, or equivalently failed() == false.
7. Thread 2 has its delay expire, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
[RWTO 
executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
 and returns.
9. The operation leaves ProcessingContext with error != null, or equivalently 
failed() == true.
10. Thread 1 then resumes execution, and calls [RWTO 
failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
 which evaluates to true.
11. Thread 1 then drops Record B, even though the header conversion succeeded 
without error.
12. Record B is never delivered to the Sink Task, and never delivered to the 
error reporter for processing, despite having produced no error during 
processing.

This per-method synchronization for returning nulls and errors separately is 
insufficient, and either the data sharing should be avoided or a different 
locking mechanism should be used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-05 Thread via GitHub


wernerdv commented on code in PR #15101:
URL: https://github.com/apache/kafka/pull/15101#discussion_r1443373091


##
core/src/test/java/kafka/test/junit/LeakTestingExtension.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.utils.TestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ExtensionContext.Store;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import scala.Tuple2;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final Set<String> EXPECTED_THREAD_NAMES = new HashSet<>(
+            Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool",
+                    "executor-", "metrics-meter-tick-thread", "scala-", "pool-")
+    );
+    private static final String THREADS_KEY = "threads";
+
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        getStore(context).put(THREADS_KEY, Thread.getAllStackTraces().keySet());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void afterEach(ExtensionContext context) {
+        Set<Thread> initialThreads = getStore(context).remove(THREADS_KEY, Set.class);
+        Tuple2<Set<Thread>, Object> unexpectedThreads = TestUtils.computeUntilTrue(
+            () -> unexpectedThreads(initialThreads),
+            DEFAULT_MAX_WAIT_MS,
+            100L,
+            Set::isEmpty
+        );
+
+        assertTrue(unexpectedThreads._1.isEmpty(), "Found unexpected threads after executing test: " +
+            unexpectedThreads._1.stream().map(Objects::toString).collect(Collectors.joining(", ")));
+    }
+
+    private Set<Thread> unexpectedThreads(Set<Thread> initialThreads) {
+        Set<Thread> finalThreads = Thread.getAllStackTraces().keySet();
+
+        if (initialThreads.size() != finalThreads.size()) {

Review Comment:
   @ashwinpankaj removed size comparison and now thread names are checked.
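   
   For readers following along, a minimal usage sketch of how a test class 
would opt in to the extension (the test class name is illustrative):
   
   ```java
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.extension.ExtendWith;
   
   @ExtendWith(LeakTestingExtension.class)
   class SomeBrokerTest {
       @Test
       void doesNotLeakThreads() {
           // exercise code that starts and stops threads; afterEach asserts
           // that no unexpected threads survive the test
       }
   }
   ```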



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2024-01-05 Thread via GitHub


Phuc-Hong-Tran commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-1879233506

   @philipnee, is there anything else that I should change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-05 Thread via GitHub


lucasbru merged PR #15117:
URL: https://github.com/apache/kafka/pull/15117


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]

2024-01-05 Thread via GitHub


clolov commented on code in PR #15112:
URL: https://github.com/apache/kafka/pull/15112#discussion_r1443172726


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4904,7 +4870,8 @@ public void suspend() {
 final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
 assignment.putAll(taskId01Assignment);
 when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01));
-replay(consumer);
+
+taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Yes, we are using the old consumer in all places making calls to
   ```
   private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
       final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
       expect(consumer.assignment()).andReturn(assignment);
       consumer.pause(assignment);
   }
   ```
   This is also why I took the approach of having a separate Mockito consumer. 
Part 2 of this pull request will migrate all of those usages.
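   
   For what it's worth, a hedged sketch of what the Mockito counterpart of that 
helper could look like in part 2 (names are illustrative):
   
   ```java
   import static java.util.Collections.singleton;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   
   import java.util.Set;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   class MockitoPauseStubs {
       static void stubConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
           final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
           when(consumer.assignment()).thenReturn(assignment);
       }
   
       // call after the code under test has run
       static void verifyConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
           verify(consumer).pause(singleton(new TopicPartition("assignment", 0)));
       }
   }
   ```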



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move Raft io thread implementation to Java [kafka]

2024-01-05 Thread via GitHub


hachikuji merged PR #15119:
URL: https://github.com/apache/kafka/pull/15119


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443144457


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -805,6 +959,194 @@ public void reset() {
 public void close() {
 // nothing to close
 }
+
+@Override
+public void maybeRegisterTransactionIterator(final ManagedKeyValueIterator<Bytes, byte[]> iterator) {
+// never register iterators as transaction iterator, because we 
have no transaction
+}
+
+@Override
+public void writeOffset(final TopicPartition topicPartition, final 
Long offset, final WriteBatchInterface batch) throws RocksDBException {
+final byte[] key = topicPartitionKeyCache.computeIfAbsent(
+topicPartition,
+tp -> TOPIC_PARTITION_SERIALIZER.serialize(null, tp)
+);
+if (offset == null) {
+batch.delete(offsetsCF, key);
+} else {
+final byte[] serializedOffset = 
OFFSET_SERIALIZER.serialize(null, offset);
+batch.put(offsetsCF, key, serializedOffset);
+}
+}
+
+@Override
+public void updatePosition(final Position position,
+   final TopicPartition topicPartition,
+   final Long offset) throws RocksDBException {
+final byte[] key = TOPIC_PARTITION_SERIALIZER.serialize(null, 
topicPartition);
+if (offset == null) {
+db.delete(offsetsCF, key);
+} else {
+final byte[] value = OFFSET_SERIALIZER.serialize(null, offset);
+db.put(offsetsCF, key, value);
+position.withComponent(topicPartition.topic(), 
topicPartition.partition(), offset);
+}
+}
+}
+
+static class BatchedDBAccessor implements DBAccessor {
+
+private final RocksDB db;
+private final WriteBatchWithIndex batch = new 
WriteBatchWithIndex(true);
+private Position uncommittedPosition = Position.emptyPosition();
+private long uncommittedBytes;
+
+private final Map<TopicPartition, byte[]> topicPartitionKeyCache = new HashMap<>();
+
+private final ColumnFamilyHandle offsetsCF;
+private final WriteOptions writeOptions;
+private final ReadOptions defaultReadOptions = new ReadOptions();
+
+// used to simulate calls from StreamThreads in tests
+boolean isStreamThreadForTest = false;
+
+private Set<ManagedKeyValueIterator<Bytes, byte[]>> openTransactionIterators = new HashSet<>();
+
+BatchedDBAccessor(final RocksDB db,
+  final ColumnFamilyHandle offsetsCF,
+  final WriteOptions writeOptions) {
+this.db = db;
+this.offsetsCF = offsetsCF;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] 
key) throws RocksDBException {
+if (Thread.currentThread() instanceof ProcessingThread || 
isStreamThreadForTest) {

Review Comment:
   We need to do this to ensure that, under `READ_COMMITTED`, interactive query 
threads don't read from the transaction buffer, since it's not thread-safe.
   
   While it's not ideal to add more to the hot path, `Thread.currentThread()` 
and `instanceof` checks are both JVM intrinsics, so should be negligible 
compared with the cost of querying RocksDB.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443141089


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -266,29 +269,20 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 log.info("Initializing to the starting offset for 
changelog {} of in-memory state store {}",
  store.changelogPartition, 
store.stateStore.name());
 } else if (store.offset() == null) {
-if 
(loadedCheckpoints.containsKey(store.changelogPartition)) {
-final Long offset = 
changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
-store.setOffset(offset);
 
-log.info("State store {} initialized from checkpoint 
with offset {} at changelog {}",
-  store.stateStore.name(), store.offset, 
store.changelogPartition);
-} else {
-// with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
-// and hence we are uncertain that the current local 
state only contains committed data;
-// in that case we need to treat it as a 
task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
-log.warn("State store {} did not find checkpoint 
offsets while stores are not empty, " +
-"since under EOS it has the risk of getting 
uncommitted data in stores we have to " +
-"treat it as a task corruption error and wipe 
out the local state of task {} " +
-"before re-bootstrapping", 
store.stateStore.name(), taskId);
-
-throw new 
TaskCorruptedException(Collections.singleton(taskId));
-} else {
-log.info("State store {} did not find checkpoint 
offset, hence would " +
-"default to the starting offset at changelog 
{}",
-store.stateStore.name(), 
store.changelogPartition);
-}
-}
+// load managed offsets from store
+Long offset = loadOffsetFromStore(store);
+
+// load offsets from .checkpoint file
+offset = loadOffsetFromCheckpointFile(offset, 
loadedCheckpoints, store);
+
+// no offsets found for store, store is corrupt if not 
empty
+throwCorruptIfNoOffsetAndNotEmpty(offset, storeDirIsEmpty);
+
+updateOffsetInMemory(offset, store);
+
+syncManagedOffsetInStore(offset, store);
+

Review Comment:
   I generally tried to avoid unnecessary changes/refactoring wherever 
possible, but the new logic here caused checkstyle to complain about NPath 
complexity, so I factored it out to several `private` methods.
   
   In the end, breaking this up has made it considerably easier to follow, and 
is now somewhat self-documenting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443138486


##
checkstyle/checkstyle.xml:
##
@@ -135,7 +135,7 @@
 
 
   
-  
+  

Review Comment:
   `TaskExecutor#commitOffsetsOrTransaction`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on code in PR #15137:
URL: https://github.com/apache/kafka/pull/15137#discussion_r1443136941


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,8 +268,7 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 final Collection corruptedPartitions = 
task.changelogPartitions();
 
 // mark corrupted partitions to not be checkpointed, and then 
close the task as dirty
-// TODO: this step should be removed as we complete migrating 
to state updater
-if (markAsCorrupted && stateUpdater == null) {
+if (markAsCorrupted) {

Review Comment:
   This is necessary now that we only wipe stores when `store.corrupted == 
true`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on PR #15137:
URL: https://github.com/apache/kafka/pull/15137#issuecomment-1878999651

   @cadonna @mjsax @ableegoldman @lucasbru @wcarlson5 @bbejeck @vvcephei 
@guozhangwang 
   
   My apologies to whoever draws the short straw of having to review this!
   
   This is the main implementation of KIP-892. I broke out as much as I could 
into independent PRs that have been merged ahead of this.
   
   If you would like to discuss anything in-person, I'm available on the 
Confluent Community Slack between 10:00 and 17:00 UTC, Monday to Friday. Happy 
to discuss on a video call if necessary.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]

2024-01-05 Thread via GitHub


clolov closed pull request #15131: MINOR: Add public documentation for metrics 
introduced in KIP-963
URL: https://github.com/apache/kafka/pull/15131


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]

2024-01-05 Thread via GitHub


clolov commented on PR #15131:
URL: https://github.com/apache/kafka/pull/15131#issuecomment-1878999520

   Closing this PR and moving it to https://github.com/apache/kafka/pull/15138


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]

2024-01-05 Thread via GitHub


clolov opened a new pull request, #15138:
URL: https://github.com/apache/kafka/pull/15138

   Adding the public documentation for metrics introduced in 
[KIP-963](https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-05 Thread via GitHub


nicktelford opened a new pull request, #15137:
URL: https://github.com/apache/kafka/pull/15137

   See [KIP-892: Transactional Semantics for StateStores][1] for a detailed 
explanation of the design.
   
   ---
   
   Writes to StateStores are now isolated from the underlying RocksDB database 
until the changelog transaction has been committed. We do this by writing first 
to a RocksDB `WriteBatchWithIndex`, which is written to the RocksDB database on 
commit.
   
   Now that writes to a StateStore are isolated by a "transaction buffer" 
(`WriteBatchWithIndex`), we can also isolate them from interactive query 
threads. We introduce a new configuration,
   `default.state.isolation.level`, which defaults to `READ_UNCOMMITTED` to 
provide the existing behaviour.
   
   When `processing.mode` is set to `exactly-once(-v2|-beta)`, the default 
switches to `READ_COMMITTED`, as this will be the desired behaviour for the 
vast majority of use-cases.
   
   Crucially, this ensures that in the event of an error, state stores no 
longer need to be wiped under EOS, unless the user has explicitly selected the 
`READ_UNCOMMITTED` isolation level.
   
   ---
   
   To ensure consistency, we now store offsets in a secondary column-family in 
RocksDB (`__offsets`). These offsets are updated on-commit. We guarantee that 
the `__offsets` memtables and data memtables are flushed atomically, together, 
using RocksDB's Atomic Flush. This allows us to remove the explicit `flush()` 
calls that previously forcibly flushed memtables to disk. RocksDB now entirely 
controls memtable flush behaviour via its configuration.
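   
   For illustration, here is a minimal sketch of that write path against the raw 
RocksDB Java API. This is not the actual Streams code from this PR; the store 
path, keys, and the "changelog-0" offset key are made up:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   
   import org.rocksdb.*;
   
   public class TxnBufferSketch {
       public static void main(String[] args) throws RocksDBException {
           RocksDB.loadLibrary();
           try (DBOptions dbOptions = new DBOptions()
                   .setCreateIfMissing(true)
                   .setCreateMissingColumnFamilies(true)
                   .setAtomicFlush(true)) { // data and __offsets memtables flush together
               List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                   new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                   new ColumnFamilyDescriptor("__offsets".getBytes(StandardCharsets.UTF_8)));
               List<ColumnFamilyHandle> handles = new ArrayList<>();
               try (RocksDB db = RocksDB.open(dbOptions, "/tmp/txn-buffer-sketch", descriptors, handles);
                    WriteBatchWithIndex buffer = new WriteBatchWithIndex();
                    ReadOptions readOptions = new ReadOptions();
                    WriteOptions writeOptions = new WriteOptions()) {
                   ColumnFamilyHandle data = handles.get(0);
                   ColumnFamilyHandle offsets = handles.get(1);
   
                   // Uncommitted write: lands in the transaction buffer, not the database.
                   buffer.put(data, "key".getBytes(), "value".getBytes());
   
                   // READ_UNCOMMITTED overlays the buffer on the database ...
                   byte[] dirty = buffer.getFromBatchAndDB(db, readOptions, "key".getBytes());
                   // ... while READ_COMMITTED reads the database alone (still null here).
                   byte[] committed = db.get(data, "key".getBytes());
   
                   // On changelog commit: record the offset in __offsets and apply the
                   // whole buffer in a single atomic write, then reset the buffer.
                   buffer.put(offsets, "changelog-0".getBytes(), "42".getBytes());
                   db.write(writeOptions, buffer);
                   buffer.clear();
   
                   handles.forEach(ColumnFamilyHandle::close);
               }
           }
       }
   }
   ```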
   
   We continue writing offsets to the `.checkpoint` file, to ensure that during 
rebalance, we can send offsets for currently closed stores, without having to 
open them just to read their offset. The `.checkpoint` file is only updated on 
store close for this reason. If the store is open, the currently in-memory 
offset is used instead.
   
   We also keep the `.checkpoint` file for custom `StateStore` implementations 
that do not manage their own offsets. For these stores, the legacy behaviour 
will still be used.
   
   ---
   
   The performance should be at least as good, if not better, than before, as 
batched writes to RocksDB avoid locking the memtable for every write.
   
   1: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2024-01-05 Thread via GitHub


mimaison commented on PR #14595:
URL: https://github.com/apache/kafka/pull/14595#issuecomment-1878961224

   Thanks for the PR. It fails compiling:
   ```
   > Task :core:compileTestScala
   [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14595/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala:62:73:
 type mismatch;
found   : Int
required: scala.collection.Seq[kafka.server.ControllerServer]
   Error occurred in an application involving default arguments.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-05 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-14404:
---

Assignee: Ayoub Omari  (was: Sujay Hegde)

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16032) Review inconsistent error handling of OffsetFetch and OffsetCommit responses

2024-01-05 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16032:
---
Description: 
OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
review around this:
 - The logic is duplicated for some errors that are treated similarly (ex. 
NOT_COORDINATOR)

 - Some errors are not handled similarly in both requests (ex. 
COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
OffsetFetch). Note that the specific errors handled by each request were kept 
the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
an attempt to handle the same errors, in the same way, whenever possible.

Note that the legacy approach handles expected errors for each path (FETCH and 
COMMIT), retrying on those when needed, but does not retry on unexpected 
retriable errors.

  was:
OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
review around this:
 - The logic is duplicated for some errors that are treated similarly (ex. 
NOT_COORDINATOR)

 - Some errors are not handled similarly in both requests (ex. 
COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
OffsetFetch). Note that the specific errors handled by each request were kept 
the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
an attempt to handle the same errors, in the same way, whenever possible.

This should also be reviewed with the goal of unifying the retry logic around 
those errors, which is now applied in multiple different places of the manager 
depending on the request path.


> Review inconsistent error handling of OffsetFetch and OffsetCommit responses
> 
>
> Key: KAFKA-16032
> URL: https://issues.apache.org/jira/browse/KAFKA-16032
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
> review around this:
>  - The logic is duplicated for some errors that are treated similarly (ex. 
> NOT_COORDINATOR)
>  - Some errors are not handled similarly in both requests (ex. 
> COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
> OffsetFetch). Note that the specific errors handled by each request were kept 
> the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
> an attempt to handle the same errors, in the same way, whenever possible.
> Note that the legacy approach handles expected errors for each path (FETCH 
> and COMMIT), retrying on those when needed, but does not retry on unexpected 
> retriable errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15922) Add MetadataVersion for JBOD

2024-01-05 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano updated KAFKA-15922:
--
Fix Version/s: 3.7.0

> Add MetadataVersion for JBOD
> 
>
> Key: KAFKA-15922
> URL: https://issues.apache.org/jira/browse/KAFKA-15922
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15922) Add MetadataVersion for JBOD

2024-01-05 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-15922.
---
Resolution: Fixed

> Add MetadataVersion for JBOD
> 
>
> Key: KAFKA-15922
> URL: https://issues.apache.org/jira/browse/KAFKA-15922
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]

2024-01-05 Thread via GitHub


vamossagar12 commented on PR #14666:
URL: https://github.com/apache/kafka/pull/14666#issuecomment-1878869619

   hey @divijvaidya, @showuon, would you be able to review this PR? It seems 
like a useful change, and things looked fine when I reviewed it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]

2024-01-05 Thread via GitHub


ijuma commented on PR #15099:
URL: https://github.com/apache/kafka/pull/15099#issuecomment-1878866375

   That is not surprising - it's the reason why we limited parallelism in the 
first place. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16086: Fix memory leak in RocksDBStore [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on PR #15135:
URL: https://github.com/apache/kafka/pull/15135#issuecomment-1878840727

   IntelliJ will report a warning, provided that the object isn't passed to 
another method (as we do with `RocksDB.listColumnFamilies`).
   
   I can't get spotbugs to report this bug even if it's not passed to another 
method.
   
   I'm not even sure spotbugs is configured to report this problem, although I 
didn't see anything that would exclude it in `spotbugs-exclude.xml`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-05 Thread via GitHub


viktorsomogyi commented on code in PR #15048:
URL: https://github.com/apache/kafka/pull/15048#discussion_r1442969642


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker
+
+import kafka.tools.StorageTool
+import kafka.utils.Exit
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}
+
+object KafkaDockerWrapper {
+  def main(args: Array[String]): Unit = {
+    if (args.length == 0) {
+      throw new RuntimeException(s"Error: No operation input provided. " +
+        s"Please provide a valid operation: 'setup'.")
+    }
+    val operation = args.head
+    val arguments = args.tail
+
+    operation match {
+      case "setup" =>
+        if (arguments.length != 3) {
+          val errMsg = "not enough arguments passed. Usage: " +
+            "setup <default-configs-dir> <mounted-configs-dir> <final-configs-dir>"
+          System.err.println(errMsg)
+          Exit.exit(1, Some(errMsg))
+        }
+        val defaultConfigsDir = arguments(0)
+        val mountedConfigsDir = arguments(1)
+        val finalConfigsDir = arguments(2)
+        try {
+          prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir)
+        } catch {
+          case e: Throwable =>
+            val errMsg = s"error while preparing configs: ${e.getMessage}"
+            System.err.println(errMsg)
+            Exit.exit(1, Some(errMsg))
+        }
+
+        val formatCmd = formatStorageCmd(finalConfigsDir, envVars)
+        StorageTool.main(formatCmd)
+      case _ =>
+        throw new RuntimeException(s"Unknown operation $operation. " +
+          s"Please provide a valid operation: 'setup'.")
+    }
+  }
+
+  import Constants._
+
+  private def formatStorageCmd(configsDir: String, env: Map[String, String]): Array[String] = {

Review Comment:
   I'm OK with that (refactoring later).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-05 Thread via GitHub


viktorsomogyi commented on code in PR #15048:
URL: https://github.com/apache/kafka/pull/15048#discussion_r1442969642


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker
+
+import kafka.tools.StorageTool
+import kafka.utils.Exit
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}
+
+object KafkaDockerWrapper {
+  def main(args: Array[String]): Unit = {
+    if (args.length == 0) {
+      throw new RuntimeException(s"Error: No operation input provided. " +
+        s"Please provide a valid operation: 'setup'.")
+    }
+    val operation = args.head
+    val arguments = args.tail
+
+    operation match {
+      case "setup" =>
+        if (arguments.length != 3) {
+          val errMsg = "not enough arguments passed. Usage: " +
+            "setup <default-configs-dir> <mounted-configs-dir> <final-configs-dir>"
+          System.err.println(errMsg)
+          Exit.exit(1, Some(errMsg))
+        }
+        val defaultConfigsDir = arguments(0)
+        val mountedConfigsDir = arguments(1)
+        val finalConfigsDir = arguments(2)
+        try {
+          prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir)
+        } catch {
+          case e: Throwable =>
+            val errMsg = s"error while preparing configs: ${e.getMessage}"
+            System.err.println(errMsg)
+            Exit.exit(1, Some(errMsg))
+        }
+
+        val formatCmd = formatStorageCmd(finalConfigsDir, envVars)
+        StorageTool.main(formatCmd)
+      case _ =>
+        throw new RuntimeException(s"Unknown operation $operation. " +
+          s"Please provide a valid operation: 'setup'.")
+    }
+  }
+
+  import Constants._
+
+  private def formatStorageCmd(configsDir: String, env: Map[String, String]): Array[String] = {

Review Comment:
   I'm OK with that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16086: Fix memory leak in RocksDBStore [kafka]

2024-01-05 Thread via GitHub


lucasbru merged PR #15135:
URL: https://github.com/apache/kafka/pull/15135


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-05 Thread via GitHub


lucasbru commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1878769287

   @philipnee some considerations:
- Does the old consumer guarantee that the interceptor is called when 
`poll` or `close` are not called by the consumer anymore after an autocommit? I 
don't think so, because it's running as part of the application thread, so how 
would it provide that guarantee without even being called?
- I think the interceptor is meant for monitoring / metrics -- it doesn't 
have the same strict requirements as, for example, an ongoing commit, which 
shouldn't be dropped during close because that would result in data loss or 
duplicate processing.
   
   "Make the consumer interceptor thread-safe" -- How would we even do that? 
Consumer interceptors are implemented by the user; we cannot make them 
thread-safe by just putting a lock on them. The only way would be to write a 
KIP, make a breaking change with a migration guide, and ask all users to 
upgrade their code. Not sure that is worth it, given the original consumer 
does not seem to provide the guarantee that you are asking for.
   
   I think the best we can do is to insert the interceptor callback into the 
invoker queue upon commit completion, and make sure during close to execute all 
commit callbacks that are enqueued, no? If the user fails to call close, we are 
out of luck anyways.
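   
   To make that last point concrete, a minimal sketch of the approach 
(hypothetical names, not the actual AsyncKafkaConsumer code):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
   import org.apache.kafka.common.TopicPartition;
   
   // Commit completions enqueue the interceptor callback for the application
   // thread, and close() drains whatever is still queued.
   class InterceptorInvokerSketch<K, V> {
       private final BlockingQueue<Runnable> invokerQueue = new LinkedBlockingQueue<>();
       private final ConsumerInterceptors<K, V> interceptors;
   
       InterceptorInvokerSketch(ConsumerInterceptors<K, V> interceptors) {
           this.interceptors = interceptors;
       }
   
       // Called from the background thread when a commit completes.
       void onCommitCompleted(Map<TopicPartition, OffsetAndMetadata> offsets) {
           invokerQueue.add(() -> interceptors.onCommit(offsets));
       }
   
       // Called from the application thread in poll() and, crucially, in close().
       void drainCallbacks() {
           Runnable callback;
           while ((callback = invokerQueue.poll()) != null) {
               callback.run();
           }
       }
   }
   ```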


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16086: Fix memory leak in RocksDBStore [kafka]

2024-01-05 Thread via GitHub


lucasbru commented on PR #15135:
URL: https://github.com/apache/kafka/pull/15135#issuecomment-1878750810

   My hunch is that spotbugs won't report it when it's leaked from the method 
via another method call (here, `RocksDB.listColumnFamilies`). But yeah, better 
to check


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add 3.5.2 and 3.6.1 to system tests [kafka]

2024-01-05 Thread via GitHub


mimaison commented on PR #14932:
URL: https://github.com/apache/kafka/pull/14932#issuecomment-1878710831

   Thanks @mjsax!
   I've added 3.5.2 to this PR as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16086: Fix memory leak in RocksDBStore [kafka]

2024-01-05 Thread via GitHub


divijvaidya commented on PR #15135:
URL: https://github.com/apache/kafka/pull/15135#issuecomment-1878694769

   I agree @nicktelford, there could be false positives. As a follow-up to this 
PR, can you please create a JIRA to investigate why spotbugs didn't catch this 
for us and make the necessary changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]

2024-01-05 Thread via GitHub


divijvaidya commented on code in PR #15112:
URL: https://github.com/apache/kafka/pull/15112#discussion_r1442911556


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4904,7 +4870,8 @@ public void suspend() {
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01));
-        replay(consumer);
+
+        taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Is there a place where we are still using the old consumer with taskManager? 
If not, can we set this to the new consumer in the @Before method itself, where 
we construct taskManager?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803543#comment-17803543
 ] 

Matthias J. Sax commented on KAFKA-14404:
-

It seems [~sujayopensource] lost interest in working on this. So yes, please 
feel free to pick it up.

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Minor: Add KIP-923 to upgrade-guide.html and dsl-api.html [kafka]

2024-01-05 Thread via GitHub


mjsax commented on PR #14725:
URL: https://github.com/apache/kafka/pull/14725#issuecomment-1878653401

   Merged to `trunk` and cherry-picked to `3.7` and `3.6` branches. -- 
@wcarlson5 can you do a PR on `kafka-site` to get the change live for `3.6`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor: Add KIP-923 to upgrade-guide.html and dsl-api.html [kafka]

2024-01-05 Thread via GitHub


mjsax merged PR #14725:
URL: https://github.com/apache/kafka/pull/14725


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add 3.6.1 to system tests [kafka]

2024-01-05 Thread via GitHub


mjsax commented on PR #14932:
URL: https://github.com/apache/kafka/pull/14932#issuecomment-1878645077

   Uploaded 3.5.2 and 3.6.1 artifacts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]

2024-01-05 Thread via GitHub


clolov commented on code in PR #15112:
URL: https://github.com/apache/kafka/pull/15112#discussion_r1442836205


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4809,11 +4768,14 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
             exception.corruptedTasks(),
             equalTo(Collections.singleton(taskId00))
         );
+
+        Mockito.verify(mockitoConsumer, times(2)).groupMetadata();

Review Comment:
   I verified that if I change
   ```
   expect(consumer.groupMetadata()).andStubReturn(null);
   ```
   to
   ```
   expect(consumer.groupMetadata()).andReturn(null).times(2);
   ```
   the test still passes. I suspect it was always called twice, just never 
verified
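   
   For reference, this is the shape of the migration (a self-contained sketch; 
the Consumer mock stands in for the mocked field in TaskManagerTest):
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   
   import org.apache.kafka.clients.consumer.Consumer;
   
   // EasyMock's andStubReturn() ignores call count, so the Mockito version is
   // where the "called twice" expectation first gets pinned down.
   class GroupMetadataMigrationSketch {
       @SuppressWarnings("unchecked")
       void sketch() {
           Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) mock(Consumer.class);
           // EasyMock (before): expect(consumer.groupMetadata()).andStubReturn(null);
           when(consumer.groupMetadata()).thenReturn(null);
   
           consumer.groupMetadata(); // stand-ins for the two calls made by the task manager
           consumer.groupMetadata();
   
           verify(consumer, times(2)).groupMetadata();
       }
   }
   ```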



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15721: KRaft support in DeleteTopicsRequestWithDeletionDisabledTest [kafka]

2024-01-05 Thread via GitHub


wernerdv commented on PR #15124:
URL: https://github.com/apache/kafka/pull/15124#issuecomment-1878609362

   Hello @jolshan,
   Please take a look.
   
   The changes are similar to #15086.
   
   Tests passed
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15124/4/testReport/kafka.server/DeleteTopicsRequestWithDeletionDisabledTest/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-05 Thread via GitHub


showuon commented on PR #15133:
URL: https://github.com/apache/kafka/pull/15133#issuecomment-1878605296

   > Okay, correct me if I am wrong - your idea is that we will use the 
aggregate object to either store partitions->values or topics->values. For the 
case where we don't have a topic tag we use the second, in all other cases we 
use the first. 
   
   Correct!
   
   > If this is correct, we need to add the 
brokerTopicStats.allTopicsStats()... to where we record each metric, no?
   
   Right, like this: 
https://github.com/apache/kafka/pull/15133/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R792
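   
   A sketch of the recording pattern in question (the metric accessor name is 
illustrative, not the exact BrokerTopicMetrics API):
   
   ```java
   // Every per-topic record site must also record into the broker-wide
   // aggregate, otherwise the all-topics metric silently stays at zero.
   void recordRemoteCopyBytes(BrokerTopicStats brokerTopicStats, String topic, long bytes) {
       brokerTopicStats.topicStats(topic).remoteCopyBytesRate().mark(bytes);  // per-topic
       brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark(bytes);   // aggregate (previously missing)
   }
   ```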


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16086: Fix memory leak in RocksDBStore [kafka]

2024-01-05 Thread via GitHub


nicktelford commented on PR #15135:
URL: https://github.com/apache/kafka/pull/15135#issuecomment-1878591135

   > Is there a way we could have caught this? (perhaps spotbugs telling that 
an autocloseable object is not getting closed)
   > 
   > see: 
https://spotbugs.readthedocs.io/en/latest/bugDescriptions.html#obl-method-may-fail-to-clean-up-stream-or-resource-obl-unsatisfied-obligation
   
   It does seem like this could have been caught by some tooling. Usually, the 
problem with AutoCloseables arises when they're created/closed across a scope 
that covers multiple methods, because you can't use a try-with-resources.
   
   But in this case, it's allocated in a local scope, so static analysis should 
be able to determine that it's never closed.
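   
   To illustrate the leak class we're discussing, a sketch with a hypothetical 
helper (not the actual RocksDBStore source):
   
   ```java
   import java.util.List;
   
   import org.rocksdb.ColumnFamilyOptions;
   import org.rocksdb.DBOptions;
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;
   
   public class OptionsLeakSketch {
       // Leaky shape: Options copies native state from dbOptions, is allocated
       // locally, and is never closed, so every call leaks off-heap memory.
       static List<byte[]> leakyListing(DBOptions dbOptions, ColumnFamilyOptions cfOptions,
                                        String path) throws RocksDBException {
           Options options = new Options(dbOptions, cfOptions); // native allocation
           return RocksDB.listColumnFamilies(options, path);    // options never closed
       }
   
       // Fixed shape: try-with-resources scopes the native object to the method,
       // which is also the form local static analysis can verify.
       static List<byte[]> safeListing(DBOptions dbOptions, ColumnFamilyOptions cfOptions,
                                       String path) throws RocksDBException {
           try (Options options = new Options(dbOptions, cfOptions)) {
               return RocksDB.listColumnFamilies(options, path);
           }
       }
   }
   ```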


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16086: Fix memory leak in RocksDBStore [kafka]

2024-01-05 Thread via GitHub


divijvaidya commented on PR #15135:
URL: https://github.com/apache/kafka/pull/15135#issuecomment-1878582350

   Is there a way we could have caught this? (perhaps spotbugs telling that an 
autocloseable object is not getting closed)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: add all topic metrics [kafka]

2024-01-05 Thread via GitHub


showuon commented on PR #15133:
URL: https://github.com/apache/kafka/pull/15133#issuecomment-1878558194

   > Should we write a test that could have caught this?
   
   Will do! Just want to make sure the solution makes sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Migrate consumer mock in TaskManagerTest to Mockito [kafka]

2024-01-05 Thread via GitHub


divijvaidya commented on code in PR #15112:
URL: https://github.com/apache/kafka/pull/15112#discussion_r1442788932


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1354,14 +1346,12 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);

Review Comment:
   don't we want to do a `taskManager.setMainConsumer(mockitoConsumer);` here? 
(unless setUpTransitionToRunningOfRestoredTask is already doing so)



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4954,8 +4910,6 @@ public void shouldConvertStandbyTaskToActiveTask() {
 
         when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), Mockito.eq(taskId00Partitions), any()))
             .thenReturn(activeTask);
 
-        replay(consumer);

Review Comment:
   I assume that the consumer is never used in the tests where we have removed 
the replay?
   
   In that case, should we do something similar to what you did in 
`shouldReturnFalseWhenThereAreStillNonRunningTasks`, i.e. verify that there are 
no interactions?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4809,11 +4768,14 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
             exception.corruptedTasks(),
             equalTo(Collections.singleton(taskId00))
         );
+
+        Mockito.verify(mockitoConsumer, times(2)).groupMetadata();

Review Comment:
   should it be called twice as per code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-01-05 Thread via GitHub


gaurav-narula commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-1878536740

   CC: @OmniaGM @pprovenzano 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-05 Thread hzh0425 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803517#comment-17803517
 ] 

hzh0425 commented on KAFKA-16073:
-

[~ckamal] 
yes, I'm working on it, and we can solve it together

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1, 3.8.0
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add public documentation for metrics introduced in KIP-963 [kafka]

2024-01-05 Thread via GitHub


showuon commented on PR #15131:
URL: https://github.com/apache/kafka/pull/15131#issuecomment-1878532279

   I found the bug where we don't update the all-topics metric value: 
[KAFKA-16085](https://issues.apache.org/jira/browse/KAFKA-16085), and drafted a 
solution: https://github.com/apache/kafka/pull/15133 . Please take a look when 
available. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Address occasional UnnecessaryStubbingException in StreamThreadTest [kafka]

2024-01-05 Thread via GitHub


divijvaidya commented on PR #15134:
URL: https://github.com/apache/kafka/pull/15134#issuecomment-1878531012

   Could you please explain in the description what the cause of the flakiness 
was and how adding lenient() fixes it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-05 Thread via GitHub


stanislavkozlovski commented on PR #15130:
URL: https://github.com/apache/kafka/pull/15130#issuecomment-1878479652

   Some `ZkMigrationIntegrationTest` tests failed. Can we run a rebuild?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-05 Thread via GitHub


stanislavkozlovski commented on code in PR #15130:
URL: https://github.com/apache/kafka/pull/15130#discussion_r1442734095


##
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java:
##
@@ -560,6 +560,38 @@ public void testConnectionSetupTimeout() {
             "Expected the connections to fail due to the socket connection setup timeout");
     }
 
+    @Test
+    public void testConnectionTimeoutAfterThrottling() {
+        awaitReady(client, node);
+        short requestVersion = PRODUCE.latestVersion();
+        int timeoutMs = 1000;
+        ProduceRequest.Builder builder = new ProduceRequest.Builder(
+            requestVersion,
+            requestVersion,
+            new ProduceRequestData()
+                .setAcks((short) 1)
+                .setTimeoutMs(timeoutMs));
+        TestCallbackHandler handler = new TestCallbackHandler();
+        ClientRequest r1 = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+            defaultRequestTimeoutMs, handler);
+
+        client.send(r1, time.milliseconds());
+        client.poll(0, time.milliseconds());
+
+        // Throttle long enough to ensure other inFlight requests timeout.
+        ProduceResponse pr = new ProduceResponse(new ProduceResponseData().setThrottleTimeMs(timeoutMs));
+        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(pr, requestVersion, r1.correlationId());
+        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+        ClientRequest r2 = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+            defaultRequestTimeoutMs, handler);
+        client.send(r2, time.milliseconds());
+        time.sleep(timeoutMs);
+        client.poll(0, time.milliseconds());
+
+        assertEquals(1, client.inFlightRequestCount(node.idString()));
+        assertFalse(client.connectionFailed(node), "Connection failed after throttling.");

Review Comment:
   ```suggestion
           assertFalse(client.connectionFailed(node), "Connection should not have failed due to the extra time spent throttling.");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-05 Thread via GitHub


stanislavkozlovski commented on code in PR #15130:
URL: https://github.com/apache/kafka/pull/15130#discussion_r1442732922


##
clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:
##
@@ -158,7 +158,9 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
 
     private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
         for (NetworkClient.InFlightRequest request : deque) {
-            if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)
+            // We exclude throttle time here because we want to ensure that we don't expire requests while
+            // they are throttled. The request timeout should take effect only after the throttle time has elapsed.
+            if (request.timeElapsedSinceSendMs(now) - request.throttleTimeMs() > request.requestTimeoutMs)
                 return true;

Review Comment:
   probably because we attach the timeoutMs to the request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]

2024-01-05 Thread via GitHub


stanislavkozlovski commented on code in PR #15130:
URL: https://github.com/apache/kafka/pull/15130#discussion_r1442732279


##
clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java:
##
@@ -158,7 +158,9 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
 
     private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) {
         for (NetworkClient.InFlightRequest request : deque) {
-            if (request.timeElapsedSinceSendMs(now) > request.requestTimeoutMs)
+            // We exclude throttle time here because we want to ensure that we don't expire requests while
+            // they are throttled. The request timeout should take effect only after the throttle time has elapsed.
+            if (request.timeElapsedSinceSendMs(now) - request.throttleTimeMs() > request.requestTimeoutMs)
                 return true;

Review Comment:
   one minor tricky thing here is that we never have a guarantee that we 
actually throttled for `throttleTimeMs` precisely, because:
   - `time.milliseconds()` isn't monotonic
   - we use `time.milliseconds()` to compute the end time for throttling
   
   But this is equally inconsistent with the other two things here - 
`request.timeElapsedSinceSendMs(now)` and `request.requestTimeoutMs` - so I 
guess that's fine. Not sure why Kafka doesn't use hiResClockMs here
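   
   For what it's worth, a tiny worked example (numbers invented) of what the 
new check changes:
   
   ```java
   // A request sent 32s ago, with a 30s request timeout, that spent 5s throttled.
   public class ThrottleExpirySketch {
       public static void main(String[] args) {
           long requestTimeoutMs = 30_000;
           long throttleTimeMs = 5_000;
           long elapsedSinceSendMs = 32_000;
   
           // Old check: throttled time counts against the timeout -> spurious expiry.
           System.out.println(elapsedSinceSendMs > requestTimeoutMs); // true
           // New check: throttled time is excluded -> the request is still in flight.
           System.out.println(elapsedSinceSendMs - throttleTimeMs > requestTimeoutMs); // false
       }
   }
   ```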



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803506#comment-17803506
 ] 

Nicholas Telford edited comment on KAFKA-16086 at 1/5/24 10:46 AM:
---

As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB per invocation. I'm not able to accurately determine the exact size of the 
allocation; in practice it's likely to be considerably more than this.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.


was (Author: nicktelford):
As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB per-invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions are leaking native memory while running 
> Kafka streams over several hours. This will likely kill any real workload 
> over time, so this should be treated as a blocker bug for 3.7.
> This is discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100% and then the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803506#comment-17803506
 ] 

Nicholas Telford edited comment on KAFKA-16086 at 1/5/24 10:45 AM:
---

As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB per-invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.


was (Author: nicktelford):
As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions are leaking native memory while running 
> Kafka streams over several hours. This will likely kill any real workload 
> over time, so this should be treated as a blocker bug for 3.7.
> This is discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100% and then the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803506#comment-17803506
 ] 

Nicholas Telford commented on KAFKA-16086:
--

As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions are leaking native memory while running 
> Kafka streams over several hours. This will likely kill any real workload 
> over time, so this should be treated as a blocker bug for 3.7.
> This is discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100% and then the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated

2024-01-05 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16077:
---
Labels: streams  (was: )

> Streams fails to close task after restoration when input partitions are 
> updated
> ---
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
>  Labels: streams
>
> There is a race condition in the state updater that can cause the following:
>  # We have an active task in the state updater
>  # We get fenced. We recreate the producer, transactions now uninitialized. 
> We ask the state updater to give back the task, add a pending action to close 
> the task clean once it’s handed back
>  # We get a new assignment with updated input partitions. The task is still 
> owned by the state updater, so we ask the state updater again to hand it back 
> and add a pending action to update its input partition
>  # The task is handed back by the state updater. We update its input 
> partitions but forget to close it clean (the pending close action was 
> overwritten; see the sketch below)
>  # Now the task is in an initialized state, but the underlying producer does 
> not have transactions initialized
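> A minimal sketch of the pending-action overwrite (hypothetical names; the real 
> bookkeeping lives in Streams' task management code):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> public class PendingActionOverwriteSketch {
>     public static void main(String[] args) {
>         // Pending actions are keyed by task id, so a later request silently
>         // replaces an earlier one.
>         Map<String, Runnable> pendingActions = new HashMap<>();
>         pendingActions.put("1_0", () -> System.out.println("close task clean"));
>         pendingActions.put("1_0", () -> System.out.println("update input partitions"));
>         // When the state updater hands the task back, only the surviving
>         // action runs, so the close-clean requested after fencing is lost.
>         pendingActions.remove("1_0").run(); // prints "update input partitions"
>     }
> }
> {code}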
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
>  Exception caught in process. taskId=1_0, 
> processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: 
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: 
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> 

Re: [PR] MINOR: New year code cleanup - include final keyword [kafka]

2024-01-05 Thread via GitHub


mimaison commented on code in PR #15072:
URL: https://github.com/apache/kafka/pull/15072#discussion_r1442700296


##
clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java:
##
@@ -36,9 +36,9 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
-private Iterator<ConvertedRecords<?>> convertedRecordsIterator;
+private final Iterator<ConvertedRecords<?>> convertedRecordsIterator;

Review Comment:
   We can swap this line with the one above to group all final fields



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/StandardAuthorizerUpdateBenchmark.java:
##
@@ -59,17 +59,15 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class StandardAuthorizerUpdateBenchmark {
-@Param({"25000", "5", "75000", "10"})
-private int aclCount;
+private static final Random RANDOM = new 
Random(System.currentTimeMillis());
 private final String resourceNamePrefix = "foo-bar35_resource-";

Review Comment:
   Can we move this field down?



##
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##
@@ -64,11 +64,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 private final Map<TopicPartition, OffsetAndMetadata> committed;
 private final Queue<Runnable> pollTasks;
 private final Set<TopicPartition> paused;
+private final AtomicBoolean wakeup;
 
-private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;

Review Comment:
   Let's add the empty line after this field to separate the final fields from 
the other ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-05 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803486#comment-17803486
 ] 

Kamal Chandraprakash commented on KAFKA-16073:
--

[~hzh0425@apache] 

Are you working on this issue? Shall I take it over?

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1, 3.8.0
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < 
> offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against 
> the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, 
> it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue 
> arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2024-01-05 Thread via GitHub


mimaison commented on code in PR #15048:
URL: https://github.com/apache/kafka/pull/15048#discussion_r1442692990


##
core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.docker
+
+import kafka.tools.StorageTool
+import kafka.utils.Exit
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths, StandardCopyOption, StandardOpenOption}
+
+object KafkaDockerWrapper {
+  def main(args: Array[String]): Unit = {
+if (args.length == 0) {
+  throw new RuntimeException(s"Error: No operation input provided. " +
+s"Please provide a valid operation: 'setup'.")
+}
+val operation = args.head
+val arguments = args.tail
+
+operation match {
+  case "setup" =>
+if (arguments.length != 3) {
+  val errMsg = "not enough arguments passed. Usage: " +
+    "setup <default_configs_dir> <mounted_configs_dir> <final_configs_dir>"
+  System.err.println(errMsg)
+  Exit.exit(1, Some(errMsg))
+}
+val defaultConfigsDir = arguments(0)
+val mountedConfigsDir = arguments(1)
+val finalConfigsDir = arguments(2)
+try {
+  prepareConfigs(defaultConfigsDir, mountedConfigsDir, finalConfigsDir)
+} catch {
+  case e: Throwable =>
+val errMsg = s"error while preparing configs: ${e.getMessage}"
+System.err.println(errMsg)
+Exit.exit(1, Some(errMsg))
+}
+
+val formatCmd = formatStorageCmd(finalConfigsDir, envVars)
+StorageTool.main(formatCmd)
+  case _ =>
+throw new RuntimeException(s"Unknown operation $operation. " +
+  s"Please provide a valid operation: 'setup'.")
+}
+  }
+
+  import Constants._
+
+  private def formatStorageCmd(configsDir: String, env: Map[String, String]): 
Array[String] = {

Review Comment:
   `StorageTool` is being rewritten in Java in 
https://github.com/apache/kafka/pull/14847 so we can probably keep the code as 
is for now and see if we can tidy things up once the rewrite PR is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small cleanups in Connect [kafka]

2024-01-05 Thread via GitHub


mimaison commented on code in PR #15128:
URL: https://github.com/apache/kafka/pull/15128#discussion_r1442689284


##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -119,16 +117,16 @@ private boolean isInternalRequest(ContainerRequestContext 
requestContext) {
 
 public static class BasicAuthCallBackHandler implements CallbackHandler {
 
-private String username;
-private String password;
+private final String username;

Review Comment:
   Thanks, I'll take a look



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small cleanups in Connect [kafka]

2024-01-05 Thread via GitHub


mimaison merged PR #15128:
URL: https://github.com/apache/kafka/pull/15128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16086:
--

Assignee: Nicholas Telford

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions leak native memory while running 
> Kafka Streams over several hours. This will likely kill any real workload 
> over time, so it should be treated as a blocker bug for 3.7.
> This was discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100%, at which point the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16086:
---
Affects Version/s: 3.7.0

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions leak native memory while running 
> Kafka Streams over several hours. This will likely kill any real workload 
> over time, so it should be treated as a blocker bug for 3.7.
> This was discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100%, at which point the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16086:
--

 Summary: Kafka Streams has RocksDB native memory leak
 Key: KAFKA-16086
 URL: https://issues.apache.org/jira/browse/KAFKA-16086
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Lucas Brutschy
 Attachments: image.png

The current 3.7 and trunk versions leak native memory while running Kafka 
Streams over several hours. This will likely kill any real workload over 
time, so it should be treated as a blocker bug for 3.7.

This was discovered in a long-running soak test. Attached is the memory 
consumption, which steadily approaches 100%, at which point the JVM is killed.

Rerunning the same test with jemalloc native memory profiling, we see these 
allocated objects after a few hours:
 
{noformat}
(jeprof) top
Total: 13283138973 B
10296829713 77.5% 77.5% 10296829713 77.5% rocksdb::port::cacheline_aligned_alloc
2487325671 18.7% 96.2% 2487325671 18.7% rocksdb::BlockFetcher::ReadBlockContents
150937547 1.1% 97.4% 150937547 1.1% 
rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
47331433 0.4% 98.6% 105040933 0.8% rocksdb::BlockBasedTable::PutDataBlockToCache
32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
16032145 0.1% 99.4% 16032145 0.1% rocksdb::ColumnFamilyDescriptorJni::construct
12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
 

 

The first hypothesis is that this is caused by the leaking `Options` object 
introduced in this line:

 

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]

 

Introduced in this PR: 
[https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Address occasional UnnecessaryStubbingException in StreamThreadTest [kafka]

2024-01-05 Thread via GitHub


clolov commented on PR #15134:
URL: https://github.com/apache/kafka/pull/15134#issuecomment-1878383350

   This pull request should hopefully address the problem which was discussed 
in https://github.com/apache/kafka/pull/15116#issuecomment-1877052454


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Address occasional UnnecessaryStubbingException in StreamThreadTest [kafka]

2024-01-05 Thread via GitHub


clolov opened a new pull request, #15134:
URL: https://github.com/apache/kafka/pull/15134

   This pull request should address the occasional failures of the 
StreamThreadTest such as 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15116/1/tests
   
   A similar problem occurred 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L714
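   
   For context, a minimal sketch of the failure mode and one common mitigation 
(a generic Mockito example, not necessarily the approach taken in this PR):
   
   ```java
   import static org.mockito.Mockito.lenient;
   import static org.mockito.Mockito.mock;
   
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.extension.ExtendWith;
   import org.mockito.junit.jupiter.MockitoExtension;
   
   @ExtendWith(MockitoExtension.class)
   class LenientStubbingSketchTest {
   
       interface Clock {
           long now();
       }
   
       @Test
       void stubbedButNotAlwaysUsed() {
           Clock clock = mock(Clock.class);
           // With MockitoExtension's default strict stubs, a stubbing that is
           // never exercised fails the test with UnnecessaryStubbingException.
           // lenient() opts this stubbing out of that check, for code paths
           // that never call now().
           lenient().when(clock.now()).thenReturn(0L);
       }
   }
   ```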


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2024-01-05 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which, as of 2nd of August 2022, still need to be moved from 
EasyMock to Mockito, which do not have a Jira issue, and for which I am not 
aware of any open pull requests:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) 
[https://github.com/apache/kafka/pull/13874, 
https://github.com/apache/kafka/pull/13897, 
https://github.com/apache/kafka/pull/13873|https://github.com/apache/kafka/pull/13874]
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#00875a}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo) [https://github.com/apache/kafka/pull/13932] 
 # {color:#00875a}StandbyTaskTest{color} (owner: Matthew) (takeover: Christo)
 # {color:#00875a}StoreChangelogReaderTest{color} (owner: Matthew) 
[https://github.com/apache/kafka/pull/12524]
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) (takeover: Christo) 
[https://github.com/apache/kafka/pull/14716]
 # {color:#00875a}StreamThreadTest{color} (owner: Matthew) (takeover: Christo) 
[https://github.com/apache/kafka/pull/13932] 
 # {color:#00875a}StreamsMetricsImplTest{color} (owner: Dalibor) (takeover: 
Christo)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owner: [~shekharrajak])
 # {color:#00875a}AbstractStreamTest{color} (owner: Christo)
 # {color:#00875a}KStreamTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KTableImplTest{color} (owner: Christo)
 # {color:#00875a}KTableTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}SessionCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedCacheFlushListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedTupleForwarderTest{color} (owner: Christo)
 # 

Re: [PR] KAFKA-16077: Streams with state updater fails to close task upon fencing [kafka]

2024-01-05 Thread via GitHub


lucasbru commented on PR #15117:
URL: https://github.com/apache/kafka/pull/15117#issuecomment-1878354099

   > @lucasbru What do you think about my previous question?
   > 
   > > Should we also add a integration test that tests the scenario?
   
   We don't test the other cases of the pending actions, but I agree that it 
would be good to test this. I thought about it for a bit and I think it will 
require quite some tooling to get it tested in an integration test (maybe I'm 
missing something though).
   
   Since we have covered it by unit tests, and this is a blocker issue for 3.7, 
I'd vote for merging the fix and creating a follow-up ticket for the 
integration test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-05 Thread via GitHub


showuon commented on code in PR #15132:
URL: https://github.com/apache/kafka/pull/15132#discussion_r1442645316


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4075,6 +4122,7 @@ class KafkaApisTest extends Logging {
 assertEquals(authorizedTopic, metadataResponseTopic.name())
   }
 }
+kafkaApis.close()

Review Comment:
   No, this one is special. If the 1st one throws an exception before it closes, 
we'll close it in the `AfterEach`. 
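   
   For illustration, the safety net works roughly like this (a generic JUnit 5 
sketch in Java; the actual test class is Scala and the names here are 
hypothetical):
   
   ```java
   import org.junit.jupiter.api.AfterEach;
   import org.junit.jupiter.api.Test;
   
   class CloseSafetyNetSketch {
       AutoCloseable kafkaApis; // stand-in for the KafkaApis instance under test
   
       @Test
       void example() throws Exception {
           kafkaApis = () -> { }; // create the instance under test
           // ... assertions that may throw before we reach the close() below ...
           kafkaApis.close();
       }
   
       @AfterEach
       void tearDown() throws Exception {
           // Safety net: closes the instance even if the test body threw first.
           if (kafkaApis != null) {
               kafkaApis.close();
           }
       }
   }
   ```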



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16060) Some questions about tiered storage capabilities

2024-01-05 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803470#comment-17803470
 ] 

Satish Duggana commented on KAFKA-16060:


[~jianbin] There are no short term plans to support JBOD with tiered storage.

> Some questions about tiered storage capabilities
> 
>
> Key: KAFKA-16060
> URL: https://issues.apache.org/jira/browse/KAFKA-16060
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Jianbin Chen
>Priority: Major
>
> # If a topic has 3 replicas, when the local expiration time is reached, will 
> all 3 replicas trigger the log transfer to the remote storage, or will only 
> the leader in the isr transfer the log to the remote storage (hdfs, s3)
>  # Topics that do not support compression, do you mean topics that 
> log.cleanup.policy=compact?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16059: close more kafkaApis instances [kafka]

2024-01-05 Thread via GitHub


divijvaidya commented on code in PR #15132:
URL: https://github.com/apache/kafka/pull/15132#discussion_r1442639976


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4075,6 +4122,7 @@ class KafkaApisTest extends Logging {
 assertEquals(authorizedTopic, metadataResponseTopic.name())
   }
 }
+kafkaApis.close()

Review Comment:
   should this be in a finally as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: add all topic metrics [kafka]

2024-01-05 Thread via GitHub


showuon commented on PR #15133:
URL: https://github.com/apache/kafka/pull/15133#issuecomment-1878302828

   @clolov @satishd @kamalcph , do you think this fix makes sense? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: add all topic metrics [kafka]

2024-01-05 Thread via GitHub


showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1442611638


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -529,18 +534,28 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
   }
 }
 
-class BrokerTopicAggregatedMetric() {
+class BrokerTopicAggregatedMetric(allTopics: Boolean = false) {
   private val partitionMetricValues = new ConcurrentHashMap[Int, Long]()
+  private val topicMetricValues = new ConcurrentHashMap[String, Long]()

Review Comment:
   add a `topicMetricValues` for allTopics case.
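   
   The all-topics gauge then needs to aggregate across that map. A minimal 
sketch in Java (hypothetical and simplified; the real class is the Scala 
`BrokerTopicAggregatedMetric` above):
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   class AggregatedLagSketch {
       // Latest lag value per topic, mirroring the added topicMetricValues map.
       private final ConcurrentHashMap<String, Long> topicMetricValues =
           new ConcurrentHashMap<>();
   
       void setLag(String topic, long value) {
           topicMetricValues.put(topic, value);
       }
   
       // The all-topics metric sums the per-topic values rather than
       // reporting only a single topic's partitions.
       long allTopicsLag() {
           return topicMetricValues.values().stream()
               .mapToLong(Long::longValue).sum();
       }
   }
   ```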



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16085: add all topic metrics [kafka]

2024-01-05 Thread via GitHub


showuon opened a new pull request, #15133:
URL: https://github.com/apache/kafka/pull/15133

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16085) remote copy lag bytes/segments metrics don't update all topic value

2024-01-05 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803452#comment-17803452
 ] 

Luke Chen commented on KAFKA-16085:
---

[~christo_lolov] [~satishd] [~ckamal], this is the part we missed. FYI



> remote copy lag bytes/segments metrics don't update all topic value
> ---
>
> Key: KAFKA-16085
> URL: https://issues.apache.org/jira/browse/KAFKA-16085
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>
> The metrics added in 
> [KIP-963|https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage#KIP963:AdditionalmetricsinTieredStorage-Copy]
>  are BrokerTopicMetrics, which means they should provide both per-topic and 
> all-topics metric values. But the current implementation doesn't update the 
> all-topics metric value.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16085) remote copy lag bytes/segments metrics don't update all topic value

2024-01-05 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16085:
-

 Summary: remote copy lag bytes/segments metrics don't update all 
topic value
 Key: KAFKA-16085
 URL: https://issues.apache.org/jira/browse/KAFKA-16085
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen


The metrics added in 
[KIP-963|https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage#KIP963:AdditionalmetricsinTieredStorage-Copy]
 are BrokerTopicMetrics, which means they should provide both per-topic and 
all-topics metric values. But the current implementation doesn't update the 
all-topics metric value.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16060) Some questions about tiered storage capabilities

2024-01-05 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803449#comment-17803449
 ] 

Jianbin Chen commented on KAFKA-16060:
--

Thank you both for your replies, and allow me to ask an additional question 
about JBOD disk mounts, i.e. log.dirs=/data01,/data02.

Will there be any future plans to support tiered storage in this manner?

> Some questions about tiered storage capabilities
> 
>
> Key: KAFKA-16060
> URL: https://issues.apache.org/jira/browse/KAFKA-16060
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Jianbin Chen
>Priority: Major
>
> # If a topic has 3 replicas, when the local expiration time is reached, will 
> all 3 replicas trigger the log transfer to the remote storage, or will only 
> the leader in the isr transfer the log to the remote storage (hdfs, s3)
>  # Topics that do not support compression, do you mean topics that 
> log.cleanup.policy=compact?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)