[GitHub] [kafka] iblislin commented on pull request #12371: KAFKA-14035: Fix NPE caused by missing null check in SnapshottableHashTable::mergeFrom()

2023-01-03 Thread GitBox


iblislin commented on PR #12371:
URL: https://github.com/apache/kafka/pull/12371#issuecomment-1370579167

   There are related discussions on Stack Overflow: 
https://stackoverflow.com/questions/74679400.





[GitHub] [kafka] iblislin commented on pull request #12371: KAFKA-14035: Fix NPE caused by missing null check in SnapshottableHashTable::mergeFrom()

2023-01-03 Thread GitBox


iblislin commented on PR #12371:
URL: https://github.com/apache/kafka/pull/12371#issuecomment-1370578419

   Hi,
   
   I still ran into the NPE issue on a 3.3.1 cluster with 3 nodes.
   
   The actual line throwing the NPE is here: 
https://github.com/apache/kafka/blob/3.3.1/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java#L125
   
   But I don't speak Java, so I have no idea about the root cause.
   
   The stack trace:
   ```
   kafka-k3-1  | [2023-01-04 06:23:20,276] ERROR Encountered fatal fault: 
exception while renouncing leadership 
(org.apache.kafka.server.fault.ProcessExitingFaultHandler)
   kafka-k3-1  | java.lang.NullPointerException
   kafka-k3-1  |   at 
org.apache.kafka.timeline.SnapshottableHashTable$HashTier.mergeFrom(SnapshottableHashTable.java:125)
   kafka-k3-1  |   at 
org.apache.kafka.timeline.Snapshot.mergeFrom(Snapshot.java:68)
   kafka-k3-1  |   at 
org.apache.kafka.timeline.SnapshotRegistry.deleteSnapshot(SnapshotRegistry.java:236)
   kafka-k3-1  |   at 
org.apache.kafka.timeline.SnapshotRegistry$SnapshotIterator.remove(SnapshotRegistry.java:67)
   kafka-k3-1  |   at 
org.apache.kafka.timeline.SnapshotRegistry.revertToSnapshot(SnapshotRegistry.java:214)
   kafka-k3-1  |   at 
org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1232)
   kafka-k3-1  |   at 
org.apache.kafka.controller.QuorumController.access$3300(QuorumController.java:150)
   kafka-k3-1  |   at 
org.apache.kafka.controller.QuorumController$QuorumMetaLogListener.lambda$handleLeaderChange$3(QuorumController.java:1076)
   kafka-k3-1  |   at 
org.apache.kafka.controller.QuorumController$QuorumMetaLogListener.lambda$appendRaftEvent$4(QuorumController.java:1101)
   kafka-k3-1  |   at 
org.apache.kafka.controller.QuorumController$ControlEvent.run(QuorumController.java:496)
   ```
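   
   A minimal, self-contained sketch (not the actual Kafka class; the field and 
method names are simplified) of the kind of null guard the PR title describes 
for `mergeFrom()`:
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   class HashTier {
       // Stays null until the first entry is deleted in this tier's epoch.
       Map<String, String> deltaTable;
   
       void mergeFrom(HashTier source) {
           // Without this guard, merging a tier that never materialized its
           // delta table dereferences null, matching the NPE reported above at
           // SnapshottableHashTable.java:125 in 3.3.1.
           if (source.deltaTable == null) {
               return;
           }
           if (deltaTable == null) {
               deltaTable = new HashMap<>();
           }
           deltaTable.putAll(source.deltaTable);
       }
   }
   ```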





[jira] [Created] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2023-01-03 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-14567:
---

 Summary: Kafka Streams crashes after ProducerFencedException
 Key: KAFKA-14567
 URL: https://issues.apache.org/jira/browse/KAFKA-14567
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Running a Kafka Streams application with EOS-v2.

After a thread crashed, we spawned a new thread, which implies that the 
thread-index number was re-used, resulting in `transactional.id` reuse that 
led to a `ProducerFencedException`.

After the fencing, the fenced thread crashed, resulting in a non-recoverable 
error:
{quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
task 1_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_2, processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: 
TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: 
Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
at 
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
at 
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamTh

[jira] [Updated] (KAFKA-14566) Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces

2023-01-03 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14566:

Description: 
h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, the 
AbstractConfig.getConfiguredInstances() method is delegated responsibility for 
both creating and configuring each interceptor listed in the 
interceptor.classes property, and returns a configured 
*List<ConsumerInterceptor<K, V>>* (or *List<ProducerInterceptor<K, V>>*) of 
interceptors.
h2. Kafka Consumer Constructor
{code:java}
try {

    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {

    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved: at least one interceptor's configure 
method creates (or depends on objects that create) threads, connections, or 
other resources which require cleanup, and a subsequent interceptor's 
configure method raises a runtime exception. The runtime exception produces a 
resource leak in the first interceptor, as the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
first interceptor's close method (and really any interceptor's close method) 
is never called.
h2. Kafka Consumer Constructor
{code:java}
try {

    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

If the above line results in a runtime exception, the *this.interceptors* 
below is never created. 

 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);{code}
 
h2. Kafka Producer Constructor
{code:java}
try {

    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

If the above line results in a runtime exception, the this.interceptors below 
is never created. 

 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
 {code}
 

Although both the Kafka Consumer and Kafka Producer constructors implement 
close in a try/catch for resource cleanup, 

 
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
 

their respective close implementation in the catch block above never calls the 
container interceptor close method below, because *this.interceptors* was 
never created.

 
{code:java}
private void close(long timeoutMs, boolean swallowException) {
   
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
 {code}
 

This problem is magnified within a webserver cluster, i.e. Confluent's REST 
Proxy server, where thousands of requests containing interceptor configuration 
failures can occur in seconds, resulting in an inadvertent DDoS attack as 
cluster resources are quickly exhausted, disrupting all service activities.
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their 
respective interceptor close methods for proper resource cleanup, I propose 
defining a default open method, with no implementation and a checked exception, 
on the respective Consumer/Producer interceptor interfaces. This open method 
will be responsible for creating threads and/or objects which utilize threads, 
connections, or other resources which require cleanup. Additionally, the 
default open method enables implementation optionality, as its empty default 
behavior means it will do nothing on unimplemented classes of this interceptor 
[jira] [Created] (KAFKA-14566) Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces

2023-01-03 Thread Terry Beard (Jira)
Terry Beard created KAFKA-14566:
---

 Summary: Add A No Implementation Default Open Method To Consumer 
and Producer Interceptor Interfaces
 Key: KAFKA-14566
 URL: https://issues.apache.org/jira/browse/KAFKA-14566
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Terry Beard


h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, the 
AbstractConfig.getConfiguredInstances() method is delegated responsibility for 
both creating and configuring each interceptor listed in the 
interceptor.classes property, and returns a configured 
*List<ConsumerInterceptor<K, V>>* (or *List<ProducerInterceptor<K, V>>*) of 
interceptors.
h2. Kafka Consumer Constructor
{code:java}
try {

    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {

    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved: at least one interceptor's configure 
method creates (or depends on objects that create) threads, connections, or 
other resources which require cleanup, and a subsequent interceptor's 
configure method raises a runtime exception. The runtime exception produces a 
resource leak in the first interceptor, as the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
first interceptor's close method (and really any interceptor's close method) 
is never called.
h2. Kafka Consumer Constructor
{code:java}
try {

    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the *this.interceptors* 
below is never created. 

 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);{code}
 
h2. Kafka Producer Constructor
{code:java}
try {

    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below 
is never created. 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
 {code}
Although both the Kafka Consumer and Kafka Producer constructors implement 
close in a try/catch for resource cleanup, 
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementation in the catch block above never calls the 
container interceptor close method below, because *this.interceptors* was 
never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
   
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
 {code}
 

This problem is magnified within a webserver cluster, i.e. Confluent's REST 
Proxy server, where thousands of requests containing interceptor configuration 
failures can occur in seconds, resulting in an inadvertent DDoS attack as 
cluster resources are quickly exhausted, disrupting all service activities.
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their 
respective interceptor close methods for proper resource cleanup, I propose 
defining a default open method, with no implementation and a checked exception, 
on the respective Consumer/Producer interceptor interfaces. This open method 
will be responsible for creating threads and/or objects which utilize threads, 
connections, or other resources which require clea
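
A hedged sketch of how a client constructor could use the proposed method 
(openInterceptors is a hypothetical helper, not existing Kafka code): 
configure all interceptors first, then open() them, closing every instance if 
any open() fails.
{code:java}
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;

public class InterceptorBootstrap {

    // Assumes the proposed default open() method exists on ConsumerInterceptor.
    static <K, V> void openInterceptors(List<ConsumerInterceptor<K, V>> interceptorList) {
        try {
            for (ConsumerInterceptor<K, V> interceptor : interceptorList) {
                interceptor.open();
            }
        } catch (Exception e) {
            // Close every instance, including ones that never opened;
            // close() is expected to tolerate being called in that state.
            for (ConsumerInterceptor<K, V> interceptor : interceptorList) {
                Utils.closeQuietly(interceptor, "consumer interceptor");
            }
            throw new KafkaException("Failed to open consumer interceptors", e);
        }
    }
}{code}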

[jira] [Created] (KAFKA-14565) Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces

2023-01-03 Thread Terry Beard (Jira)
Terry Beard created KAFKA-14565:
---

 Summary: Add A No Implementation Default Open Method To Consumer 
and Producer Interceptor Interfaces
 Key: KAFKA-14565
 URL: https://issues.apache.org/jira/browse/KAFKA-14565
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Terry Beard
Assignee: Terry Beard


h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources.

Currently, within the Kafka Consumer and Kafka Producer constructors, the 
AbstractConfig.getConfiguredInstances() method is delegated responsibility for 
both creating and configuring each interceptor listed in the 
interceptor.classes property, and returns a configured 
List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of 
interceptors.
h2. Kafka Consumer Constructor

 
{code:java}
try {

    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

 
h2. Kafka Producer Constructor
{code:java}
try {

    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are involved: at least one interceptor's configure 
method creates (or depends on objects that create) threads, connections, or 
other resources which require cleanup, and a subsequent interceptor's 
configure method raises a runtime exception. The runtime exception produces a 
resource leak in the first interceptor, as the interceptor container, i.e. 
ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the 
first interceptor's close method (and really any interceptor's close method) 
is never called. 
 
h2. KafkaConsumer Constructor
{code:java}
try {

    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below 
is never created. 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
h2. Kafka Producer Constructor
{code:java}
try {

    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below 
is never created. 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
 {code}
 

Although both the Kafka Consumer and Kafka Producer constructors implement 
close in a try/catch for resource cleanup, 
{code:java}
...
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementation in the catch block above never calls the 
container interceptor close method below, because *this.interceptors* was 
never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
   
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
   {code}
This problem is magnified within a webserver cluster, i.e. Confluent's REST 
Proxy server, where thousands of requests containing interceptor configuration 
failures can occur in seconds, resulting in an inadvertent DDoS attack as 
cluster resources are quickly exhausted, disrupting all service activities.
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their 
respective interceptor close methods for proper resource cleanup, I propose 
defining a default open method, with no implementation and a checked exception, 
on the respective Consumer/Producer interceptor interfaces. This open method 
will be responsible for creating threads an

[GitHub] [kafka] abscondment opened a new pull request, #13070: KAFKA-14564: Upgrade netty to 4.1.86 to address CVEs

2023-01-03 Thread GitBox


abscondment opened a new pull request, #13070:
URL: https://github.com/apache/kafka/pull/13070

   For [KAFKA-14564](https://issues.apache.org/jira/browse/KAFKA-14564): 
upgrade to Netty 4.1.86
   
   Fixes the following:
   
   * [CVE-2022-41881](https://nvd.nist.gov/vuln/detail/CVE-2022-41881)
   * [CVE-2022-41915](https://nvd.nist.gov/vuln/detail/CVE-2022-41915)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-01-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654211#comment-17654211
 ] 

Matthias J. Sax commented on KAFKA-13295:
-

Thanks. SG.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.5.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
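
A minimal illustration of the exposure described above (values are examples, 
not recommendations): under EOS, restoration that outlasts the producer's 
transaction.timeout.ms leaves the open transaction to time out, after which 
the producer is fenced. The producer-level timeout can be raised via the 
Streams producer prefix:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosRestorationTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Producer-level override; restoration longer than this window risks
        // the transaction timeouts and fencing described in this ticket.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60_000);
    }
}{code}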





[jira] [Updated] (KAFKA-14564) Upgrade Netty to 4.1.86.Final to fix CVEs

2023-01-03 Thread Brendan Ribera (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brendan Ribera updated KAFKA-14564:
---
Summary: Upgrade Netty to 4.1.86.Final to fix CVEs  (was: Upgrade Netty to 
4.1.86 to fix CVEs)

> Upgrade Netty to 4.1.86.Final to fix CVEs
> -
>
> Key: KAFKA-14564
> URL: https://issues.apache.org/jira/browse/KAFKA-14564
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.1
>Reporter: Brendan Ribera
>Priority: Major
>
> 4.1.86 fixes two CVEs:
>  * [https://nvd.nist.gov/vuln/detail/CVE-2022-41881]
>  * [https://nvd.nist.gov/vuln/detail/CVE-2022-41915]
>  





[jira] [Created] (KAFKA-14564) Upgrade Netty to 4.1.86 to fix CVEs

2023-01-03 Thread Brendan Ribera (Jira)
Brendan Ribera created KAFKA-14564:
--

 Summary: Upgrade Netty to 4.1.86 to fix CVEs
 Key: KAFKA-14564
 URL: https://issues.apache.org/jira/browse/KAFKA-14564
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.1
Reporter: Brendan Ribera


4.1.86 fixes two CVEs:
 * [https://nvd.nist.gov/vuln/detail/CVE-2022-41881]
 * [https://nvd.nist.gov/vuln/detail/CVE-2022-41915]

 





[GitHub] [kafka] C0urante commented on pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


C0urante commented on PR #13052:
URL: https://github.com/apache/kafka/pull/13052#issuecomment-1370166878

   You can just request that the fix be backported on the PR itself. It also 
helps (but doesn't guarantee a backport) if you populate the `Affects 
Version/s` field in Jira so that we know which branches contain the bug.
   
   In this case, because we have in-flight releases for 3.3 and 3.4, one other 
option we have is to mark the Jira ticket as a blocker, and include 3.3.3 and 
3.4.1 in the `Fix Version/s` field so that we know it has to be backported 
before we can put out releases for those versions. It's a bit extreme to label 
this a blocker but this approach does help us track the need to backport the 
fix. I'll let @mimaison choose how to handle this, though, since he's the one 
reviewing (and probably merging) this PR.





[GitHub] [kafka] chia7712 closed pull request #13035: [WIP] KAFKA-9087 The changed future log causes that ReplicaAlterLogDirsThre…

2023-01-03 Thread GitBox


chia7712 closed pull request #13035: [WIP] KAFKA-9087 The changed future log 
causes that ReplicaAlterLogDirsThre…
URL: https://github.com/apache/kafka/pull/13035





[GitHub] [kafka] csolidum commented on pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


csolidum commented on PR #13052:
URL: https://github.com/apache/kafka/pull/13052#issuecomment-1370118613

   @C0urante thanks for the thorough reply. This isn't happening too frequently, 
so I'm fine waiting for the 3.4.1 release. Is the right way to make sure this 
change is included to email the devs@ mailing list, or is there another process 
for that?





[jira] [Resolved] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs

2023-01-03 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-10550.

Resolution: Fixed

I think the scope of the KIP (describe and delete) has been completed, so I 
will mark this as resolved for now.

> Update AdminClient and kafka-topics.sh to support topic IDs
> ---
>
> Key: KAFKA-10550
> URL: https://issues.apache.org/jira/browse/KAFKA-10550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Deng Ziming
>Priority: Major
>
> Change describe topics AdminClient method to expose and support topic IDs 
>  
>  Make changes to kafka-topics.sh --describe so a user can specify a topic 
> name to describe with the --topic parameter, or alternatively the user can 
> supply a topic ID with the --topic_id parameter





[jira] [Created] (KAFKA-14563) Remove AddPartitionsToTxn call for newer clients as optimization

2023-01-03 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14563:
--

 Summary: Remove AddPartitionsToTxn call for newer clients as 
optimization
 Key: KAFKA-14563
 URL: https://issues.apache.org/jira/browse/KAFKA-14563
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


This is part 2 of KIP-890:

{*}2. Remove the addPartitionsToTxn call and implicitly just add partitions to 
the transaction on the first produce request during a transaction{*}.

See KIP-890 for more information: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense





[jira] [Assigned] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-01-03 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-14561:
--

Assignee: Justine Olshan

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense





[jira] [Created] (KAFKA-14562) Implement epoch bump after every transaction

2023-01-03 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14562:
--

 Summary: Implement epoch bump after every transaction
 Key: KAFKA-14562
 URL: https://issues.apache.org/jira/browse/KAFKA-14562
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


This is part 1 of KIP-890


 # *Uniquely identify transactions by bumping the producer epoch after every 
commit/abort marker. That way, each transaction can be identified by (producer 
id, epoch).* 



See KIP-890 for more information: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]
 





[jira] [Created] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-01-03 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14561:
--

 Summary: Improve transactions experience for older clients by 
ensuring ongoing transaction
 Key: KAFKA-14561
 URL: https://issues.apache.org/jira/browse/KAFKA-14561
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


This is part 3 of KIP-890:

3. *To cover older clients, we will ensure a transaction is ongoing before we 
write to a transaction. We can do this by querying the transaction coordinator 
and caching the result.*

See KIP-890 for more details: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense





[jira] [Updated] (KAFKA-14402) Transactions Server Side Defense

2023-01-03 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14402:
---
Issue Type: Improvement  (was: Task)

> Transactions Server Side Defense
> 
>
> Key: KAFKA-14402
> URL: https://issues.apache.org/jira/browse/KAFKA-14402
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> We have seen hanging transactions in Kafka where the last stable offset (LSO) 
> does not update, we can’t clean the log (if the topic is compacted), and 
> read_committed consumers get stuck.
> This can happen when a message gets stuck or delayed due to networking issues 
> or a network partition, the transaction aborts, and then the delayed message 
> finally comes in. The delayed message case can also violate EOS if the 
> delayed message comes in after the next addPartitionsToTxn request comes in. 
> Effectively we may see a message from a previous (aborted) transaction become 
> part of the next transaction.
> Another way hanging transactions can occur is that a client is buggy and may 
> somehow try to write to a partition before it adds the partition to the 
> transaction. In both of these cases, we want the server to have some control 
> to prevent these incorrect records from being written and either causing 
> hanging transactions or violating Exactly once semantics (EOS) by including 
> records in the wrong transaction.
> The best way to avoid this issue is to:
>  # *Uniquely identify transactions by bumping the producer epoch after every 
> commit/abort marker. That way, each transaction can be identified by 
> (producer id, epoch).* 
>  # {*}Remove the addPartitionsToTxn call and implicitly just add partitions 
> to the transaction on the first produce request during a transaction{*}.
> We avoid the late arrival case because the transaction is uniquely identified 
> and fenced AND we avoid the buggy client case because we remove the need for 
> the client to explicitly add partitions to begin the transaction.
> Of course, 1 and 2 require client-side changes, so for older clients, those 
> approaches won’t apply.
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
>  
> See KIP-890 for more information: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense





[GitHub] [kafka] C0urante commented on pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


C0urante commented on PR #13052:
URL: https://github.com/apache/kafka/pull/13052#issuecomment-1370104970

   That's the best summary I can provide right now; @mimaison please keep me 
honest if anything is misleading or incorrect 😄 





[GitHub] [kafka] C0urante commented on pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


C0urante commented on PR #13052:
URL: https://github.com/apache/kafka/pull/13052#issuecomment-1370104415

   @csolidum We can backport bug fixes that are low-risk and non-invasive (such 
as this one) to older branches so that they'll appear in subsequent releases. 
However, for active releases it's a bit harder to get things in.
   
   For .0 releases (e.g., 3.4.0), if we're past the code freeze deadline (which 
we are for 3.4.0), then the only fixes we tend to allow on the branch are for 
either newly-introduced regressions or very serious recently-unearthed issues.
   
   For bug fix releases (e.g., 3.3.2), we don't usually have code freeze 
deadlines, but we do identify blocker issues that have to be addressed before 
release candidates can be generated. If all of those issues have been addressed 
and a release candidate has been generated (which is currently the case for 
3.3.2), then unless a blocker issue (which is either a serious 
recently-unearthed issue, or a newly-introduced regression) is raised, the 
release will proceed without generating a new candidate.
   
   In this particular case, the bug here doesn't appear to be a regression, and 
IMO it's not serious enough to qualify as a blocker for the 3.3.2 or 3.4.0 
releases. What can probably happen here is that the fix gets backported to the 
3.3 and 3.4 branches, but only after the in-flight 3.3.2 and 3.4.0 releases are 
pushed out. Then, the fix will be present in versions 3.3.3 (if one is 
released) and 3.4.1.
   
   If that's unsatisfactory, you can reach out on the 3.3.2 and 3.4.0 
discussion threads on the dev mailing list and make a case to treat this issue 
as a blocker by including details on the impact it's having on you/your 
environment, whether it's a regression (and if not, how old it is), and how 
much risk it presents.
   
   Full disclosure: I'm the release manager for 3.3.2 and wouldn't mind seeing 
this added to 3.3.2 (even at the cost of generating a new release candidate), 
but only if we can also get it included in 3.4.0, to prevent problems from 
arising should users choose to upgrade from 3.3.2 to 3.4.0.





[GitHub] [kafka] ivanyu commented on pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-01-03 Thread GitBox


ivanyu commented on PR #13067:
URL: https://github.com/apache/kafka/pull/13067#issuecomment-1370097055

   There are four places where `KafkaMetricsGroup` methods are overridden; it's 
always `metricName`. Sometimes it comes with some logic, as in `UnifiedLog`. We 
could of course change this in various ways, but it's probably a good idea to 
keep it as is for the initial refactoring.





[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2023-01-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654133#comment-17654133
 ] 

Chia-Ping Tsai commented on KAFKA-9087:
---

[~junrao] Sorry for the late response.
{quote}So, ReplicaAlterLogDirsThread is supposed to ignore the old fetched data 
and fetch again using the new fetch offset. I am wondering why that didn't 
happen.
{quote}
You are right. The true root cause is shown below.
 # tp-0 is located at broker-0:/tmp/data0
 # Move tp-0 from /tmp/data0 to /tmp/data1. This creates a new future log 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L765])
 and a ReplicaAlterLogDirsThread. The new future log has no leader epoch before 
it syncs data.
 # File a partition reassignment to trigger a LeaderAndIsrRequest. The request 
will update the partition state of ReplicaAlterLogDirsThread 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1565]),
 and the new offset of the partition state is set to the highWatermark of the 
log.
 # ReplicaAlterLogDirsThread uses the high watermark instead of the 
OffsetsForLeaderEpoch API if there is no epoch cache.
 # The future log is new, so its end offset is 0, and the offset mismatch (0 
vs. the high watermark of the log) causes the error.

In short, the race condition between processing LeaderAndIsrRequest and 
AlterReplicaLogDirsRequest causes this error (on the V2 message format). Also, 
the error can be reproduced easily on V1, since there is no epoch cache. I’m 
not sure why it used log.highWatermark 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1559]);
 the ReplicaAlterLogDirsThread checks the offset of the “future log” rather 
than the “log”. Hence, here are my two cents: we can replace log.highWatermark 
with futureLog.highWatermark to resolve this issue. I tested it on our cluster 
and it works well (on both V1 and V2).
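
A self-contained illustration (plain Java, not broker code; names simplified) 
of the mismatch described above: seeding the alter-dirs fetcher from the log's 
high watermark while the brand-new future log is still at offset 0 trips the 
consistency check, whereas seeding from the future log's high watermark does 
not.
{code:java}
public class OffsetMismatchSketch {

    // Mirrors the consistency check ReplicaAlterLogDirsThread performs against
    // the future replica's log end offset.
    static void checkFetchOffset(long fetchOffset, long futureLogEndOffset) {
        if (fetchOffset != futureLogEndOffset) {
            throw new IllegalStateException("Offset mismatch for the future replica: "
                    + "fetched offset = " + fetchOffset
                    + ", log end offset = " + futureLogEndOffset);
        }
    }

    public static void main(String[] args) {
        long futureLogEndOffset = 0L;       // the future log was just created
        long logHighWatermark = 4224887L;   // figure taken from the report below

        checkFetchOffset(futureLogEndOffset, futureLogEndOffset); // futureLog.highWatermark: ok
        checkFetchOffset(logHighWatermark, futureLogEndOffset);   // log.highWatermark: throws
    }
}{code}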

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has bee

[jira] [Comment Edited] (KAFKA-14453) Flaky test suite MirrorConnectorsWithCustomForwardingAdminIntegrationTest

2023-01-03 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654131#comment-17654131
 ] 

Chris Egerton edited comment on KAFKA-14453 at 1/3/23 6:22 PM:
---

[~mjsax] Sorry, I've got a lot on my plate over the next couple weeks :( If 
this is still an issue by mid-January I might be able to take a look then.

 

I know [~gharris1727] has been doing some work on tackling flaky tests; perhaps 
he might be able to lend a hand?


was (Author: chrisegerton):
[~mjsax] I've got a lot on my plate over the next couple of weeks; if this is 
still an issue by mid-January I might be able to take a look then.

 

I know [~gharris1727] has been doing some work on tackling flaky tests; perhaps 
he might be able to lend a hand?

> Flaky test suite MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> -
>
> Key: KAFKA-14453
> URL: https://issues.apache.org/jira/browse/KAFKA-14453
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Chris Egerton
>Priority: Major
>  Labels: flaky-test
>
> We've been seeing some integration test failures lately for the 
> {{MirrorConnectorsWithCustomForwardingAdminIntegrationTest}} test suite. A 
> couple examples:
> org.opentest4j.AssertionFailedError: Condition not met within timeout 
> 6. Topic: mm2-offset-syncs.backup.internal didn't get created in the 
> FakeLocalMetadataStore ==> expected: <true> but was: <false>
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>     at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>     at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>     at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>     at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>     at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:326)
>     at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:217)
>  
> And:
>  
> org.opentest4j.AssertionFailedError: Condition not met within timeout 
> 6. Topic: primary.test-topic-1's configs don't have partitions:11 ==> 
> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicConfigPersistInFakeLocalMetaDataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:334)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:255)





[jira] [Commented] (KAFKA-14453) Flaky test suite MirrorConnectorsWithCustomForwardingAdminIntegrationTest

2023-01-03 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654131#comment-17654131
 ] 

Chris Egerton commented on KAFKA-14453:
---

[~mjsax] I've got a lot on my plate over the next couple of weeks; if this is 
still an issue by mid-January I might be able to take a look then.

 

I know [~gharris1727] has been doing some work on tackling flaky tests; perhaps 
he might be able to lend a hand?

> Flaky test suite MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> -
>
> Key: KAFKA-14453
> URL: https://issues.apache.org/jira/browse/KAFKA-14453
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Chris Egerton
>Priority: Major
>  Labels: flaky-test
>
> We've been seeing some integration test failures lately for the 
> {{MirrorConnectorsWithCustomForwardingAdminIntegrationTest}} test suite. A 
> couple examples:
> org.opentest4j.AssertionFailedError: Condition not met within timeout 
> 6. Topic: mm2-offset-syncs.backup.internal didn't get created in the 
> FakeLocalMetadataStore ==> expected: <true> but was: <false>
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>     at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>     at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>     at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>     at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>     at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicToPersistInFakeLocalMetadataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:326)
>     at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:217)
>  
> And:
>  
> org.opentest4j.AssertionFailedError: Condition not met within timeout 
> 6. Topic: primary.test-topic-1's configs don't have partitions:11 ==> 
> expected: <true> but was: <false>
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:308)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.waitForTopicConfigPersistInFakeLocalMetaDataStore(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:334)
>     at org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin(MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:255)





[GitHub] [kafka] csolidum commented on pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


csolidum commented on PR #13052:
URL: https://github.com/apache/kafka/pull/13052#issuecomment-1370085587

   @mimaison @gharris1727 @C0urante 
   Still getting a sense of what the Kafka release process is like for bug 
fixes. Is there a way to request that this patch be included in the next 3.3.x 
release, or in the 3.4 release if it's about to be cut?





[GitHub] [kafka] C0urante commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2023-01-03 Thread GitBox


C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1060830766


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -780,6 +774,14 @@ protected void stopServices() {
 }
 }
 
+// Timeout for herderExecutor to gracefully terminate is set to a value to accommodate
+// reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s
+private long getHerderExecutorTimeoutMs() {

Review Comment:
   Nit: We don't use `get` in this code base
   
   ```suggestion
   private long herderExecutorTimeoutMs() {
   ```
   
   Also, is there a reason this has to be a separate method at all? It's only 
called in one place right now and the logic isn't really complicated enough to 
warrant isolated unit testing.
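   For illustration, inlining could look roughly like this (a sketch only; the field names here are hypothetical, and only the 10s buffer comes from the quoted comment):
   
   ```java
   // Hypothetical names - not the actual DistributedHerder fields:
   long timeoutMs = configTopicReadTimeoutMs      // read to the end of the config topic
           + taskShutdownGracefulTimeoutMs        // stop all connectors and tasks
           + 10_000L;                             // 10s buffer
   herderExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
   ```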



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] csolidum commented on a diff in pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


csolidum commented on code in PR #13052:
URL: https://github.com/apache/kafka/pull/13052#discussion_r1060839550


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -189,14 +189,17 @@ private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String g
 
     Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
             OffsetAndMetadata offsetAndMetadata) {
-        long upstreamOffset = offsetAndMetadata.offset();
-        OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        if (downstreamOffset.isPresent()) {
-            return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
+        if (offsetAndMetadata != null) {
+            long upstreamOffset = offsetAndMetadata.offset();
+            OptionalLong downstreamOffset =
+                    offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+            if (downstreamOffset.isPresent()) {
+                return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
-        } else {
-            return Optional.empty();
+            }
         }
+        return Optional.empty();
+

Review Comment:
   @mimaison Removed
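   For reference, the patched method now reduces to an early-return shape; a sketch consistent with the hunk above:
   
   ```java
   Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
                                   OffsetAndMetadata offsetAndMetadata) {
       if (offsetAndMetadata == null)
           return Optional.empty();   // group has no committed offset for this partition
       long upstreamOffset = offsetAndMetadata.offset();
       OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
       if (!downstreamOffset.isPresent())
           return Optional.empty();   // offset cannot be translated downstream yet
       return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
               upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
   }
   ```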



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13068: KAFKA-13999: Add ProducerIdCount metric

2023-01-03 Thread GitBox


jolshan commented on PR #13068:
URL: https://github.com/apache/kafka/pull/13068#issuecomment-1370083728

   Ah, I believe he had a slightly different idea for the implementation, where the size was not evaluated on every call; rather, a variable was updated every time we added or removed producer IDs. Let me see if he can share his version.
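   A rough sketch of that alternative (all names here are assumptions, not the actual `ProducerStateManager` code):
   
   ```java
   // Maintain a running count updated on add/remove, rather than
   // evaluating the map size at metric-read time:
   private final Map<Long, ProducerStateEntry> producers = new HashMap<>();
   private final AtomicInteger producerIdCount = new AtomicInteger(0);
   
   void addProducerId(long producerId, ProducerStateEntry entry) {
       if (producers.put(producerId, entry) == null)
           producerIdCount.incrementAndGet();   // only count newly added IDs
   }
   
   void removeProducerId(long producerId) {
       if (producers.remove(producerId) != null)
           producerIdCount.decrementAndGet();
   }
   ```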


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-01-03 Thread GitBox


fvaleri commented on PR #13067:
URL: https://github.com/apache/kafka/pull/13067#issuecomment-1370083367

   Hi @ivanyu, thanks for looking into this.
   
   >Currently, classes-users of the KafkaMetricsGroup trait sometimes override 
its methods to tune its behavior (mostly, names).
   
   If that's the case, why not directly use `explicitMetricName`?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13049: KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module

2023-01-03 Thread GitBox


ijuma commented on PR #13049:
URL: https://github.com/apache/kafka/pull/13049#issuecomment-1370081626

   @mimaison Thanks for the review. I addressed or replied to each comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13049: KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module

2023-01-03 Thread GitBox


ijuma commented on code in PR #13049:
URL: https://github.com/apache/kafka/pull/13049#discussion_r1060836979


##
storage/src/main/java/org/apache/kafka/server/log/internals/LogConfig.java:
##
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.ValidList;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.ConfigUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.server.record.BrokerCompressionType;
+
+public class LogConfig extends AbstractConfig {
+
+public static class MessageFormatVersion {
+private final String messageFormatVersionString;
+private final String interBrokerProtocolVersionString;
+private final MetadataVersion messageFormatVersion;
+private final MetadataVersion interBrokerProtocolVersion;
+
+public MessageFormatVersion(String messageFormatVersionString, String interBrokerProtocolVersionString) {
+this.messageFormatVersionString = messageFormatVersionString;
+this.interBrokerProtocolVersionString = interBrokerProtocolVersionString;
+this.messageFormatVersion = MetadataVersion.fromVersionString(messageFormatVersionString);
+this.interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString);
+}
+
+public MetadataVersion messageFormatVersion() {
+return messageFormatVersion;
+}
+
+public MetadataVersion interBrokerProtocolVersion() {
+return interBrokerProtocolVersion;
+}
+
+public boolean shouldIgnore() {
+return shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion);
+}
+
+public boolean shouldWarn() {
+return interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)
+&& messageFormatVersion.highestSupportedRecordVersion().precedes(RecordVersion.V2);
+}
+
+@SuppressWarnings("deprecation")
+public String topicWarningMessage(String topicName) {
+return "Topic configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + " with value `"
++ messageFormatVer

[GitHub] [kafka] ijuma commented on a diff in pull request #13049: KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module

2023-01-03 Thread GitBox


ijuma commented on code in PR #13049:
URL: https://github.com/apache/kafka/pull/13049#discussion_r1060835881


##
storage/src/main/java/org/apache/kafka/server/log/internals/LogConfig.java:
##
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.ValidList;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.ConfigUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.server.record.BrokerCompressionType;
+
+public class LogConfig extends AbstractConfig {
+
+public static class MessageFormatVersion {
+private final String messageFormatVersionString;
+private final String interBrokerProtocolVersionString;
+private final MetadataVersion messageFormatVersion;
+private final MetadataVersion interBrokerProtocolVersion;
+
+public MessageFormatVersion(String messageFormatVersionString, String interBrokerProtocolVersionString) {
+this.messageFormatVersionString = messageFormatVersionString;
+this.interBrokerProtocolVersionString = interBrokerProtocolVersionString;
+this.messageFormatVersion = MetadataVersion.fromVersionString(messageFormatVersionString);
+this.interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString);
+}
+
+public MetadataVersion messageFormatVersion() {
+return messageFormatVersion;
+}
+
+public MetadataVersion interBrokerProtocolVersion() {
+return interBrokerProtocolVersion;
+}
+
+public boolean shouldIgnore() {
+return shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion);
+}
+
+public boolean shouldWarn() {
+return interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)
+&& messageFormatVersion.highestSupportedRecordVersion().precedes(RecordVersion.V2);
+}
+
+@SuppressWarnings("deprecation")
+public String topicWarningMessage(String topicName) {
+return "Topic configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + " with value `"
++ messageFormatVer

[GitHub] [kafka] C0urante commented on a diff in pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…

2023-01-03 Thread GitBox


C0urante commented on code in PR #11818:
URL: https://github.com/apache/kafka/pull/11818#discussion_r1060825828


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -294,5 +302,9 @@ boolean update(long upstreamOffset, long downstreamOffset) {
 previousDownstreamOffset = downstreamOffset;
 return shouldSyncOffsets;
 }
+
+void reset() {
+shouldSyncOffsets = false;

Review Comment:
   I was wondering if we should also move the updates for the `lastSyncUpstreamOffset` and `lastSyncDownstreamOffset` fields to this method, but there don't appear to be any downsides to this approach given our current usages of `update` and `reset`. 👍 



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -69,14 +69,19 @@ public MirrorSourceTask() {}
 
 // for testing
     MirrorSourceTask(KafkaConsumer consumer, MirrorSourceMetrics metrics, String sourceClusterAlias,
-                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer producer) {
+                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer producer,
+                     Semaphore outstandingOffsetSyncs, Map partitionStates,

Review Comment:
   I'm not sure we should be mocking the semaphore here, since it's a bit of an 
implementation detail and I've already proposed that we split it out into 
several different semaphores (one for each topic partition) in 
https://github.com/apache/kafka/pull/12366.
   
   Can we get the same testing coverage/guarantees by mocking out the offset 
sync producer instead, and selectively invoking/not invoking the callbacks 
passed to it in `send`?
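   For illustration, a hedged sketch of that approach with Mockito (types simplified, setup elided):
   
   ```java
   KafkaProducer<byte[], byte[]> offsetProducer = mock(KafkaProducer.class);
   ArgumentCaptor<Callback> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
   
   // ... exercise the task so it attempts an offset sync, then:
   verify(offsetProducer).send(any(ProducerRecord.class), callbackCaptor.capture());
   
   // Selectively complete the send: a null exception simulates success,
   // a non-null exception simulates a failed offset sync.
   callbackCaptor.getValue().onCompletion(null, null);
   ```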



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##
@@ -81,15 +88,25 @@ public void testOffsetSync() {
         MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
 
         assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
+        partitionState.reset();

Review Comment:
   Is it worth adding coverage to this case (and possibly `testZeroOffsetSync`) 
for when we invoke `update` repeatedly without invoking `reset`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13049: KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module

2023-01-03 Thread GitBox


mimaison commented on code in PR #13049:
URL: https://github.com/apache/kafka/pull/13049#discussion_r1060760759


##
storage/src/main/java/org/apache/kafka/server/log/internals/LogConfig.java:
##
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.ValidList;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.ConfigUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+import org.apache.kafka.server.record.BrokerCompressionType;
+
+public class LogConfig extends AbstractConfig {
+
+public static class MessageFormatVersion {
+private final String messageFormatVersionString;
+private final String interBrokerProtocolVersionString;
+private final MetadataVersion messageFormatVersion;
+private final MetadataVersion interBrokerProtocolVersion;
+
+public MessageFormatVersion(String messageFormatVersionString, String interBrokerProtocolVersionString) {
+this.messageFormatVersionString = messageFormatVersionString;
+this.interBrokerProtocolVersionString = interBrokerProtocolVersionString;
+this.messageFormatVersion = MetadataVersion.fromVersionString(messageFormatVersionString);
+this.interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString);
+}
+
+public MetadataVersion messageFormatVersion() {
+return messageFormatVersion;
+}
+
+public MetadataVersion interBrokerProtocolVersion() {
+return interBrokerProtocolVersion;
+}
+
+public boolean shouldIgnore() {
+return shouldIgnoreMessageFormatVersion(interBrokerProtocolVersion);
+}
+
+public boolean shouldWarn() {
+return interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)
+&& messageFormatVersion.highestSupportedRecordVersion().precedes(RecordVersion.V2);
+}
+
+@SuppressWarnings("deprecation")
+public String topicWarningMessage(String topicName) {
+return "Topic configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + " with value `"
++ messageFormat

[jira] [Updated] (KAFKA-14560) Remove old client protocol API versions in Kafka 4.0 (KIP-896)

2023-01-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14560:

Fix Version/s: 4.0.0

> Remove old client protocol API versions in Kafka 4.0 (KIP-896)
> --
>
> Key: KAFKA-14560
> URL: https://issues.apache.org/jira/browse/KAFKA-14560
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 4.0.0
>
>
> Please see KIP for details:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14560) Remove old client protocol API versions in Kafka 4.0 (KIP-896)

2023-01-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14560:

Priority: Blocker  (was: Major)

> Remove old client protocol API versions in Kafka 4.0 (KIP-896)
> --
>
> Key: KAFKA-14560
> URL: https://issues.apache.org/jira/browse/KAFKA-14560
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Please see KIP for details:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] viktorsomogyi commented on pull request #13069: MINOR: Check process role and set correct logdirs

2023-01-03 Thread GitBox


viktorsomogyi commented on PR #13069:
URL: https://github.com/apache/kafka/pull/13069#issuecomment-1369948285

   (Also, the PR will need test changes at least, but I wanted to start some conversation early on.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13069: MINOR: Check process role and set correct logdirs

2023-01-03 Thread GitBox


viktorsomogyi commented on PR #13069:
URL: https://github.com/apache/kafka/pull/13069#issuecomment-1369947044

   @rondagostino would you please help me understand why the controller needs the log.dirs config? Does it store any data in log.dirs? As I understand it, a controller-only node needs just metadata.log.dir, but I'm fairly new to the internal changes of KIP-500 so I might be missing something.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi opened a new pull request, #13069: MINOR: Check process role and set correct logdirs

2023-01-03 Thread GitBox


viktorsomogyi opened a new pull request, #13069:
URL: https://github.com/apache/kafka/pull/13069

   Currently a controller-only role will fail if no log.dirs are specified. This is caused by KafkaRaftServer merging the configs in log.dirs and metadata.log.dir: if the paths in log.dirs weren't initialized with the storage tool, the process fails. This is unwanted, as controller-only roles don't need log.dirs.
   
   This PR attempts to fix that by merging the two configs only when the process has both broker and controller roles.
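   A simplified sketch of the intended behavior (illustrative pseudo-code, not the actual KafkaRaftServer logic):
   
   ```java
   // A controller-only node needs just metadata.log.dir; only pull in
   // log.dirs when the node also acts as a broker.
   Set<String> dirsToValidate = new HashSet<>();
   dirsToValidate.add(config.metadataLogDir());
   if (config.processRoles().contains("broker")) {
       dirsToValidate.addAll(config.logDirs());
   }
   ```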
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14560) Remove old client protocol API versions in Kafka 4.0 (KIP-896)

2023-01-03 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-14560:
---

 Summary: Remove old client protocol API versions in Kafka 4.0 
(KIP-896)
 Key: KAFKA-14560
 URL: https://issues.apache.org/jira/browse/KAFKA-14560
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma


Please see KIP for details:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14542) Deprecate OffsetFetch/Commit version 0 and remove them in 4.0

2023-01-03 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654070#comment-17654070
 ] 

Ismael Juma commented on KAFKA-14542:
-

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0]
 covers this and more.

> Deprecate OffsetFetch/Commit version 0 and remove them in 4.0
> -
>
> Key: KAFKA-14542
> URL: https://issues.apache.org/jira/browse/KAFKA-14542
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> We should deprecate OffsetFetch/Commit APIs and remove them in AK 4.0. Those 
> two APIs are used by old clients to write offsets to and read offsets from ZK.
> We need a small KIP for this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12872) KIP-724: Drop support for message formats v0 and v1

2023-01-03 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12872:

Priority: Blocker  (was: Major)

> KIP-724: Drop support for message formats v0 and v1
> ---
>
> Key: KAFKA-12872
> URL: https://issues.apache.org/jira/browse/KAFKA-12872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 
> 2017) via 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
>  and has been the default since. It includes a number of enhancements 
> (partition leader epoch, sequence ids, producer ids, record headers) required 
> for correctness 
> ([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation],
>  
> [KIP-279|https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over],
>  
> [KIP-320|https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation]),
>  stronger semantics (idempotent producers, transactional clients) and other 
> features ([KIP-82 - Add Record 
> Headers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers],
>  [KIP-392: Allow consumers to fetch from closest 
> replica|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]).
> Four years later, it's time to sunset message formats v0 and v1 to establish 
> a new baseline in terms of supported client/broker behavior and to improve 
> maintainability & supportability of Kafka. This also aligns with 
> [KIP-679|https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default],
>  which will enable the idempotent producer by default in Apache Kafka 3.0 
> (and requires message format v2). We propose the deprecation of message 
> formats v0 and v1 in Apache Kafka 3.0 and their removal in Apache Kafka 4.0.
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14255) Fetching from follower should be disallowed if fetch from follower is disabled

2023-01-03 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654059#comment-17654059
 ] 

Ismael Juma commented on KAFKA-14255:
-

[~dajac] A couple more projects worth filing a ticket for:
 * [https://github.com/dpkp/kafka-python]
 * [https://github.com/segmentio/kafka-go]

When it comes to librdkafka, there is an open pull request here:
 * https://github.com/confluentinc/librdkafka/pull/4122

> Fetching from follower should be disallowed if fetch from follower is disabled
> --
>
> Key: KAFKA-14255
> URL: https://issues.apache.org/jira/browse/KAFKA-14255
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Critical
>
> There are clients out there that have implemented KIP-392 (Fetch From 
> Follower) and thus use FetchRequest >= 11. However, they have not implemented 
> KIP-320 which add the leader epoch to the FetchRequest in version 9. Without 
> KIP-320, it is not safe to fetch from the follower. If a client does it by 
> mistake – e.g. based on stale metadata – that could lead to offset out of 
> range.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on a diff in pull request #13052: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null offsetAndMetadata more gracefully

2023-01-03 Thread GitBox


mimaison commented on code in PR #13052:
URL: https://github.com/apache/kafka/pull/13052#discussion_r1060672958


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -189,14 +189,17 @@ private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String g
 
     Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
             OffsetAndMetadata offsetAndMetadata) {
-        long upstreamOffset = offsetAndMetadata.offset();
-        OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        if (downstreamOffset.isPresent()) {
-            return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
+        if (offsetAndMetadata != null) {
+            long upstreamOffset = offsetAndMetadata.offset();
+            OptionalLong downstreamOffset =
+                    offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+            if (downstreamOffset.isPresent()) {
+                return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
-        } else {
-            return Optional.empty();
+            }
         }
+        return Optional.empty();
+

Review Comment:
   nit: Can you remove this unnecessary line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13068: KAFKA-13999: Add ProducerIdCount metric

2023-01-03 Thread GitBox


clolov commented on PR #13068:
URL: https://github.com/apache/kafka/pull/13068#issuecomment-1369826380

   For visibility:
   @artemlivshits - as the original author
   @showuon - as you commented on the JIRA ticket and I believe you have the 
knowledge to review the change
   @jolshan - as you worked on 
https://issues.apache.org/jira/browse/KAFKA-14097 and I believe you also have 
the background knowledge needed to review this change 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov opened a new pull request, #13068: KAFKA-13999: Add ProducerIdCount metric

2023-01-03 Thread GitBox


clolov opened a new pull request, #13068:
URL: https://github.com/apache/kafka/pull/13068

   _Summary_
   This PR provides an implementation of KIP-847 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerIdCount+metrics).
 As far as I can tell, the original author @artemlivshits has not provided a 
pull request since the KIP was approved so I took the liberty of picking it up. 
Of course, if they provide a pull request I am happy to close this one.
   
   I do not understand why the metric ought to be surfaced from 
`kafka.server:type=ReplicaManager,name=ProducerIdCount` when the information is 
stored in `ProducerStateManager`. I have thus taken the further liberty of 
surfacing the metric from `ProducerStateManager` directly under 
`kafka.log:type=ProducerStateManager,name=ProducerIdCount`. I am happy to revise this based on feedback, or to update the KIP if this approach is accepted.
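   Conceptually, the change boils down to registering the gauge closer to the data it reads; a minimal sketch, assuming a `newGauge`-style helper like the one the metrics group classes expose:
   
   ```java
   // The gauge callback re-reads the current count on every metrics poll:
   metricsGroup.newGauge("ProducerIdCount", () -> producers.size());
   ```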
   
   _Testing_
   Grafana dashboard showing ProducerIdCount increasing when adding idempotent 
producers and decreasing whenever the producer ids expire: 
https://g-576b9cd7b5.grafana-workspace.us-east-1.amazonaws.com/dashboard/snapshot/U5NfIDaMlm0VTIvmF4ZHJio7Kwuc1pTk
   
   I do not know whether we carry out any unit/integration testing upon 
addition of new metrics, but if I am pointed to similar test cases I am happy 
to revise this PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer

2023-01-03 Thread GitBox


viktorsomogyi commented on code in PR #13032:
URL: https://github.com/apache/kafka/pull/13032#discussion_r1060471853


##
clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+public class DataOutputStreamWritableTest {
+    @Test
+    public void testWritingSlicedByteBuffer() {
+        byte[] expectedArray = new byte[]{2, 3, 0, 0};
+        ByteBuffer sourceBuffer = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});
+        ByteBuffer resultBuffer = ByteBuffer.allocate(4);
+
+        // Move position forward to ensure slice is not whole buffer
+        sourceBuffer.position(2);
+        ByteBuffer slicedBuffer = sourceBuffer.slice();
+
+        Writable writable = new DataOutputStreamWritable(
+                new DataOutputStream(new ByteBufferOutputStream(resultBuffer)));
+
+        writable.writeByteBuffer(slicedBuffer);
+
+        assertEquals(2, resultBuffer.position(), "Writing to the buffer moves the position forward");
+        assertEquals(expectedArray, resultBuffer.array(), "Result buffer should have expected elements");

Review Comment:
   Please use assertArrayEquals
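   For context: JUnit's `assertEquals` compares arrays by reference (arrays don't override `equals`), so it can fail even when the contents match, while `assertArrayEquals` compares element by element:
   
   ```java
   byte[] a = new byte[]{2, 3, 0, 0};
   byte[] b = new byte[]{2, 3, 0, 0};
   // assertEquals(a, b);     // fails: a and b are different array instances
   assertArrayEquals(a, b);   // passes: contents are equal
   ```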



##
clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+public class DataOutputStreamWritableTest {
+    @Test
+    public void testWritingSlicedByteBuffer() {
+        byte[] expectedArray = new byte[]{2, 3, 0, 0};
+        ByteBuffer sourceBuffer = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});
+        ByteBuffer resultBuffer = ByteBuffer.allocate(4);
+
+        // Move position forward to ensure slice is not whole buffer
+        sourceBuffer.position(2);
+        ByteBuffer slicedBuffer = sourceBuffer.slice();
+
+        Writable writable = new DataOutputStreamWritable(
+                new DataOutputStream(new ByteBufferOutputStream(resultBuffer)));
+
+        writable.writeByteBuffer(slicedBuffer);
+
+        assertEquals(2, resultBuffer.position(), "Writing to the buffer moves the position forward");
+        assertEquals(expectedArray, resultBuffer.array(), "Result buffer should have expected elements");
+    }
+
+    @Test
+    public void testWritingSlicedByteBufferWithNonZeroPosition() {
+        byte[] expectedArray = new byte[]{3, 0, 0, 0};
+        ByteBuffer originalBuffer = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});
+        ByteBuffer resultBuffer = ByteBuffer.allocate(4);
+
+        // Move position forward to ensure slice is backed by heap buffer with non-zero offset
+        originalBuffer.position(2);
+        ByteBuffer slicedBuffer = originalBuffer.slice();
+        // Move the slice's position forward to ensure the writer starts reading at that position
+        slicedBuffer.position(1);
+
+        Writable writable = new DataOutput

[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-03 Thread GitBox


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1060450083


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
 try {
 fencableProducer.beginTransaction();
-fencableProducer.send(new ProducerRecord<>(topic, key, value));
+fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I believe that `commitTransaction` ensures that all producer records are 
flushed and all pending callbacks invoked before the transaction is committed, 
so this - `where the callback error is propagated before calling the 
commitTransaction and getting the more generic error message.` should already 
be the case with the current changes?
   
   However, it doesn't look like using callbacks with the transactional 
producer offers much benefit - we could simply reword the existing exception 
message (`this may be due to a transient error and the request can be safely 
retried`) to indicate that it could potentially denote a non-transient error as 
well. Furthermore, the specific case that this PR attempted to fix (missing 
WRITE ACL on the config topic not being surfaced to users properly) is anyway 
highly unlikely to go unnoticed in an EOS enabled Connect cluster since the 
herder thread itself will repeatedly hit this condition 
[here](https://github.com/apache/kafka/blob/9d1f9f77642d7e95dec37647657478ed187bdeb7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L419)
 in the tick loop (the worker logs will reveal the underlying 
`TopicAuthorizationException`) and request processing won't happen at all (all 
external requests that are run on the herder's thread will timeout).
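   A minimal sketch of the flush-before-commit behavior being relied on here (standard producer API; error handling elided, and `sendException` is an assumed `AtomicReference<Exception>`):
   
   ```java
   producer.beginTransaction();
   producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
       if (exception != null)
           sendException.compareAndSet(null, exception);   // recorded before commit returns
   });
   // commitTransaction() flushes unsent records, and the callback above will
   // have completed before the transaction is committed, so sendException
   // can be checked right after this call.
   producer.commitTransaction();
   ```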



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org