[GitHub] [kafka] divijvaidya commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-05 Thread GitBox


divijvaidya commented on code in PR #13078:
URL: https://github.com/apache/kafka/pull/13078#discussion_r1062290508


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -576,6 +576,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
  }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
 
+  // Visible for testing
+  def removeExpiredProducers(currentTimeMs: Long): Unit = {

Review Comment:
   could we use this method in producerExpireCheck() as well?
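   A sketch of that reuse (a Java rendering for illustration, since the file under review is Scala; `lock` and `time` are assumed from the surrounding class):

   ```java
   // The scheduled producerExpireCheck task can delegate to the same helper the
   // tests call, keeping a single code path for producer-id expiration.
   Runnable producerExpireCheck = () -> {
       synchronized (lock) {
           removeExpiredProducers(time.milliseconds());
       }
   };
   ```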



##
core/src/main/scala/kafka/log/ProducerStateManager.scala:
##
@@ -685,6 +692,7 @@ class ProducerStateManager(
 
 if (logEndOffset != mapEndOffset) {
   producers.clear()
+  _producerIdCount = 0

Review Comment:
   Please move these together into a function. The coupling between the metric value and `producers` is bug-prone: someone may accidentally update one without updating the other. One way to avoid such bugs in the future is to always update the producer map through methods that also update the count (ideally, only those functions should mutate the number of producers).
   
   Similar comment for adding to `producers` as well.
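   A sketch of the suggested encapsulation (hypothetical helper and field names, not from the PR; shown in Java for illustration although the file under review is Scala):

   ```java
   // Make these helpers the only code paths that mutate the producers map,
   // so the count backing the metric can never drift out of sync with it.
   private void addProducerId(long producerId, ProducerStateEntry entry) {
       producers.put(producerId, entry);
       producerIdCount = producers.size();
   }

   private void removeProducerIds(List<Long> expiredIds) {
       producers.keySet().removeAll(expiredIds);
       producerIdCount = producers.size();
   }

   private void clearProducerIds() {
       producers.clear();
       producerIdCount = 0;
   }
   ```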



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14566) Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14566:

Description: (was: h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured *List<ConsumerInterceptor<K, V>>* (or *List<ProducerInterceptor<K, V>>*) of interceptors.
h2. Kafka Consumer Constructor
{code:java}
try {

List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {

List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
h2. Kafka Consumer Constructor
{code:java}
try {

List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

If the above line results in a runtime exception, the below this.interceptors is never created. 

 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);{code}
 
h2. Kafka Producer Constructor
{code:java}
try {

List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

If the above line results in a runtime exception, the below this.interceptors 
is never created. 

 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
 {code}
 

Although both the Kafka Consumer and Kafka Producer constructors implement a try/catch that calls close for resource cleanup, 

 
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
 

their respective close implementations, located in the catch above, never call the container interceptor close method below, as this.interceptors was never created.

 
{code:java}
private void close(long timeoutMs, boolean swallowException) {
    Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
 {code}
 

This problem is magnified within a webserver cluster, e.g. Confluent's REST Proxy server, where thousands of requests containing interceptor configuration failures can occur in seconds, resulting in an inadvertent DDoS attack as cluster resources are quickly exhausted, disrupting all service activity.
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their respective interceptor close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.

[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

  was:
h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
h2. Kafka Consumer Constructor

 
{code:java}
try {

List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
 

 
h2. Kafka Producer Constructor
{code:java}
try {

List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 
h2. KafkaConsumer Constructor
{code:java}
try {

List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
 {code}
If the above line results in a runtime exception, the below this.interceptors 
is never created. 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
h2. Kafka Producer Constructor
{code:java}
try {

List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the below this.interceptors 
is never created. 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
 {code}
 

Although both the Kafka Consumer and Kafka Producer constructors implement a try/catch that calls close for resource cleanup,
{code:java}
...
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementations, located in the catch above, never call the container interceptor close method below, as this.interceptors was never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {

[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 


> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
> This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
> To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces.

[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.


> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
> This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception.

[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing when unimplemented.

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.


> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
> This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception.

[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing when unimplemented.

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing when unimplemented.


> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
> This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.

[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing when unimplemented.

Additionally, the Kafka Consumer/Producer interceptor containers will implement a corresponding maybeOpen method, which throws a checked exception. To maintain backwards compatibility with interceptors developed earlier, maybeOpen will check whether the interceptor's interface contains the newer open method before calling it.

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
 

To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing when unimplemented.


> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
> This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception.

[jira] [Assigned] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard reassigned KAFKA-14565:
---

Assignee: (was: Terry Beard)

> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured List<ConsumerInterceptor<K, V>> (or List<ProducerInterceptor<K, V>>) of interceptors.
> This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors and at least one interceptor's configure method creates, or depends on, objects that create threads, connections, or other resources requiring cleanup, while a subsequent interceptor's configure method raises a runtime exception. The runtime exception produces a resource leak in the first interceptor, because the interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the first interceptor's close method (indeed, any interceptor's close method) is never called.
> To help ensure the container interceptors are able to invoke their respective interceptors' close methods for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources requiring cleanup. Additionally, the default open method enables implementation optionality: its empty default behavior means it will do nothing when unimplemented.
> Additionally, the Kafka Consumer/Producer interceptor containers will implement a corresponding maybeOpen method, which throws a checked exception. To maintain backwards compatibility with interceptors developed earlier, maybeOpen will check whether the interceptor's interface contains the newer open method before calling it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] calmera commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )

2023-01-05 Thread GitBox


calmera commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1062606439


##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
 return this;
 }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
+ *
+ * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic.
+ *
+ * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ *
+ * @param storeBuilder          user defined key value store builder
+ * @param sourceName            name of the {@link SourceNode} that will be automatically added
+ * @param timestampExtractor    the stateless timestamp extractor used for this source,
+ *                              if not specified the default extractor defined in the configs will be used
+ * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+ * @param topic                 the topic to source the data from
+ * @param processorName         the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor or state store is already registered
+ */
+public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                              final String sourceName,
+                                                              final TimestampExtractor timestampExtractor,
+                                                              final Deserializer<KIn> keyDeserializer,
+                                                              final Deserializer<VIn> valueDeserializer,
+                                                              final String topic,
+                                                              final String processorName,
+                                                              final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+if (storeBuilder.loggingEnabled()) {
+// -- disabling logging. We might want to print some logging.

Review Comment:
   agreed, will change accordingly
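   
   For context, a sketch of how the new method might be used, based on the javadoc in this diff (the store, topic, and processor names are illustrative, not from the PR, and the `ProcessorSupplier` type parameters assume the new processor API):

   ```java
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.Stores;

   Topology topology = new Topology();
   topology.addReadOnlyStateStore(
       Stores.keyValueStoreBuilder(
           Stores.persistentKeyValueStore("reference-data"),
           Serdes.String(), Serdes.String()),
       "reference-source",
       null,                              // fall back to the default timestamp extractor
       new StringDeserializer(),
       new StringDeserializer(),
       "reference-topic",                 // compacted topic acting as the changelog
       "reference-loader",
       () -> new Processor<String, String, Void, Void>() {
           private KeyValueStore<String, String> store;

           @Override
           public void init(final ProcessorContext<Void, Void> context) {
               store = context.getStateStore("reference-data");
           }

           @Override
           public void process(final Record<String, String> record) {
               // Writes here keep the read-only store in sync with the topic.
               store.put(record.key(), record.value());
           }
       });
   ```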






[GitHub] [kafka] calmera commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )

2023-01-05 Thread GitBox


calmera commented on code in PR #12742:
URL: https://github.com/apache/kafka/pull/12742#discussion_r1062611257


##
docs/streams/developer-guide/processor-api.html:
##
@@ -396,6 +397,21 @@ 
 
 
 
+
+ReadOnly State Stores
+The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is
+crucial to understand that there can be only one writer to this state store, but there can be multiple readers.
+
+This allows you to use a readonly state store for looking up reference data managed by another processor
+or even another process altogether. The messages within the compacted topic may be projected (a subset of the data extracted)

Review Comment:
   yeah, pretty sure eventually we came to that conclusion as well. Too bad though, would have been nice for re-usability purposes.
   
   I will change the description and take the projected stuff out.






[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062627093


##
clients/src/main/resources/common/message/BrokerRegistrationRequest.json:
##
@@ -51,7 +51,7 @@
 },
 { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": 
"0+",
   "about": "The rack which this broker is in." },
-{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", 
"default": "false",
-  "about": "Set by a ZK broker if the required configurations for ZK 
migration are present." }
+{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", 
"default": "-1",

Review Comment:
   can you add a comment to the top of `BrokerRegistrationRequest.json` stating that version 1 adds `MigratingZkBrokerEpoch`?






[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062628793


##
core/src/main/scala/kafka/server/BrokerEpochManager.scala:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import org.apache.kafka.common.requests.AbstractControlRequest
+
+class BrokerEpochManager(metadataCache: MetadataCache,

Review Comment:
   this is a nice utility class, thanks for adding it.
   
   can we call this `ZkBrokerEpochManager` to emphasize that it's useful only 
in ZK mode?






[jira] [Created] (KAFKA-14575) Move ClusterTool

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14575:
--

 Summary: Move ClusterTool
 Key: KAFKA-14575
 URL: https://issues.apache.org/jira/browse/KAFKA-14575
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison
Assignee: Mickael Maison








[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062634160


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -55,7 +55,7 @@ class BrokerLifecycleManager(
   val config: KafkaConfig,
   val time: Time,
   val threadNamePrefix: Option[String],
-  val isZkBroker: Boolean = false
+  val zkBrokerEpochSupplier: Option[() => Long] = None

Review Comment:
   this might be a little too clever I think. maybe we can just have a `boolean 
isZkBroker` and then a `() => Long` zkBrokerEpochSupplier ? 
   
   I would really like to distinguish between the "not zk broker" and "zk 
broker doesn't have a broker epoch for some reason" cases






[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062634566


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -292,7 +292,7 @@ class BrokerLifecycleManager(
 }
 val data = new BrokerRegistrationRequestData().
 setBrokerId(nodeId).
-setIsMigratingZkBroker(isZkBroker).
+setMigratingZkBrokerEpoch(zkBrokerEpochSupplier.map(_.apply()).getOrElse(-1)).

Review Comment:
   again, I would really like to distinguish between the "not zk broker" and 
"zk broker doesn't have a broker epoch for some reason" cases
   
   so if we are a zk broker but don't know the broker epoch for some reason we 
should throw an exception or something (or just retry)
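   
   For illustration, the guard might look like this (a Java rendering of the Scala under review; `zkBrokerEpochSupplier` and `setMigratingZkBrokerEpoch` are from the diff, the exception choice is an assumption):

   ```java
   long epoch;
   if (zkBrokerEpochSupplier == null) {
       epoch = -1L;  // not a ZK broker: -1 stays the explicit "no epoch" sentinel
   } else {
       epoch = zkBrokerEpochSupplier.get();
       if (epoch < 0)
           // ZK broker without a known epoch: fail (or retry) instead of silently sending -1
           throw new IllegalStateException("ZK broker epoch not yet available");
   }
   data.setMigratingZkBrokerEpoch(epoch);
   ```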






[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


ijuma commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1062635407


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Stream;
+
+/**
+ * This class represents the state of a specific producer-id.
+ * It contains a batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the
+ * queue while the batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN}
+ * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch.
+ */
+public class ProducerStateEntry {
+public static final int NUM_BATCHES_TO_RETAIN = 5;
+private final long producerId;
+private final Deque<BatchMetadata> batchMetadata;
+private short producerEpoch;
+public int coordinatorEpoch;
+public long lastTimestamp;
+public OptionalLong currentTxnFirstOffset;
+
+public ProducerStateEntry(long producerId) {
+    this(producerId, Collections.emptyList(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty());
+}
+
+public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) {
+    this(producerId, Collections.emptyList(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset);
+}
+
+public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) {

Review Comment:
   Looking at the code, I noticed that we never actually pass multiple 
`BatchMetadata`, we only ever pass one. Given that, maybe this constructor 
could take a single element instead of `List`.
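   
   That could look like the following (hypothetical shape, not the PR code; assumes java.util.Collections):

   ```java
   public ProducerStateEntry(long producerId, BatchMetadata firstBatch, short producerEpoch,
                             int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) {
       // Wrap the single batch; callers never pass more than one element anyway.
       this(producerId, Collections.singletonList(firstBatch), producerEpoch,
            coordinatorEpoch, lastTimestamp, currentTxnFirstOffset);
   }
   ```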






[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062640868


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -348,7 +345,7 @@ class KafkaServer(
 time = time,
 metrics = metrics,
 threadNamePrefix = threadNamePrefix,
-brokerEpochSupplier = () => kafkaController.brokerEpoch
+brokerEpochSupplier = () => brokerEpochManager.get()

Review Comment:
   Don't we need to handle the case where `brokerEpochManager` is null here? 
You would return -1 in that case.
   
   And probably you also need to make `brokerEpochManager` volatile.
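   
   Something like the following shape (a Java rendering of the Scala under review; the null check and the volatile read are the point, the rest is assumed):

   ```java
   // brokerEpochManager must be a volatile field for this read to be safe.
   brokerEpochSupplier = () -> {
       BrokerEpochManager manager = this.brokerEpochManager;
       return manager == null ? -1L : manager.get();
   };
   ```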






[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062645398


##
metadata/src/main/java/org/apache/kafka/metadata/migration/BrokersRpcClient.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+
+public interface BrokersRpcClient {

Review Comment:
   this file was already added as `LegacyPropagator.java`, so not needed here






[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-05 Thread GitBox


cmccabe commented on code in PR #12998:
URL: https://github.com/apache/kafka/pull/12998#discussion_r1062646329


##
metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineObject;
+
+public class MigrationControlManager {
+private final TimelineObject<ZkMigrationState> zkMigrationState;
+
+MigrationControlManager(SnapshotRegistry snapshotRegistry) {
+zkMigrationState = new TimelineObject<>(snapshotRegistry, 
ZkMigrationState.NONE);
+}
+
+public ZkMigrationState zkMigrationState() {

Review Comment:
   Does this need to be a public function?



##
metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineObject;
+
+public class MigrationControlManager {
+private final TimelineObject<ZkMigrationState> zkMigrationState;
+
+MigrationControlManager(SnapshotRegistry snapshotRegistry) {
+zkMigrationState = new TimelineObject<>(snapshotRegistry, 
ZkMigrationState.NONE);
+}
+
+public ZkMigrationState zkMigrationState() {

Review Comment:
   Does this need to be a public function, or can it be package-private?
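   
   For illustration, the package-private variant would simply drop the modifier 
(a sketch, not the actual change):
   
   ```java
   // Visible to the controller package (e.g. QuorumController) without
   // widening the public surface of the class.
   ZkMigrationState zkMigrationState() {
       return zkMigrationState.get();
   }
   ```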



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-05 Thread GitBox


mimaison opened a new pull request, #13080:
URL: https://github.com/apache/kafka/pull/13080

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14575) Move ClusterTool to tools

2023-01-05 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14575:
---
Summary: Move ClusterTool to tools  (was: Move ClusterTool)

> Move ClusterTool to tools
> -
>
> Key: KAFKA-14575
> URL: https://issues.apache.org/jira/browse/KAFKA-14575
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14576) Move ConsoleConsumer to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14576:
--

 Summary: Move ConsoleConsumer to tools
 Key: KAFKA-14576
 URL: https://issues.apache.org/jira/browse/KAFKA-14576
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14577) Move ConsoleProducer to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14577:
--

 Summary: Move ConsoleProducer to tools
 Key: KAFKA-14577
 URL: https://issues.apache.org/jira/browse/KAFKA-14577
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14579) Move DumpLogSegments to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14579:
--

 Summary: Move DumpLogSegments to tools
 Key: KAFKA-14579
 URL: https://issues.apache.org/jira/browse/KAFKA-14579
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14580) Move EndToEndLatency to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14580:
--

 Summary: Move EndToEndLatency to tools
 Key: KAFKA-14580
 URL: https://issues.apache.org/jira/browse/KAFKA-14580
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14578) Move ConsumerPerformance to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14578:
--

 Summary: Move ConsumerPerformance to tools
 Key: KAFKA-14578
 URL: https://issues.apache.org/jira/browse/KAFKA-14578
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14582) Move JmxTool to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14582:
--

 Summary: Move JmxTool to tools
 Key: KAFKA-14582
 URL: https://issues.apache.org/jira/browse/KAFKA-14582
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14581) Move GetOffsetShell to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14581:
--

 Summary: Move GetOffsetShell to tools
 Key: KAFKA-14581
 URL: https://issues.apache.org/jira/browse/KAFKA-14581
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14584) Move StateChangeLogMerger to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14584:
--

 Summary: Move StateChangeLogMerger to tools
 Key: KAFKA-14584
 URL: https://issues.apache.org/jira/browse/KAFKA-14584
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14583) Move ReplicaVerificationTool to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14583:
--

 Summary: Move ReplicaVerificationTool to tools
 Key: KAFKA-14583
 URL: https://issues.apache.org/jira/browse/KAFKA-14583
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14585) Move StorageTool to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14585:
--

 Summary: Move StorageTool to tools
 Key: KAFKA-14585
 URL: https://issues.apache.org/jira/browse/KAFKA-14585
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14586) Move StreamsResetter to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14586:
--

 Summary: Move StreamsResetter to tools
 Key: KAFKA-14586
 URL: https://issues.apache.org/jira/browse/KAFKA-14586
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14587) Move AclCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14587:
--

 Summary: Move AclCommand to tools
 Key: KAFKA-14587
 URL: https://issues.apache.org/jira/browse/KAFKA-14587
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14588) Move ConfigCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14588:
--

 Summary: Move ConfigCommand to tools
 Key: KAFKA-14588
 URL: https://issues.apache.org/jira/browse/KAFKA-14588
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14589) Move ConsumerGroupCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14589:
--

 Summary: Move ConsumerGroupCommand to tools
 Key: KAFKA-14589
 URL: https://issues.apache.org/jira/browse/KAFKA-14589
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14592) Move FeatureCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14592:
--

 Summary: Move FeatureCommand to tools
 Key: KAFKA-14592
 URL: https://issues.apache.org/jira/browse/KAFKA-14592
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14591) Move DeleteRecordsCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14591:
--

 Summary: Move DeleteRecordsCommand to tools
 Key: KAFKA-14591
 URL: https://issues.apache.org/jira/browse/KAFKA-14591
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14590) Move DelegationTokenCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14590:
--

 Summary: Move DelegationTokenCommand to tools
 Key: KAFKA-14590
 URL: https://issues.apache.org/jira/browse/KAFKA-14590
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14596) Move TopicCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14596:
--

 Summary: Move TopicCommand to tools
 Key: KAFKA-14596
 URL: https://issues.apache.org/jira/browse/KAFKA-14596
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14595) Move ReassignPartitionsCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14595:
--

 Summary: Move ReassignPartitionsCommand to tools
 Key: KAFKA-14595
 URL: https://issues.apache.org/jira/browse/KAFKA-14595
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14593) Move LeaderElectionCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14593:
--

 Summary: Move LeaderElectionCommand to tools
 Key: KAFKA-14593
 URL: https://issues.apache.org/jira/browse/KAFKA-14593
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14594) Move LogDirsCommand to tools

2023-01-05 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14594:
--

 Summary: Move LogDirsCommand to tools
 Key: KAFKA-14594
 URL: https://issues.apache.org/jira/browse/KAFKA-14594
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14547) Be able to run kafka KRaft Server in tests without needing to run a storage setup script

2023-01-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655063#comment-17655063
 ] 

Ismael Juma commented on KAFKA-14547:
-

Related KIP 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-785%3A+Automatic+storage+formatting

> Be able to run kafka KRaft Server in tests without needing to run a storage 
> setup script
> 
>
> Key: KAFKA-14547
> URL: https://issues.apache.org/jira/browse/KAFKA-14547
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Natan Silnitsky
>Priority: Major
>
> Currently the Kafka KRaft server requires running kafka-storage.sh in order 
> to start properly.
> This makes setup much more cumbersome for build tools like Bazel.
> One way to mitigate this is to configure the paths via kafkaConfig...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-05 Thread GitBox


gharris1727 commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1062726003


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> 
setupAndCreateKafkaBasedLog(String topic, final Wo
 }
 
 private void sendPrivileged(String key, byte[] value) {
+sendPrivileged(key, value, null);
+}
+
+private void sendPrivileged(String key, byte[] value, Callback 
callback) {
 if (!usesFencableWriter) {
-configLog.send(key, value);

Review Comment:
   > if we want to surface producer errors to users ... which would involve 
changing the method signatures to accept callbacks
   
   This is not true; you can surface errors synchronously by throwing 
exceptions inside the `KafkaConfigBackingStore` and catching them in the 
caller. This is what I meant by terminating the producer callbacks: 
transforming the control flow from asynchronous (via a callback) into 
synchronous results.
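   
   For example, a minimal sketch of that termination, assuming the usual 
`java.util.concurrent` types and Connect's `ConnectException`; the method name 
and timeout handling here are illustrative, not the PR's actual code:
   
   ```java
   private void sendPrivilegedSync(String key, byte[] value, long timeoutMs) {
       // Complete a future from the producer callback...
       CompletableFuture<Void> result = new CompletableFuture<>();
       configLog.send(key, value, (metadata, exception) -> {
           if (exception != null)
               result.completeExceptionally(exception);
           else
               result.complete(null);
       });
       try {
           // ...and block on it here, turning the asynchronous failure into
           // a synchronous exception the REST layer can report faithfully.
           result.get(timeoutMs, TimeUnit.MILLISECONDS);
       } catch (ExecutionException e) {
           throw new ConnectException("Failed to write to the config topic", e.getCause());
       } catch (InterruptedException | TimeoutException e) {
           throw new ConnectException("Error writing to the config topic", e);
       }
   }
   ```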



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-05 Thread GitBox


gharris1727 commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1062744806


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
 try {
 fencableProducer.beginTransaction();
-fencableProducer.send(new ProducerRecord<>(topic, key, value));
+fencableProducer.send(new ProducerRecord<>(topic, key, value), 
(metadata, exception) -> {

Review Comment:
   > should already be the case with the current changes
   
   It may be the case but not for the reason you specified. The guarantee from 
the producer is that the record callbacks are completed before the 
commitTransaction _returns_, while I was suggesting a control flow where we 
ensure that record callbacks are completed before the commitTransaction is 
_called_. If you still call commitTransaction, you will get both the callback's 
error and the synchronous error from commitTransaction, and would still need to 
make sure that the correct exception is the one surfaced rather than shadowed. 
The control over this shadowing is handled by this code and by whoever 
terminates the callback in the REST layer.
   
   > we could simply reword the existing exception message
   
   This is certainly the easy way out, but I don't think we should settle for 
that. Vague error messages indicate that they aren't capturing and reporting 
the exception properly, and making them more vague to compensate doesn't help 
anyone diagnose the underlying issue. If we can make a control flow change that 
reports the errors faithfully, that is going to help someone down the line 
debugging it.
   
   > the worker logs will reveal the underlying TopicAuthorizationException
   
   Is this the case? The worker is using _this_ code path to write the session 
key; If we're hiding the result from the REST calls, are we not also hiding the 
error from the herder tick thread?
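   
   Concretely, the ordering I have in mind might look like this (a sketch that 
assumes the transactional `fencableProducer` path; the exception handling shown 
is illustrative):
   
   ```java
   fencableProducer.beginTransaction();
   Future<RecordMetadata> sendResult =
           fencableProducer.send(new ProducerRecord<>(topic, key, value));
   try {
       // Wait for the record's own result *before* commitTransaction is
       // called, so a per-record error such as TopicAuthorizationException
       // surfaces directly instead of being shadowed by the commit failure.
       sendResult.get();
       fencableProducer.commitTransaction();
   } catch (ExecutionException e) {
       fencableProducer.abortTransaction();
       throw new ConnectException("Failed to write to the config topic", e.getCause());
   } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new ConnectException("Interrupted while writing to the config topic", e);
   }
   ```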
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-01-05 Thread Atul Jain (Jira)
Atul Jain created KAFKA-14597:
-

 Summary: [Streams] record-e2e-latency-max is not reporting correct 
metrics 
 Key: KAFKA-14597
 URL: https://issues.apache.org/jira/browse/KAFKA-14597
 Project: Kafka
  Issue Type: Bug
Reporter: Atul Jain
 Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg

I was following this KIP documentation 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
 and the Kafka Streams documentation 
([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
 Based on this documentation, *record-e2e-latency-max* should monitor the full 
end-to-end latency, which includes both *consumption latencies* and 
{*}processing delays{*}.

However, based on my observations, record-e2e-latency-max seems to measure 
only the consumption latency; processing delays can be measured using 
*process-latency-max*. I am checking all this using a simple topology 
consisting of a source, processors and a sink (code added). I have added some 
sleep time (of 3 seconds) in one of the processors to ensure some delay in the 
processing logic. These delays are not accounted for in record-e2e-latency-max 
but are accounted for in process-latency-max. process-latency-max was observed 
to be 3002 ms, which accounts for the 3 seconds of sleep. However, 
record-e2e-latency-max observed in jconsole is 422 ms, which does not account 
for the 3 seconds of sleep time.


Code describing my topology:
static Topology buildTopology(String inputTopic, String outputTopic) {
    log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);

    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k, v) -> log.info("Observed event: key " + k + " value: " + v))
            .mapValues(s -> {
                try {
                    System.out.println("sleeping for 3 seconds");
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return s.toUpperCase();
            })
            .peek((k, v) -> log.info("Transformed event: key " + k + " value: " + v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}
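
For anyone reproducing this, the metric can also be read programmatically 
rather than through jconsole (a sketch; assumes a running KafkaStreams instance 
named streams):
{code:java}
streams.metrics().forEach((name, metric) -> {
    if (name.name().equals("record-e2e-latency-max")) {
        System.out.println(name.tags() + " -> " + metric.metricValue());
    }
});
{code}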
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-01-05 Thread Atul Jain (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Atul Jain updated KAFKA-14597:
--
Description: 
I was following this KIP documentation 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
 and the Kafka Streams documentation 
([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
 Based on this documentation, *record-e2e-latency-max* should monitor the full 
end-to-end latency, which includes both *consumption latencies* and 
{*}processing delays{*}.

However, based on my observations, record-e2e-latency-max seems to measure 
only the consumption latency; processing delays can be measured using 
*process-latency-max*. I am checking all this using a simple topology 
consisting of a source, processors and a sink (code added). I have added some 
sleep time (of 3 seconds) in one of the processors to ensure some delay in the 
processing logic. These delays are not accounted for in record-e2e-latency-max 
but are accounted for in process-latency-max. process-latency-max was observed 
to be 3002 ms, which accounts for the 3 seconds of sleep. However, 
record-e2e-latency-max observed in jconsole is 422 ms, which does not account 
for the 3 seconds of sleep time.

 

Code describing my topology:
{code:java}
static Topology buildTopology(String inputTopic, String outputTopic) {
    log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);

    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k, v) -> log.info("Observed event: key " + k + " value: " + v))
            .mapValues(s -> {
                try {
                    System.out.println("sleeping for 3 seconds");
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return s.toUpperCase();
            })
            .peek((k, v) -> log.info("Transformed event: key " + k + " value: " + v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}
{code}

 

  was:
I was following this KIP documentation 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
 and kafka streams documentation 
([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end])
 . Based on these documentations , the *record-e2e-latency-max* should monitor 
the full end to end latencies, which includes both *consumption latencies* and  
{*}processing delays{*}.

However, based on my observations , record-e2e-latency-max seems to be only 
measuring the consumption latencies. processing delays can be measured using 
*process-latency-max* .I am checking all this using a simple topology 
consisting of source, processors and sink (code added). I have added some sleep 
time (of 3 seconds) in one of the processors to ensure some delays in the 
processing logic. These delays are not getting accounted in the 
record-e2e-latency-max but are accounted in process-latency-max. 
process-latency-max was observed to be 3002 ms which accounts for sleep time of 
3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, which 
does not account for 3 seconds of sleep time.


Code describing my topology:
static Topology buildTopology(String inputTopic, String outputTopic) \{
log.info("Input topic: " + inputTopic + " and output topic: " + 
outputTopic);

Serde stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
.peek((k,v) -> log.info("Observed event: key" + k + " value: " 
+ v))
.mapValues(s -> {
try {
System.out.println("sleeping for 3 seconds");
Thread.sleep(3000);
}
catch (InterruptedException e) \{
e.printStackTrace();
}
return  s.toUpperCase();
})
.peek((k,v) -> log.info("Transformed event: key" + k + " value: 
" + v))
.to(outputTopic, Produced.with(stringSerde, stringSerde));
return builder.build();
}
 


> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
>

[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2023-01-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14597:

Component/s: metrics
 streams

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Priority: Major
> Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg
>
>
> I was following this KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and the Kafka Streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
>  Based on this documentation, *record-e2e-latency-max* should monitor the 
> full end-to-end latency, which includes both *consumption latencies* and 
> {*}processing delays{*}.
> However, based on my observations, record-e2e-latency-max seems to measure 
> only the consumption latency; processing delays can be measured using 
> *process-latency-max*. I am checking all this using a simple topology 
> consisting of a source, processors and a sink (code added). I have added some 
> sleep time (of 3 seconds) in one of the processors to ensure some delay in 
> the processing logic. These delays are not accounted for in 
> record-e2e-latency-max but are accounted for in process-latency-max. 
> process-latency-max was observed to be 3002 ms, which accounts for the 3 
> seconds of sleep. However, record-e2e-latency-max observed in jconsole is 
> 422 ms, which does not account for the 3 seconds of sleep time.
>  
> Code describing my topology:
> {code:java}
>static Topology buildTopology(String inputTopic, String outputTopic) {
> log.info("Input topic: " + inputTopic + " and output topic: " + 
> outputTopic);
> Serde<String> stringSerde = Serdes.String();
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
> .peek((k,v) -> log.info("Observed event: key" + k + " value: 
> " + v))
> .mapValues(s -> {
> try {
> System.out.println("sleeping for 3 seconds");
> Thread.sleep(3000);
> }
> catch (InterruptedException e) {
> e.printStackTrace();
> }
> return  s.toUpperCase();
> })
> .peek((k,v) -> log.info("Transformed event: key" + k + " 
> value: " + v))
> .to(outputTopic, Produced.with(stringSerde, stringSerde));
> return builder.build();
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on pull request #13076: KAFKA-14279; Add 3.3.x to core compatibility tests

2023-01-05 Thread GitBox


jsancio commented on PR #13076:
URL: https://github.com/apache/kafka/pull/13076#issuecomment-1372586923

   Thanks for the review @ijuma. Here are the results: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1672892122--jsancio--kafka-14279-3-3-sys-test--22b8a8e2c6/2023-01-04--001./2023-01-04--001./report.html
   
   None of the tests that I added failed. There are 13 system tests that 
mention 3.3.1, which matches what I expected. I am going to merge and 
cherry-pick to the 3.4 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13077: KAFKA-14545; Add 3.3.x streams system tests

2023-01-05 Thread GitBox


jsancio commented on PR #13077:
URL: https://github.com/apache/kafka/pull/13077#issuecomment-1372590378

   Here are the system test results: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1672893494--jsancio--kafka-14545-stream-3-3-test--a595bcfb9c/2023-01-04--001./2023-01-04--001./report.html
   
   Looks like this test failed:
   ```
   Module: kafkatest.tests.streams.streams_upgrade_test
   Class:  StreamsUpgradeTest
   Method: test_rolling_upgrade_with_2_bounces
   Arguments:
   {
 "from_version": "3.3.1",
 "to_version": "3.5.0-SNAPSHOT"
   }
   ```
   
   I am going to rerun just this test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13049: KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module

2023-01-05 Thread GitBox


junrao commented on code in PR #13049:
URL: https://github.com/apache/kafka/pull/13049#discussion_r1062797781


##
server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import static java.util.Arrays.asList;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.utils.Utils;
+
+public final class ServerTopicConfigSynonyms {
+private static final String LOG_PREFIX = "log.";
+private static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
+
+/**
+ * Maps topic configurations to their equivalent broker configurations.
+ *
+ * Topics can be configured either by setting their dynamic topic 
configurations, or by
+ * setting equivalent broker configurations. For historical reasons, the 
equivalent broker
+ * configurations have different names. This table maps each topic 
configuration to its
+ * equivalent broker configurations.
+ *
+ * In some cases, the equivalent broker configurations must be transformed 
before they
+ * can be used. For example, log.roll.hours must be converted to 
milliseconds before it
+ * can be used as the value of segment.ms.
+ *
+ * The broker configurations will be used in the order specified here. In 
other words, if
+ * both the first and the second synonyms are configured, we will use only 
the value of
+ * the first synonym and ignore the second.
+ */
+// Topic configs with no mapping to a server config can be found in 
`LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS`
+@SuppressWarnings("deprecation")
+public static final Map<String, List<ConfigSynonym>> 
ALL_TOPIC_CONFIG_SYNONYMS = Collections.unmodifiableMap(Utils.mkMap(
+sameNameWithLogPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
+listWithLogPrefix(TopicConfig.SEGMENT_MS_CONFIG,
+new ConfigSynonym("roll.ms"),
+new ConfigSynonym("roll.hours", 
ConfigSynonym.HOURS_TO_MILLISECONDS)),
+listWithLogPrefix(TopicConfig.SEGMENT_JITTER_MS_CONFIG,
+new ConfigSynonym("roll.jitter.ms"),
+new ConfigSynonym("roll.jitter.hours", 
ConfigSynonym.HOURS_TO_MILLISECONDS)),
+singleWithLogPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 
"index.size.max.bytes"),
+singleWithLogPrefix(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, 
"flush.interval.messages"),
+listWithLogPrefix(TopicConfig.FLUSH_MS_CONFIG,
+new ConfigSynonym("flush.interval.ms"),
+new ConfigSynonym("flush.scheduler.interval.ms")),
+sameNameWithLogPrefix(TopicConfig.RETENTION_BYTES_CONFIG),
+listWithLogPrefix(TopicConfig.RETENTION_MS_CONFIG,
+new ConfigSynonym("retention.ms"),
+new ConfigSynonym("retention.minutes", 
ConfigSynonym.MINUTES_TO_MILLISECONDS),
+new ConfigSynonym("retention.hours", 
ConfigSynonym.HOURS_TO_MILLISECONDS)),
+single(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "message.max.bytes"),
+sameNameWithLogPrefix(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG),
+sameNameWithLogCleanerPrefix(TopicConfig.DELETE_RETENTION_MS_CONFIG),
+sameNameWithLogCleanerPrefix(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG),
+sameNameWithLogCleanerPrefix(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG),
+singleWithLogPrefix(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 
"segment.delete.delay.ms"),
+
singleWithLogCleanerPrefix(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 
"min.cleanable.ratio"),
+sameNameWithLogPrefix(TopicConfig.CLEANUP_POLICY_CONFIG),
+sameName(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG),
+sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG),
+sameName(TopicConfig.COMPRESSION_TYPE_CONFIG),
+sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
+sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG),
+  

[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests

2023-01-05 Thread GitBox


philipnee commented on code in PR #13021:
URL: https://github.com/apache/kafka/pull/13021#discussion_r1062809110


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+public class CommitRequestManager implements RequestManager {
+private final Queue<StagedCommit> stagedCommits;
+// TODO: We will need to refactor the subscriptionState
+private final SubscriptionState subscriptionState;

Review Comment:
   We should finalize the plan for subscriptionState: whether we will reuse the 
same class or implement a new one (and deprecate the old one).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vladimirdyuzhev opened a new pull request, #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used

2023-01-05 Thread GitBox


vladimirdyuzhev opened a new pull request, #13081:
URL: https://github.com/apache/kafka/pull/13081

   When a keytab file is not used and the necessary configuration data is 
provided by the SASL callback handler, Kerberos TGT renewal fails because the 
code does not reuse the configured CallbackHandler in the re-login sequence.
   
   The error is:
   
   ```
   javax.security.auth.login.LoginException: No CallbackHandler available to 
garner authentication information from the user
   ```
   
   The change preserves the instance of the CallbackHandler that was used to 
login into Kerberos and passes it to the LoginContext when TGT needs to be 
renewed. 
   
   The change is tested in DIT with live Kafka and AD KRB instances in our 
current project.
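   
   A minimal sketch of the idea (field and method names here are illustrative, 
not the actual patch): the handler that performed the initial login is kept and 
handed back to the `LoginContext` on re-login.
   
   ```java
   // Captured when the initial Kerberos login succeeded (assigned in the
   // constructor of the enclosing login class).
   private final CallbackHandler savedCallbackHandler;
   
   private LoginContext reLogin(String contextName, Subject subject)
           throws LoginException {
       // Without this, a keytab-less configuration fails with "No
       // CallbackHandler available to garner authentication information
       // from the user".
       LoginContext context = new LoginContext(contextName, subject,
               savedCallbackHandler, Configuration.getConfiguration());
       context.login(); // renews the TGT using the same credentials source
       return context;
   }
   ```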


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #13076: KAFKA-14279; Add 3.3.x to core compatibility tests

2023-01-05 Thread GitBox


jsancio merged PR #13076:
URL: https://github.com/apache/kafka/pull/13076


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13077: KAFKA-14279; Add 3.3.x streams system tests

2023-01-05 Thread GitBox


mjsax commented on code in PR #13077:
URL: https://github.com/apache/kafka/pull/13077#discussion_r1062838773


##
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Properties;
+import java.util.Random;
+
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+
+public class StreamsUpgradeTest {
+
+@SuppressWarnings("unchecked")
+public static void main(final String[] args) throws Exception {
+if (args.length < 1) {
+System.err.println("StreamsUpgradeTest requires one argument 
(properties-file) but provided none");
+}
+final String propFileName = args[0];
+
+final Properties streamsProperties = Utils.loadProps(propFileName);
+
+System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v3.2)");

Review Comment:
   ```suggestion
   System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v3.3)");
   ```



##
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {
+
+final static int END = Integer.MAX_VALUE;
+
+static ProcessorSupplier 
printProcessorSupplier(final String topic) {
+return printProcessorSupplier(topic, "");
+}
+
+static ProcessorSupplier 
printProcessorSupplier(final String topic, final String name) {
+return () -> new ContextualProcessor() {
+private int numRecordsProcessed = 0;
+private long smallestOffset = Long.MAX_VALUE;
+private long largestOffset = Long.MIN_VALUE;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+System.out.println("[3.2] initializing processor: topic=" + 
topic + 

[GitHub] [kafka] jsancio commented on pull request #13077: KAFKA-14279; Add 3.3.x streams system tests

2023-01-05 Thread GitBox


jsancio commented on PR #13077:
URL: https://github.com/apache/kafka/pull/13077#issuecomment-1372724189

   Same failure again: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1672950817--jsancio--kafka-14545-stream-3-3-test--a595bcfb9c/2023-01-05--001./2023-01-05--001./report.html
   
   `TimeoutError('Could not detect Kafka Streams version 3.5.0-SNAPSHOT on 
ubuntu@worker29')`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


jolshan commented on PR #13075:
URL: https://github.com/apache/kafka/pull/13075#issuecomment-1372876700

   So the error occurs when the fetch offset is not the futureLog's log end 
offset. 
   
   Just curious -- is this because the log's high watermark is lower or higher 
than the futureLog's? 
   
   It seems that the value we put into the InitialFetchState is the first 
offset we fetch from, so I just wanted to clarify the race and why this fixes 
the issue. (Not doubting you, just trying to understand 😄 )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2023-01-05 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655153#comment-17655153
 ] 

Jun Rao commented on KAFKA-9087:


[~chia7712]: Thanks for the explanation. Great find! I agree that this is a 
bug and the fix that you suggested makes sense. In alterReplicaLogDirs(), we 
initialize the initial offset for ReplicaAlterLogDirsThread with 
futureLog.highWatermark. We should do the same thing when handling the 
LeaderAndIsrRequest. It seems that the bug was introduced in this PR: 
https://github.com/apache/kafka/pull/6841. Do you plan to submit a PR?
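
In other words, the one-line shape of the fix (a Java-flavored sketch; the 
real code is Scala in ReplicaManager):
{code:java}
// When building the InitialFetchState for the ReplicaAlterLogDirsThread
// during LeaderAndIsrRequest handling, seed the fetch offset from the
// future log, whose high watermark cannot exceed its own log end offset:
long initialFetchOffset = futureLog.highWatermark(); // was: log.highWatermark()
{code}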

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix 
> the problem. To fix it I've stopped the broker and remove all the stuck 
> future partitions.
> Detailed log below
> {noformat}
> [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Loading producer state till offset 4224887 with 
> message format version 2 (kafka.log.Log)
> [2019-06-11 12:21:34,980] INFO [ProducerStateManager 
> partition=metrics_timers-35] Loading producer state from snapshot file 
> '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' 
> (kafka.log.ProducerStateManager)
> [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Completed load of log with 1 segments, log start 
> offset 4120720 and log end offset 4224887 in 70 ms (kafka.log.Log)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 0 (kafka.cluster.Replica)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 0 (kafka.cluster.Replica)
> [2019-06-11 12:21:45,307] INFO Replica

[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2023-01-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655155#comment-17655155
 ] 

Chia-Ping Tsai commented on KAFKA-9087:
---

[~junrao] Please take a look at [https://github.com/apache/kafka/pull/13075]

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix 
> the problem. To fix it I've stopped the broker and remove all the stuck 
> future partitions.
> Detailed log below
> {noformat}
> [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Loading producer state till offset 4224887 with 
> message format version 2 (kafka.log.Log)
> [2019-06-11 12:21:34,980] INFO [ProducerStateManager 
> partition=metrics_timers-35] Loading producer state from snapshot file 
> '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' 
> (kafka.log.ProducerStateManager)
> [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Completed load of log with 1 segments, log start 
> offset 4120720 and log end offset 4224887 in 70 ms (kafka.log.Log)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 0 (kafka.cluster.Replica)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 0 (kafka.cluster.Replica)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 4224887 (kafka.cluster.Replica)
> [2019-06-11 12:21:47,090] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:30:04,757] INFO [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Re

[GitHub] [kafka] vladimirdyuzhev commented on pull request #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used

2023-01-05 Thread GitBox


vladimirdyuzhev commented on PR #13081:
URL: https://github.com/apache/kafka/pull/13081#issuecomment-1372892329

   Some tests failed, but apparently this has nothing to do with Kerberos:
   
   ```
   testSendNonCompressedMessageWithCreateTime(String) > 
kafka.api.PlaintextProducerSendTest.testSendNonCompressedMessageWithCreateTime(String)[1]
 FAILED
   [2023-01-05T21:14:03.931Z] java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication 
factor: 2 larger than available brokers: 1.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


chia7712 commented on PR #13075:
URL: https://github.com/apache/kafka/pull/13075#issuecomment-1372912104

   > is this because the log's high watermark is lower or higher than the 
futureLog's?
   
   Higher, and thanks to @junrao for the great explanation.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] littlehorse-eng opened a new pull request, #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.

2023-01-05 Thread GitBox


littlehorse-eng opened a new pull request, #13082:
URL: https://github.com/apache/kafka/pull/13082

   Documentation only: a minor clarification of how max.warmup.replicas works; 
specifically, that one "warmup replica" corresponds to a Task that is restoring 
its state. Also clarifies how max.warmup.replicas interacts with 
probing.rebalance.interval.ms.
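   
   For readers following the thread, a minimal Java sketch of the two configs 
being clarified here (the application id and bootstrap servers are placeholders, 
not part of the PR):
   
   ```java
   import java.util.Properties;
   
   import org.apache.kafka.streams.StreamsConfig;
   
   public class WarmupConfigExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
           // Extra standbys beyond num.standby.replicas, assigned only while a task warms up.
           props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
           props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
           // Warmup tasks are promoted to active only on a probing rebalance,
           // which fires at this interval (10 minutes here).
           props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);
       }
   }
   ```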


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.

2023-01-05 Thread GitBox


littlehorse-eng commented on code in PR #13082:
URL: https://github.com/apache/kafka/pull/13082#discussion_r1062979323


##
docs/streams/developer-guide/config-streams.html:
##
@@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas
   
 
-  The maximum number of warmup replicas (extra standbys beyond the 
configured num.standbys) that can be assigned at once for the purpose of keeping
-  the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
-  traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
-  for the reassigned warmups to restore sufficient state for them 
to be transitioned to active tasks. Must be at least 1.
+  
+The maximum number of warmup replicas (extra standbys beyond 
the configured num.standbys) that can be assigned at once for the purpose of 
keeping
+the task available on one instance while it is warming up on 
another instance it has been reassigned to. Used to throttle how much extra 
broker
+traffic and cluster state can be used for high availability. 
Increasing this will allow Streams to warm up more tasks at once, speeding up 
the time
+for the reassigned warmups to restore sufficient state for 
them to be transitioned to active tasks. Must be at least 1.
+  
+  
+Note that one warmup replica corresponds to one Stream Task. 
Furthermore, note that each warmup replica can only be promoted to an active 
Task during

Review Comment:
   @ableegoldman could you please confirm this?
   
   @mjsax any chance you could also let me know how to preview the changes to 
the HTML? I tried opening the file in my browser but it wouldn't render... this 
is, somewhat embarrassingly, my first time editing HTML.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


chia7712 commented on code in PR #13075:
URL: https://github.com/apache/kafka/pull/13075#discussion_r1063007860


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -205,6 +205,61 @@ class ReplicaManagerTest {
 when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = {

Review Comment:
   @ijuma please take a look at this UT



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2023-01-05 Thread GitBox


junrao commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1063005106


##
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala:
##
@@ -243,282 +245,287 @@ class LeaderEpochFileCacheTest {
 
 //Given
 val cache = new LeaderEpochFileCache(tp, checkpoint)
-cache.assign(epoch = 2, startOffset = 6)
+cache.assign(2, 6)
 
 //When
 val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
 val cache2 = new LeaderEpochFileCache(tp, checkpoint2)
 
 //Then
 assertEquals(1, cache2.epochEntries.size)
-assertEquals(EpochEntry(2, 6), cache2.epochEntries.toList(0))
+assertEquals(new EpochEntry(2, 6), cache2.epochEntries.get(0))
   }
 
   @Test
   def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
 //Given
-cache.assign(epoch = 1, startOffset = 5);
+cache.assign(1, 5);
 var logEndOffset = 6
-cache.assign(epoch = 2, startOffset = 6);
+cache.assign(2, 6);
 logEndOffset = 7
 
 //When we update an epoch in the past with a different offset, the log has 
already reached
 //an inconsistent state. Our options are either to raise an error, ignore 
the new append,
 //or truncate the cached epochs to the point of conflict. We take this 
latter approach in
 //order to guarantee that epochs and offsets in the cache increase 
monotonically, which makes
 //the search logic simpler to reason about.
-cache.assign(epoch = 1, startOffset = 7);
+cache.assign(1, 7);
 logEndOffset = 8
 
 //Then later epochs will be removed
-assertEquals(Some(1), cache.latestEpoch)
+assertEquals(Optional.of(1), cache.latestEpoch)
 
 //Then end offset for epoch 1 will have changed
-assertEquals((1, 8), cache.endOffsetFor(1, logEndOffset))
+assertEquals((1, 8), toTuple(cache.endOffsetFor(1, logEndOffset)))
 
 //Then end offset for epoch 2 is now undefined
-assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), 
cache.endOffsetFor(2, logEndOffset))
-assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
+assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), 
toTuple(cache.endOffsetFor(2, logEndOffset)))
+assertEquals(new EpochEntry(1, 7), cache.epochEntries.get(0))
+  }
+
+  def toTuple[K, V](entry: java.util.Map.Entry[K, V]): (K, V) = {

Review Comment:
   Could this be private?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1294,36 +1293,53 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // The first cached epoch usually corresponds to the log start offset, 
but we have to verify this since
 // it may not be true following a message format version bump as the 
epoch will not be available for
 // log entries written in the older format.
-val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
-val epochOpt = earliestEpochEntry match {
-  case Some(entry) if entry.startOffset <= logStartOffset => 
Optional.of[Integer](entry.epoch)
-  case _ => Optional.empty[Integer]()
+val earliestEpochEntry = leaderEpochCache match {
+  case Some(cache) => cache.earliestEntry()
+  case None => Optional.empty[EpochEntry]()
 }
+
+val epochOpt = if (earliestEpochEntry.isPresent && 
earliestEpochEntry.get().startOffset <= logStartOffset) {
+  Optional.of[Integer](earliestEpochEntry.get().epoch)
+} else Optional.empty[Integer]()
+
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, 
epochOpt))
   } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
 val curLocalLogStartOffset = localLogStartOffset()
-val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache =>
-  
cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry))
-val epochOpt = earliestLocalLogEpochEntry match {
-  case Some(entry) if entry.startOffset <= curLocalLogStartOffset => 
Optional.of[Integer](entry.epoch)
-  case _ => Optional.empty[Integer]()
+
+val earliestLocalLogEpochEntry: Optional[EpochEntry] = 
leaderEpochCache match {
+  case Some(cache) =>
+val value = cache.epochForOffset(curLocalLogStartOffset)
+if (value.isPresent) cache.epochEntry(value.get) else 
Optional.empty[EpochEntry]()
+  case None => Optional.empty[EpochEntry]()
 }
+
+val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
+  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
+else Optional.empty[Integer]()
+
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
curLocalLogStartOffset, epochOpt))
   } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
-val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).m

[GitHub] [kafka] jolshan commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


jolshan commented on code in PR #13075:
URL: https://github.com/apache/kafka/pull/13075#discussion_r1063012777


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -205,6 +205,61 @@ class ReplicaManagerTest {
 when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = {

Review Comment:
   You mentioned that the v2 format wasn't as easy to replicate. Is it too hard 
to have a test for? Or are the cases similar enough for v1 to cover both and v1 
is less flaky?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


jolshan commented on code in PR #13075:
URL: https://github.com/apache/kafka/pull/13075#discussion_r1063013207


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -205,6 +205,61 @@ class ReplicaManagerTest {
 when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.message.format.version", "0.10.2")
+props.put("inter.broker.protocol.version", "2.8")
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = logManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+val partition = rm.createPartition(new TopicPartition(topic, 0))
+partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+  new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+  Seq(new LeaderAndIsrPartitionState()
+.setTopicName(topic)
+.setPartitionIndex(0)
+.setControllerEpoch(0)
+.setLeader(0)
+.setLeaderEpoch(0)
+.setIsr(Seq[Integer](0).asJava)
+.setPartitionEpoch(0)
+.setReplicas(Seq[Integer](0).asJava)
+.setIsNew(false)).asJava,
+  Collections.singletonMap(topic, Uuid.randomUuid()),
+  Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
+appendRecords(rm, new TopicPartition(topic, 0),
+  MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first 
message".getBytes()), new SimpleRecord("second message".getBytes(
+logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), 
dir2.getAbsolutePath)
+
+partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
+  new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+// this method should use hw of future log to create log dir fetcher. 
Otherwise, it causes offset mismatch error
+rm.maybeAddLogDirFetchers(Set(partition), new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None)
+assertNotEquals(0, rm.replicaAlterLogDirsManager.fetcherThreadMap.size)
+// wait for the log dir fetcher thread
+TimeUnit.SECONDS.sleep(3)

Review Comment:
   Is there any way we could mock this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


jolshan commented on code in PR #13075:
URL: https://github.com/apache/kafka/pull/13075#discussion_r1063013207


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -205,6 +205,61 @@ class ReplicaManagerTest {
 when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = {
+val dir1 = TestUtils.tempDir()
+val dir2 = TestUtils.tempDir()
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+props.put("log.message.format.version", "0.10.2")
+props.put("inter.broker.protocol.version", "2.8")
+props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+val config = KafkaConfig.fromProps(props)
+val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+val rm = new ReplicaManager(
+  metrics = metrics,
+  config = config,
+  time = time,
+  scheduler = new MockScheduler(time),
+  logManager = logManager,
+  quotaManagers = quotaManager,
+  metadataCache = metadataCache,
+  logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+  alterPartitionManager = alterPartitionManager)
+val partition = rm.createPartition(new TopicPartition(topic, 0))
+partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+  new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+  Seq(new LeaderAndIsrPartitionState()
+.setTopicName(topic)
+.setPartitionIndex(0)
+.setControllerEpoch(0)
+.setLeader(0)
+.setLeaderEpoch(0)
+.setIsr(Seq[Integer](0).asJava)
+.setPartitionEpoch(0)
+.setReplicas(Seq[Integer](0).asJava)
+.setIsNew(false)).asJava,
+  Collections.singletonMap(topic, Uuid.randomUuid()),
+  Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
+appendRecords(rm, new TopicPartition(topic, 0),
+  MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first 
message".getBytes()), new SimpleRecord("second message".getBytes(
+logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), 
dir2.getAbsolutePath)
+
+partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
+  new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+// this method should use hw of future log to create log dir fetcher. 
Otherwise, it causes offset mismatch error
+rm.maybeAddLogDirFetchers(Set(partition), new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None)
+assertNotEquals(0, rm.replicaAlterLogDirsManager.fetcherThreadMap.size)
+// wait for the log dir fetcher thread
+TimeUnit.SECONDS.sleep(3)

Review Comment:
   Is there any way we could mock this and/or confirm the log dir fetcher 
thread is started?
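   
   One way to avoid the fixed sleep is to poll for the condition with a timeout. 
Kafka's own kafka.utils.TestUtils.waitUntilTrue (Scala) and 
org.apache.kafka.test.TestUtils.waitForCondition (Java) already provide this, so 
the helper below is purely illustrative of the idea:
   
   ```java
   import java.util.function.BooleanSupplier;

   public final class WaitUtil {
       // Poll until the condition holds, instead of sleeping a fixed 3 seconds.
       public static void waitUntilTrue(BooleanSupplier condition, long maxWaitMs)
               throws InterruptedException {
           long deadline = System.currentTimeMillis() + maxWaitMs;
           while (!condition.getAsBoolean()) {
               if (System.currentTimeMillis() > deadline)
                   throw new AssertionError("condition not met within " + maxWaitMs + " ms");
               Thread.sleep(100); // short poll interval keeps the test fast when it passes early
           }
       }
   }
   ```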



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Fix Version/s: 3.5.0

> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors, 
> AbstractConfig.getConfiguredInstances() is delegated responsibility for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property, and it returns a configured List of interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when multiple interceptors are involved and at least one interceptor's 
> configure method creates and/or depends on objects which create threads, 
> connections or other resources that require cleanup, while a subsequent 
> interceptor's configure method raises a runtime exception. The runtime 
> exception produces a resource leak in the first interceptor, because the 
> interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
> never created and therefore the first interceptor's close method (and really 
> any interceptor's) is never called. 
> To help ensure the respective interceptor containers are able to invoke their 
> interceptors' close methods for proper resource cleanup, I propose defining a 
> default open method, with no implementation and a checked exception, on the 
> respective Consumer/Producer interceptor interfaces. This open method will be 
> responsible for creating threads and/or objects which use threads, 
> connections or other resources that require cleanup. 
> The default open method also makes implementation optional, as its empty 
> default behavior means it does nothing when unimplemented. 
> Additionally, the Kafka Consumer/Producer interceptor containers will 
> implement a corresponding maybeOpen method which throws a checked exception. 
> To maintain backwards compatibility with interceptors developed earlier, 
> maybeOpen will check whether the interceptor's interface contains the newer 
> open method before calling it. 
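
A rough sketch of the shape this proposal describes (all names below are 
hypothetical and only illustrate the open/maybeOpen idea; the 
backwards-compatibility check is elided):

{code:java}
import java.util.List;

// Hypothetical interceptor interface carrying the proposed no-op default open().
interface ResourcefulInterceptor extends AutoCloseable {
    // Resource acquisition (threads, connections) moves here, out of configure().
    default void open() throws Exception {
    }

    void close();
}

// Hypothetical container method: open every interceptor, and close the
// already-opened ones if a later open() fails, so no resources leak.
class InterceptorContainer {
    private final List<ResourcefulInterceptor> interceptors;

    InterceptorContainer(List<ResourcefulInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    void maybeOpen() throws Exception {
        int opened = 0;
        try {
            for (ResourcefulInterceptor interceptor : interceptors) {
                interceptor.open();
                opened++;
            }
        } catch (Exception e) {
            for (int i = 0; i < opened; i++) {
                try {
                    interceptors.get(i).close();
                } catch (Exception suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw e;
        }
    }
}
{code}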



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention

2023-01-05 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard reassigned KAFKA-14565:
---

Assignee: Terry Beard

> Improving Interceptor Resource Leakage Prevention
> -
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors, 
> AbstractConfig.getConfiguredInstances() is delegated responsibility for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property, and it returns a configured List of interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when multiple interceptors are involved and at least one interceptor's 
> configure method creates and/or depends on objects which create threads, 
> connections or other resources that require cleanup, while a subsequent 
> interceptor's configure method raises a runtime exception. The runtime 
> exception produces a resource leak in the first interceptor, because the 
> interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
> never created and therefore the first interceptor's close method (and really 
> any interceptor's) is never called. 
> To help ensure the respective interceptor containers are able to invoke their 
> interceptors' close methods for proper resource cleanup, I propose defining a 
> default open method, with no implementation and a checked exception, on the 
> respective Consumer/Producer interceptor interfaces. This open method will be 
> responsible for creating threads and/or objects which use threads, 
> connections or other resources that require cleanup. 
> The default open method also makes implementation optional, as its empty 
> default behavior means it does nothing when unimplemented. 
> Additionally, the Kafka Consumer/Producer interceptor containers will 
> implement a corresponding maybeOpen method which throws a checked exception. 
> To maintain backwards compatibility with interceptors developed earlier, 
> maybeOpen will check whether the interceptor's interface contains the newer 
> open method before calling it. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ableegoldman closed pull request #7824: MINOR: flush only the evicted dirty entry

2023-01-05 Thread GitBox


ableegoldman closed pull request #7824: MINOR: flush only the evicted dirty 
entry
URL: https://github.com/apache/kafka/pull/7824


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman opened a new pull request, #13083: MINOR: bump year in NOTICE

2023-01-05 Thread GitBox


ableegoldman opened a new pull request, #13083:
URL: https://github.com/apache/kafka/pull/13083

   It's 2023 now y'all
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #13083: MINOR: bump year in NOTICE

2023-01-05 Thread GitBox


ableegoldman merged PR #13083:
URL: https://github.com/apache/kafka/pull/13083


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14465) java.lang.NumberFormatException: For input string: "index"

2023-01-05 Thread jianbin.chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655220#comment-17655220
 ] 

jianbin.chen commented on KAFKA-14465:
--

Can you help me with this question?

> java.lang.NumberFormatException: For input string: "index" 
> ---
>
> Key: KAFKA-14465
> URL: https://issues.apache.org/jira/browse/KAFKA-14465
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: jianbin.chen
>Priority: Major
> Attachments: image-2022-12-15-12-26-24-718.png
>
>
> {code:java}
> [2022-12-13 07:12:20,369] WARN [Log partition=fp_sg_flow_copy-1, 
> dir=/home/admin/output/kafka-logs] Found a corrupted index file corresponding 
> to log file /home/admin/output/kafk
> a-logs/fp_sg_flow_copy-1/0165.log due to Corrupt index found, 
> index file 
> (/home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.index) 
> has non-zero
>  size but the last offset is 165 which is no greater than the base offset 
> 165.}, recovering segment and rebuilding index files... (kafka.log.Log)
> [2022-12-13 07:12:20,369] ERROR There was an error in one of the threads 
> during logs loading: java.lang.NumberFormatException: For input string: 
> "index" (kafka.log.LogManager)
> [2022-12-13 07:12:20,374] INFO [ProducerStateManager 
> partition=fp_sg_flow_copy-1] Writing producer snapshot at offset 165 
> (kafka.log.ProducerStateManager)
> [2022-12-13 07:12:20,378] INFO [Log partition=fp_sg_flow_copy-1, 
> dir=/home/admin/output/kafka-logs] Loading producer state from offset 165 
> with message format version 2 (kafka.lo
> g.Log)
> [2022-12-13 07:12:20,381] INFO [Log partition=fp_sg_flow_copy-1, 
> dir=/home/admin/output/kafka-logs] Completed load of log with 1 segments, log 
> start offset 165 and log end offset
>  165 in 13 ms (kafka.log.Log)
> [2022-12-13 07:12:20,389] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.NumberFormatException: For input string: "index"
> at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Long.parseLong(Long.java:589)
> at java.lang.Long.parseLong(Long.java:631)
> at scala.collection.immutable.StringLike.toLong(StringLike.scala:306)
> at scala.collection.immutable.StringLike.toLong$(StringLike.scala:306)
> at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
> at kafka.log.Log$.offsetFromFile(Log.scala:1846)
> at kafka.log.Log.$anonfun$loadSegmentFiles$3(Log.scala:331)
> at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
> at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
> at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
> at kafka.log.Log.loadSegmentFiles(Log.scala:320)
> at kafka.log.Log.loadSegments(Log.scala:403)
> at kafka.log.Log.(Log.scala:216)
> at kafka.log.Log$.apply(Log.scala:1748)
> at kafka.log.LogManager.loadLog(LogManager.scala:265)
> at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:335)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> [2022-12-13 07:12:20,401] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> {code}
> When I restart the broker, it fails like this. I deleted the 
> 000165.index file, but after starting there are still other 
> files with the same error. Please tell me how to fix it and what is causing it.
>  
> I tried to open the file 0165.index but it failed
> !image-2022-12-15-12-26-24-718.png!
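
For context, a minimal reproduction of this failure mode (an assumption based 
on the stack trace: Log.offsetFromFile parses the file-name prefix before the 
first dot as a long, so any stray file with a non-numeric prefix in the log 
directory kills startup):

{code:java}
public class OffsetFromFileSketch {
    // Simplified stand-in for kafka.log.Log#offsetFromFile's parsing step.
    static long offsetFromFileName(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    public static void main(String[] args) {
        System.out.println(offsetFromFileName("00000000000000000165.log")); // prints 165
        // A leftover file whose name has a non-numeric prefix reproduces the error:
        offsetFromFileName("index.tmp"); // NumberFormatException: For input string: "index"
    }
}
{code}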



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…

2023-01-05 Thread GitBox


ijuma commented on code in PR #13075:
URL: https://github.com/apache/kafka/pull/13075#discussion_r1063058965


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -205,6 +205,61 @@ class ReplicaManagerTest {
 when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = {

Review Comment:
   V1 is deprecated and we intend to remove it (along with V0). We should not 
be writing new tests with that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13083: MINOR: bump year in NOTICE

2023-01-05 Thread GitBox


showuon commented on PR #13083:
URL: https://github.com/apache/kafka/pull/13083#issuecomment-1373098778

   Happy new year! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063066837


##
storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public final class TxnMetadata {
+public final long producerId;
+public final LogOffsetMetadata firstOffset;
+public OptionalLong lastOffset;
+
+public TxnMetadata(long producerId,
+   LogOffsetMetadata firstOffset,
+   OptionalLong lastOffset) {
+this.producerId = producerId;
+this.firstOffset = firstOffset;
+this.lastOffset = lastOffset;
+}
+public TxnMetadata(long producerId, long firstOffset) {
+this(producerId, new LogOffsetMetadata(firstOffset));
+}
+
+public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) {
+this(producerId, firstOffset, OptionalLong.empty());
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+TxnMetadata that = (TxnMetadata) o;
+
+if (producerId != that.producerId) return false;
+if (!Objects.equals(firstOffset, that.firstOffset)) return false;
+return Objects.equals(lastOffset, that.lastOffset);

Review Comment:
   I avoid using non-final fields in equals/hashCode implementations. I should 
have left a comment in the code on why these methods were added. 
   I tried to keep semantics similar to the earlier Scala case classes; afaik, 
the default implementation does reference (eq) checks on all the fields. 
TxnMetadata is used as a value of a TreeMap in ProducerStateManager, and I 
currently do not see any usages of map#containsValue there, so I prefer to 
avoid implementing it for now, as you suggested.
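   
   If equality is ever needed, one direction is to restrict equals/hashCode to 
the immutable fields, so an entry's hash cannot change while it sits in a 
collection. A hypothetical, simplified stand-in for TxnMetadata (illustrative 
only, not the actual class):
   
   ```java
   import java.util.Objects;
   import java.util.OptionalLong;

   final class TxnMetadataSketch {
       final long producerId;
       final long firstOffset; // stands in for LogOffsetMetadata
       OptionalLong lastOffset = OptionalLong.empty(); // mutable, so excluded below

       TxnMetadataSketch(long producerId, long firstOffset) {
           this.producerId = producerId;
           this.firstOffset = firstOffset;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof TxnMetadataSketch)) return false;
           TxnMetadataSketch that = (TxnMetadataSketch) o;
           // Only the final fields participate; the mutable lastOffset is left out.
           return producerId == that.producerId && firstOffset == that.firstOffset;
       }

       @Override
       public int hashCode() {
           return Objects.hash(producerId, firstOffset);
       }
   }
   ```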
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063067362


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -354,9 +356,10 @@ class LogSegmentTest {
 
 // recover again, but this time assuming the transaction from pid2 began 
on a previous segment
 stateManager = newProducerStateManager()
-stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-  mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, 
RecordBatch.NO_TIMESTAMP)), producerEpoch,
-  0, RecordBatch.NO_TIMESTAMP, Some(75L)))
+val batchMetadata = new util.ArrayList[BatchMetadata]()

Review Comment:
   The ArrayList constructor does not allow passing a single instance the way 
mutable.Queue does. Anyway, this is changed to Collections.singletonList in the 
latest code. I saw your suggestion to remove this List and take a single 
instance directly; I will reply to your comment there.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063067653


##
storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * The last written record for a given producer. The last data offset may be 
undefined
+ * if the only log entry for a producer is a transaction marker.
+ */
+public final class LastRecord {
+public final OptionalLong lastDataOffset;
+public final short producerEpoch;
+
+public LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
+this.lastDataOffset = lastDataOffset;
+this.producerEpoch = producerEpoch;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+LastRecord that = (LastRecord) o;
+
+if (producerEpoch != that.producerEpoch) return false;
+return Objects.equals(lastDataOffset, that.lastDataOffset);
+}
+
+@Override
+public int hashCode() {
+int result = lastDataOffset != null ? lastDataOffset.hashCode() : 0;
+result = 31 * result + (int) producerEpoch;

Review Comment:
   Good catch! It was generated by IntelliJ. I do not think it is really 
needed, as you suggested. 
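   
   For illustration, a compact equivalent once the null check goes (both fields 
are final and lastDataOffset is never null, so Objects.hash covers it). A 
hypothetical trimmed-down stand-in for LastRecord:
   
   ```java
   import java.util.Objects;
   import java.util.OptionalLong;

   final class LastRecordSketch {
       final OptionalLong lastDataOffset;
       final short producerEpoch;

       LastRecordSketch(OptionalLong lastDataOffset, short producerEpoch) {
           this.lastDataOffset = lastDataOffset;
           this.producerEpoch = producerEpoch;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof LastRecordSketch)) return false;
           LastRecordSketch that = (LastRecordSketch) o;
           return producerEpoch == that.producerEpoch
               && Objects.equals(lastDataOffset, that.lastDataOffset);
       }

       @Override
       public int hashCode() {
           // No null check needed: lastDataOffset is a final, non-null OptionalLong.
           return Objects.hash(lastDataOffset, producerEpoch);
       }
   }
   ```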



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063068193


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Stream;
+
+/**
+ * The batchMetadata is ordered such that the batch with the lowest sequence 
is at the head of the queue while the
+ * batch with the highest sequence is at the tail of the queue. We will retain 
at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN}
+ * elements in the queue. When the queue is at capacity, we remove the first 
element to make space for the incoming batch.
+ */
+public class ProducerStateEntry {
+public static final int NUM_BATCHES_TO_RETAIN = 5;
+public final long producerId;
+private final List<BatchMetadata> batchMetadata;
+private short producerEpoch;
+public int coordinatorEpoch;
+public long lastTimestamp;
+public OptionalLong currentTxnFirstOffset;
+
+public ProducerStateEntry(long producerId) {
+this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, 
RecordBatch.NO_TIMESTAMP, OptionalLong.empty());
+}
+
+public ProducerStateEntry(long producerId, short producerEpoch, int 
coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) {
+this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, 
lastTimestamp, currentTxnFirstOffset);
+}
+
+public ProducerStateEntry(long producerId, List<BatchMetadata> 
batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, 
OptionalLong currentTxnFirstOffset) {
+this.producerId = producerId;
+this.batchMetadata = batchMetadata;
+this.producerEpoch = producerEpoch;
+this.coordinatorEpoch = coordinatorEpoch;
+this.lastTimestamp = lastTimestamp;
+this.currentTxnFirstOffset = currentTxnFirstOffset;
+}
+
+public int firstSeq() {
+return isEmpty() ? RecordBatch.NO_SEQUENCE : 
batchMetadata.get(0).firstSeq();
+}
+
+
+public long firstDataOffset() {
+return isEmpty() ? -1L : batchMetadata.get(0).firstOffset();
+}
+
+public int lastSeq() {
+return isEmpty() ? RecordBatch.NO_SEQUENCE : 
batchMetadata.get(batchMetadata.size() - 1).lastSeq;
+}
+
+public long lastDataOffset() {
+return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 
1).lastOffset;
+}
+
+public int lastOffsetDelta() {
+return isEmpty() ? 0 : batchMetadata.get(batchMetadata.size() - 
1).offsetDelta;
+}
+
+public boolean isEmpty() {
+return batchMetadata.isEmpty();
+}
+
+public void addBatch(short producerEpoch, int lastSeq, long lastOffset, 
int offsetDelta, long timestamp) {
+maybeUpdateProducerEpoch(producerEpoch);
+addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, 
timestamp));
+this.lastTimestamp = timestamp;
+}
+
+public boolean maybeUpdateProducerEpoch(short producerEpoch) {
+if (this.producerEpoch != producerEpoch) {
+batchMetadata.clear();
+this.producerEpoch = producerEpoch;
+return true;
+} else {
+return false;
+}
+}
+
+private void addBatchMetadata(BatchMetadata batch) {
+if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) 
batchMetadata.remove(0);
+batchMetadata.add(batch);
+}
+
+public void update(ProducerStateEntry nextEntry) {
+maybeUpdateProducerEpoch(nextEntry.producerEpoch);
+while (!nextEntry.batchMetadata.isEmpty()) 
addBatchMetadata(nextEntry.batchMetadata.remove(0));
+this.coordinatorEpoch = nextEntry.coordinatorEpoch;
+this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset;
+this.lastTimestamp = nextEntry.lastTimestamp;
+}
+
+public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batc

[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063070497


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java:
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Stream;
+
+/**
+ * The batchMetadata is ordered such that the batch with the lowest sequence 
is at the head of the queue while the
+ * batch with the highest sequence is at the tail of the queue. We will retain 
at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN}
+ * elements in the queue. When the queue is at capacity, we remove the first 
element to make space for the incoming batch.
+ */
+public class ProducerStateEntry {
+public static final int NUM_BATCHES_TO_RETAIN = 5;
+public final long producerId;
+private final List<BatchMetadata> batchMetadata;
+private short producerEpoch;
+public int coordinatorEpoch;
+public long lastTimestamp;
+public OptionalLong currentTxnFirstOffset;
+
+public ProducerStateEntry(long producerId) {
+this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, 
RecordBatch.NO_TIMESTAMP, OptionalLong.empty());
+}

Review Comment:
   It was part of the `ProducerStateEntry#empty()` method. I avoided creating a 
static method and introduced a constructor instead. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063071578


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = 
LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, 
currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +
+"which is smaller than the la

[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063072191


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer 
before they are written to the log.
+ * It is initialized with the producer's state after the last successful 
append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class 
accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = 
LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id 
which contains metadata for a fixed number of
+ *   the most recent appends made by the producer. 
Validation of the first incoming append will
+ *   be made against the latest append in the current 
entry. New appends will replace older appends
+ *   in the current entry so that the space overhead 
is constant.
+ * @param origin Indicates the origin of the append which implies 
the extent of validation. For example, offset
+ *   commits, which originate from the group 
coordinator, do not have sequence numbers and therefore
+ *   only producer epoch validation is done. Appends 
which come through replication are not validated
+ *   (we assume the validation has already been done) 
and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, 
currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, 
long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d 
in %s is %d, " +

Review Comment:
   Good catch! These were left wh

[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


satishd commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063066837


##
storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public final class TxnMetadata {
+public final long producerId;
+public final LogOffsetMetadata firstOffset;
+public OptionalLong lastOffset;
+
+public TxnMetadata(long producerId,
+   LogOffsetMetadata firstOffset,
+   OptionalLong lastOffset) {
+this.producerId = producerId;
+this.firstOffset = firstOffset;
+this.lastOffset = lastOffset;
+}
+public TxnMetadata(long producerId, long firstOffset) {
+this(producerId, new LogOffsetMetadata(firstOffset));
+}
+
+public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) {
+this(producerId, firstOffset, OptionalLong.empty());
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+TxnMetadata that = (TxnMetadata) o;
+
+if (producerId != that.producerId) return false;
+if (!Objects.equals(firstOffset, that.firstOffset)) return false;
+return Objects.equals(lastOffset, that.lastOffset);

Review Comment:
   I avoid using non-final fields in equals/hashCode implementations. I should 
have left a comment in the code on why these methods were added. 
   I tried to keep semantics similar to the earlier Scala case classes; afaik, 
the default implementation does reference (eq) checks on all the fields. 
`TxnMetadata` is used as a value of a `TreeMap` in ProducerStateManager, and I 
currently do not see any usages of map#containsValue there, so I prefer to 
avoid implementing it for now, as you suggested. But the existing tests have 
equality assertions on `TxnMetadata`; one way to address them is to assert 
equality on the producer id and offset metadata instead. 
   
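   For context on the hazard being discussed: a minimal, self-contained sketch (not from the PR; class and field names are illustrative) of how a mutable field that participates in equals/hashCode can silently break hash-based collections:

import java.util.HashSet;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;

// Simplified stand-in for a TxnMetadata-like class with a mutable field.
final class MutableField {
    final long producerId;
    OptionalLong lastOffset = OptionalLong.empty(); // mutable, yet part of equals/hashCode

    MutableField(long producerId) {
        this.producerId = producerId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MutableField)) return false;
        MutableField that = (MutableField) o;
        return producerId == that.producerId && Objects.equals(lastOffset, that.lastOffset);
    }

    @Override
    public int hashCode() {
        return Objects.hash(producerId, lastOffset);
    }

    public static void main(String[] args) {
        Set<MutableField> set = new HashSet<>();
        MutableField txn = new MutableField(42L);
        set.add(txn);
        txn.lastOffset = OptionalLong.of(100L); // mutate after insertion
        // The entry was stored under the old hash code, so the lookup now misses it.
        System.out.println(set.contains(txn)); // false
    }
}

   Since `TxnMetadata` is only used as a TreeMap value today, the practical exposure in ProducerStateManager is limited to containsValue-style lookups, which matches the reasoning above for dropping the overrides.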



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module

2023-01-05 Thread GitBox


ijuma commented on code in PR #13043:
URL: https://github.com/apache/kafka/pull/13043#discussion_r1063147960


##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer before they are written to the log.
+ * It is initialized with the producer's state after the last successful append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+private final TopicPartition topicPartition;
+public final long producerId;
+private final ProducerStateEntry currentEntry;
+private final AppendOrigin origin;
+
+private final List<TxnMetadata> transactions = new ArrayList<>();
+private final ProducerStateEntry updatedEntry;
+
+/**
+ * @param topicPartition topic partition
+ * @param producerId The id of the producer appending to the log
+ * @param currentEntry   The current entry associated with the producer id which contains metadata for a fixed number of
+ *                       the most recent appends made by the producer. Validation of the first incoming append will
+ *                       be made against the latest append in the current entry. New appends will replace older appends
+ *                       in the current entry so that the space overhead is constant.
+ * @param origin         Indicates the origin of the append which implies the extent of validation. For example, offset
+ *                       commits, which originate from the group coordinator, do not have sequence numbers and therefore
+ *                       only producer epoch validation is done. Appends which come through replication are not validated
+ *                       (we assume the validation has already been done) and appends from clients require full validation.
+ */
+public ProducerAppendInfo(TopicPartition topicPartition,
+  long producerId,
+  ProducerStateEntry currentEntry,
+  AppendOrigin origin) {
+this.topicPartition = topicPartition;
+this.producerId = producerId;
+this.currentEntry = currentEntry;
+this.origin = origin;
+
+updatedEntry = new ProducerStateEntry(producerId, 
currentEntry.producerEpoch(),
+currentEntry.coordinatorEpoch,
+currentEntry.lastTimestamp,
+currentEntry.currentTxnFirstOffset);
+}
+
+private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) {
+checkProducerEpoch(producerEpoch, offset);
+if (origin == AppendOrigin.CLIENT) {
+checkSequence(producerEpoch, firstSeq, offset);
+}
+}
+
+private void checkProducerEpoch(short producerEpoch, long offset) {
+if (producerEpoch < updatedEntry.producerEpoch()) {
+String message = String.format("Epoch of producer %d at offset %d in %s is %d, " +
+"which is smaller than the last
