[GitHub] [kafka] divijvaidya commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
divijvaidya commented on code in PR #13078:
URL: https://github.com/apache/kafka/pull/13078#discussion_r1062290508

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -576,6 +576,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
+  // Visible for testing
+  def removeExpiredProducers(currentTimeMs: Long): Unit = {

Review Comment: Could we use this method in producerExpireCheck() as well?

## core/src/main/scala/kafka/log/ProducerStateManager.scala:
@@ -685,6 +692,7 @@ class ProducerStateManager(
     if (logEndOffset != mapEndOffset) {
       producers.clear()
+      _producerIdCount = 0

Review Comment: Please move these two statements together into a function. The coupling between the metric value and `producers` is bug-prone: someone may accidentally update one without updating the other. One way to avoid such bugs in the future is to always update the producer map through methods that update the count as well (ideally, only those methods should mutate the number of producers). The same comment applies to adding entries to `producers`.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
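The encapsulation the reviewer asks for can be sketched as follows: a minimal illustration in Java with hypothetical names (the actual Kafka code is Scala and tracks far more per-producer state). Every mutation of the map goes through methods that also refresh the counter, so the metric can never drift from the map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: route every mutation of the producer map through
// methods that also maintain the metric counter, so the two stay in sync.
class ProducerStateTracker {
    private final Map<Long, Long> producers = new HashMap<>(); // producerId -> last timestamp (ms)
    private volatile int producerIdCount = 0;                  // value exposed via the metric

    // The only mutators: each updates the map and the counter together.
    synchronized void addProducer(long producerId, long timestampMs) {
        producers.put(producerId, timestampMs);
        producerIdCount = producers.size();
    }

    synchronized void removeExpiredProducers(long currentTimeMs, long expirationMs) {
        producers.values().removeIf(lastTs -> currentTimeMs - lastTs >= expirationMs);
        producerIdCount = producers.size();
    }

    synchronized void clearProducers() {
        producers.clear();
        producerIdCount = 0;
    }

    int producerIdCount() {
        return producerIdCount;
    }
}
```

Because only these three methods touch `producers`, the bug the reviewer worries about (updating the map but forgetting the counter) cannot occur by construction.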
[jira] [Updated] (KAFKA-14566) Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces
[ https://issues.apache.org/jira/browse/KAFKA-14566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14566:
Description: (was:
h2. PROBLEM
The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors.

h2. Kafka Consumer Constructor
{code:java}
try {
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

h2. Kafka Consumer Constructor
{code:java}
try {
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below is never created.
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);
{code}
h2. Kafka Producer Constructor
{code:java}
try {
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below is never created.
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
{code}
Although both the KafkaConsumer and KafkaProducer constructors catch throwables and call close for resource clean up,
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
}
{code}
their respective close implementations, invoked in the catch block above, never call the container interceptor close method below, because this.interceptors was never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
    Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
{code}
This problem is magnified within a webserver cluster, e.g. Confluent's REST Proxy server, where thousands of requests containing interceptor configuration failures can occur in seconds, exhausting cluster resources like an inadvertent DDoS attack and disrupting all service activity.

h2. PROPOSAL
To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.
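A minimal sketch of the proposed interface change, assuming a simplified non-generic interceptor (the real ConsumerInterceptor/ProducerInterceptor interfaces are generic and extend Configurable, and the final KIP API may differ): open() is a no-op default declared with a checked exception, so existing implementations compile and behave unchanged while new ones can acquire resources there instead of in configure().

```java
import java.util.Map;

// Simplified stand-in for the proposed interceptor interface; the shape and
// names are illustrative, not the final KIP-663/KAFKA-14566 API.
interface ResourceInterceptor {
    // Proposed addition: a no-op default, declared with a checked exception
    // so resource-acquisition failures are explicit. Implementations that
    // never override it are unaffected.
    default void open() throws Exception {
    }

    void configure(Map<String, ?> configs);

    void close();
}

// An interceptor written against the old interface: it does not override
// open(), and the default makes that a safe no-op.
class LegacyInterceptor implements ResourceInterceptor {
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void close() { }
}
```

The default method is what gives "implementation optionality": callers can unconditionally invoke open() on every configured interceptor without breaking older implementations.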
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14565:
Description: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

was:
h2. PROBLEM
The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors.

h2. Kafka Consumer Constructor
{code:java}
try {
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

h2. KafkaConsumer Constructor
{code:java}
try {
    List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ConsumerInterceptor.class,
            Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below is never created.
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);
{code}
h2. Kafka Producer Constructor
{code:java}
try {
    List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            ProducerInterceptor.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the this.interceptors below is never created.
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
{code}
Although both the KafkaConsumer and KafkaProducer constructors catch throwables and call close for resource clean up,
{code:java}
...
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
}
{code}
their respective close implementations, invoked in the catch block above, never call the container interceptor close method below, because this.interceptors was never created.
{code:java}
private void close(long timeoutMs, boolean
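The missing clean-up the ticket describes can be sketched like this (hypothetical names, not the actual constructor code): initialize interceptors one at a time, and if the N-th one fails, close the N-1 already-initialized ones before rethrowing, so their threads and connections are released.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical resource-holding interceptor: open() may throw while acquiring
// threads/connections; close() releases whatever open() acquired.
interface OpenableInterceptor {
    void open() throws Exception;
    void close();
}

class InterceptorChain {
    // Open every interceptor; if any open() fails, close the already-opened
    // ones in reverse order, then rethrow the original failure.
    static List<OpenableInterceptor> openAll(List<OpenableInterceptor> interceptors) throws Exception {
        List<OpenableInterceptor> opened = new ArrayList<>();
        try {
            for (OpenableInterceptor i : interceptors) {
                i.open();
                opened.add(i);
            }
            return opened;
        } catch (Exception failure) {
            for (int k = opened.size() - 1; k >= 0; k--) {
                try {
                    opened.get(k).close();
                } catch (RuntimeException suppressed) {
                    // swallow so the original failure propagates, mirroring the
                    // swallowException-style close in the constructors above
                }
            }
            throw failure;
        }
    }
}
```

This is the standard acquire-or-release-all pattern; the ticket's point is that the current constructors cannot apply it because the interceptor container does not exist yet when configure() throws.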
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14565:
Description: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.
was: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

> Improving Interceptor Resource Leakage Prevention
> -------------------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>
> The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources.
> Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors.
> This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.
> To help ensure the container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consum
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14565:
Description: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.

was: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called. To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14565:
Description: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing when unimplemented.

was: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called. To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14565:
Description: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called.

To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing when unimplemented.

was: The Consumer and Producer interceptor interfaces and their corresponding KafkaConsumer and KafkaProducer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the KafkaConsumer and KafkaProducer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and returns a configured list of interceptors. This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on objects which create, threads, connections, or other resources requiring clean up, and a subsequent interceptor's configure method raises a runtime exception, that exception produces a resource leak in the first interceptor. The interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is never created, so the first interceptor's close method (indeed any interceptor's close method) is never called. To help ensure the respective container interceptors are able to invoke their interceptors' close methods for proper resource clean up, I propose defining a default open method, with no implementation and a checked exception, on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilize threads, connections, or other resources requiring clean up. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing when unimplemented.
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard updated KAFKA-14565: Description: The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the Kafka Consumer and Kafka Producer constructors, the AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property and returns a configured List> interceptors. This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors where at least one interceptor's configure method implementation creates and/or depends on objects which creates threads, connections or other resources which requires clean up and the subsequent interceptor's configure method raises a runtime exception. This raising of the runtime exception produces a resource leakage in the first interceptor as the interceptor container i.e. ConsumerInterceptors/ProducerInterceptors is never created and therefore the first interceptor's and really any interceptor's close method are never called. To help ensure the respective container interceptors are able to invoke their respective interceptor close methods for proper resource clean up, I propose defining a default open method with no implementation and check exception on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilizes threads, connections or other resource which requires clean up. Additionally, the default open method enables implementation optionality as it's empty default behavior means it will do nothing when unimplemented. 
Additionally, the Kafka Consumer/Producer Interceptor containers will implement a corresponding maybeOpen method which throws a checked exception. In order to maintain backwards compatibility with earlier developed interceptors, the maybeOpen will check whether the interceptor's interface contains the newer open method before calling it accordingly. was: The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. Currently within the Kafka Consumer and Kafka Producer constructors, the AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property and returns a configured List<ConsumerInterceptor<K, V>> interceptors. This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors where at least one interceptor's configure method implementation creates and/or depends on objects which creates threads, connections or other resources which requires clean up and the subsequent interceptor's configure method raises a runtime exception. This raising of the runtime exception produces a resource leakage in the first interceptor as the interceptor container i.e. ConsumerInterceptors/ProducerInterceptors is never created and therefore the first interceptor's and really any interceptor's close method are never called. To help ensure the respective container interceptors are able to invoke their respective interceptor close methods for proper resource clean up, I propose defining a default open method with no implementation and check exception on the respective Consumer/Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which utilizes threads, connections or other resource which requires clean up. 
Additionally, the default open method enables implementation optionality as its empty default behavior means it will do nothing when unimplemented. 
[jira] [Assigned] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard reassigned KAFKA-14565: --- Assignee: (was: Terry Beard) -- This message was sent by Atlassian Jira (v8.20.10#820010)
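The open/maybeOpen proposal can be sketched in miniature. The sketch below is illustrative only: `Interceptor` and `Interceptors` are simplified stand-ins, not the actual Kafka interfaces, and only the method names come from the proposal. The no-op default `open` keeps existing interceptors source-compatible, while the container closes every interceptor it has already opened when a later `open` fails, so threads and connections do not leak:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ConsumerInterceptor/ProducerInterceptor: the
// proposed open() is a no-op default, so existing interceptors need no change.
interface Interceptor extends Closeable {
    default void open() throws Exception { }
    @Override
    default void close() { }
}

// Hypothetical stand-in for the ConsumerInterceptors/ProducerInterceptors
// container, carrying the proposed maybeOpen() responsibility.
class Interceptors {
    private final List<Interceptor> interceptors;

    Interceptors(List<Interceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Open each interceptor in order; if one fails, close the ones already
    // opened so their resources are released, then rethrow.
    void maybeOpen() throws Exception {
        List<Interceptor> opened = new ArrayList<>();
        try {
            for (Interceptor interceptor : interceptors) {
                interceptor.open();
                opened.add(interceptor);
            }
        } catch (Exception e) {
            for (Interceptor interceptor : opened) {
                interceptor.close();
            }
            throw e;
        }
    }
}
```

With resource acquisition moved from `configure` to `open`, the container rather than each interceptor author owns the cleanup path on partial failure.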
[GitHub] [kafka] calmera commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )
calmera commented on code in PR #12742: URL: https://github.com/apache/kafka/pull/12742#discussion_r1062606439 ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder storeBuilder, return this; } +/** + * Adds a Read Only {@link StateStore} to the topology. + * + * A Read Only StateStore can use any compacted topic as a changelog. + * + * A {@link SourceNode} will be added to consume the data arriving from the partitions of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all + * records forwarded from the {@link SourceNode}. + * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. + * + * @param storeBuilder user defined key value store builder + * @param sourceName name of the {@link SourceNode} that will be automatically added + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param keyDeserializer the {@link Deserializer} to deserialize keys with + * @param valueDeserializer the {@link Deserializer} to deserialize values with + * @param topic the topic to source the data from + * @param processorName the name of the {@link ProcessorSupplier} + * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} + * @return itself + * @throws TopologyException if the processor or state store is already registered + */ +public synchronized Topology addReadOnlyStateStore(final StoreBuilder storeBuilder, + final String sourceName, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier) { +if (storeBuilder.loggingEnabled()) { +// -- disabling logging. 
We might want to print some logging. Review Comment: agreed, will change accordingly -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] calmera commented on a diff in pull request #12742: KAFKA-10892: Shared Readonly State Stores ( revisited )
calmera commented on code in PR #12742: URL: https://github.com/apache/kafka/pull/12742#discussion_r1062611257 ## docs/streams/developer-guide/processor-api.html: ## @@ -396,6 +397,21 @@ + +ReadOnly State Stores +The changelog topic for a fault-tolerant state store can also be used to perform lookups against. It is +crucial to understand that there can be only one writer to this state store, but there can be multiple +readers. + +This allows you to use a readonly state store for looking up reference data managed by another processor +or even another process all together. The messages within the compacted topic may be projected (subset of data extracted) Review Comment: yeah, pretty sure eventually we came to that conclusion as well. Too bad though, would have been nice for re-usability purposes. I will change the description and take the projected stuff out.
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062627093 ## clients/src/main/resources/common/message/BrokerRegistrationRequest.json: ## @@ -51,7 +51,7 @@ }, { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The rack which this broker is in." }, -{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false", - "about": "Set by a ZK broker if the required configurations for ZK migration are present." } +{ "name": "MigratingZkBrokerEpoch", "type": "int64", "versions": "1+", "default": "-1", Review Comment: can you add a comment to the top of `BrokerRegistrationRequest.json` stating that version 1 adds `MigratingZkBrokerEpoch`
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062628793 ## core/src/main/scala/kafka/server/BrokerEpochManager.scala: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.controller.KafkaController +import org.apache.kafka.common.requests.AbstractControlRequest + +class BrokerEpochManager(metadataCache: MetadataCache, Review Comment: this is a nice utility class, thanks for adding it. can we call this `ZkBrokerEpochManager` to emphasize that it's useful only in ZK mode?
[jira] [Created] (KAFKA-14575) Move ClusterTool
Mickael Maison created KAFKA-14575: -- Summary: Move ClusterTool Key: KAFKA-14575 URL: https://issues.apache.org/jira/browse/KAFKA-14575 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison Assignee: Mickael Maison
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062634160 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -55,7 +55,7 @@ class BrokerLifecycleManager( val config: KafkaConfig, val time: Time, val threadNamePrefix: Option[String], - val isZkBroker: Boolean = false + val zkBrokerEpochSupplier: Option[() => Long] = None Review Comment: this might be a little too clever I think. maybe we can just have a `boolean isZkBroker` and then a `() => Long` zkBrokerEpochSupplier ? I would really like to distinguish between the "not zk broker" and "zk broker doesn't have a broker epoch for some reason" cases
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062634566 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -292,7 +292,7 @@ class BrokerLifecycleManager( } val data = new BrokerRegistrationRequestData(). setBrokerId(nodeId). -setIsMigratingZkBroker(isZkBroker). + setMigratingZkBrokerEpoch(zkBrokerEpochSupplier.map(_.apply()).getOrElse(-1)). Review Comment: again, I would really like to distinguish between the "not zk broker" and "zk broker doesn't have a broker epoch for some reason" cases. So if we are a zk broker but don't know the broker epoch for some reason, we should throw an exception or something (or just retry)
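The distinction the reviewer asks for can be sketched as follows. `RegistrationEpoch` and its method are hypothetical names, not code from the PR: a non-ZK broker legitimately sends the -1 wire default, while a ZK broker whose epoch is unknown fails fast instead of silently registering as if it were not migrating.

```java
import java.util.function.LongSupplier;

// Hypothetical helper illustrating the review suggestion: an explicit
// isZkBroker flag plus an epoch supplier, so "not a ZK broker" and
// "ZK broker whose epoch is unknown" are distinct outcomes.
final class RegistrationEpoch {
    static final long NONE = -1L; // wire default for MigratingZkBrokerEpoch

    static long migratingZkBrokerEpoch(boolean isZkBroker, LongSupplier zkBrokerEpochSupplier) {
        if (!isZkBroker) {
            return NONE; // KRaft-mode broker: the field legitimately stays -1
        }
        long epoch = zkBrokerEpochSupplier.getAsLong();
        if (epoch < 0) {
            // ZK broker without a known epoch: fail fast (or retry) rather
            // than registering with a value that means "not migrating".
            throw new IllegalStateException("ZK broker epoch not yet known; retry registration");
        }
        return epoch;
    }
}
```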
[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1062635407 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * This class represents the state of a specific producer-id. + * It contains batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the + * queue while the batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
+ */ +public class ProducerStateEntry { +public static final int NUM_BATCHES_TO_RETAIN = 5; +private final long producerId; +private final Deque<BatchMetadata> batchMetadata; +private short producerEpoch; +public int coordinatorEpoch; +public long lastTimestamp; +public OptionalLong currentTxnFirstOffset; + +public ProducerStateEntry(long producerId) { +this(producerId, Collections.emptyList(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); +} + +public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { +this(producerId, Collections.emptyList(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); +} + +public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { Review Comment: Looking at the code, I noticed that we never actually pass multiple `BatchMetadata`, we only ever pass one. Given that, maybe this constructor could take a single element instead of `List<BatchMetadata>`.
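The suggested change could look roughly like the following simplified model, where `ProducerStateEntrySketch` stands in for `ProducerStateEntry` and a plain `long` stands in for `BatchMetadata`: the constructor accepts at most one initial batch instead of a `List`, and the bounded-deque behavior described in the class javadoc is preserved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Simplified model of the suggestion: callers only ever supply one
// BatchMetadata, so the constructor takes at most a single element.
final class ProducerStateEntrySketch {
    static final int NUM_BATCHES_TO_RETAIN = 5; // same bound as ProducerStateEntry

    private final long producerId;
    private final Deque<Long> batchMetadata = new ArrayDeque<>(); // stand-in for BatchMetadata

    ProducerStateEntrySketch(long producerId, Optional<Long> initialBatch) {
        this.producerId = producerId;
        initialBatch.ifPresent(batchMetadata::addLast);
    }

    // Head holds the lowest sequence; once at capacity, evict it to make room.
    void addBatchMetadata(long batch) {
        if (batchMetadata.size() == NUM_BATCHES_TO_RETAIN) {
            batchMetadata.pollFirst();
        }
        batchMetadata.addLast(batch);
    }

    int batchCount() {
        return batchMetadata.size();
    }
}
```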
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062640868 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -348,7 +345,7 @@ class KafkaServer( time = time, metrics = metrics, threadNamePrefix = threadNamePrefix, -brokerEpochSupplier = () => kafkaController.brokerEpoch +brokerEpochSupplier = () => brokerEpochManager.get() Review Comment: Don't we need to handle the case where `brokerEpochManager` is null here? You would return -1 in that case. And probably you also need to make `brokerEpochManager` volatile.
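A minimal sketch of the null handling being requested (names are illustrative, not from the PR): the supplier reads a volatile reference exactly once and falls back to -1 until the manager has been initialized.

```java
// Illustrative stand-in for KafkaServer's brokerEpochManager field: the
// reference is volatile so the supplier observes initialization from another
// thread, and a null manager maps to the "unknown epoch" sentinel -1.
final class BrokerEpochHolder {
    private volatile Long brokerEpochManager = null; // null until initialized

    // What brokerEpochSupplier could do instead of an unguarded get()
    long brokerEpoch() {
        Long current = brokerEpochManager; // single volatile read of the field
        return current == null ? -1L : current;
    }

    void initialize(long epoch) {
        this.brokerEpochManager = epoch;
    }
}
```

Copying the volatile field into a local first avoids a check-then-act race between the null test and the dereference.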
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062645398 ## metadata/src/main/java/org/apache/kafka/metadata/migration/BrokersRpcClient.java: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; + +public interface BrokersRpcClient { Review Comment: this file was already added as `LegacyPropagator.java`, so not needed here
[GitHub] [kafka] cmccabe commented on a diff in pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe commented on code in PR #12998: URL: https://github.com/apache/kafka/pull/12998#discussion_r1062646329 ## metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.ZkMigrationStateRecord; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineObject; + +public class MigrationControlManager { +private final TimelineObject<ZkMigrationState> zkMigrationState; + +MigrationControlManager(SnapshotRegistry snapshotRegistry) { +zkMigrationState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE); +} + +public ZkMigrationState zkMigrationState() { Review Comment: does this need to be a public function Review Comment: does this need to be a public function or can it be package-private
[GitHub] [kafka] mimaison opened a new pull request, #13080: KAFKA-14575: Move ClusterTool to tools module
mimaison opened a new pull request, #13080: URL: https://github.com/apache/kafka/pull/13080 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14575) Move ClusterTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-14575: --- Summary: Move ClusterTool to tools (was: Move ClusterTool) > Move ClusterTool to tools > - > > Key: KAFKA-14575 > URL: https://issues.apache.org/jira/browse/KAFKA-14575 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > 
[jira] [Created] (KAFKA-14576) Move ConsoleConsumer to tools
Mickael Maison created KAFKA-14576: -- Summary: Move ConsoleConsumer to tools Key: KAFKA-14576 URL: https://issues.apache.org/jira/browse/KAFKA-14576 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14577) Move ConsoleProducer to tools
Mickael Maison created KAFKA-14577: -- Summary: Move ConsoleProducer to tools Key: KAFKA-14577 URL: https://issues.apache.org/jira/browse/KAFKA-14577 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14579) Move DumpLogSegments to tools
Mickael Maison created KAFKA-14579: -- Summary: Move DumpLogSegments to tools Key: KAFKA-14579 URL: https://issues.apache.org/jira/browse/KAFKA-14579 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14580) Move EndToEndLatency to tools
Mickael Maison created KAFKA-14580: -- Summary: Move EndToEndLatency to tools Key: KAFKA-14580 URL: https://issues.apache.org/jira/browse/KAFKA-14580 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14578) Move ConsumerPerformance to tools
Mickael Maison created KAFKA-14578: -- Summary: Move ConsumerPerformance to tools Key: KAFKA-14578 URL: https://issues.apache.org/jira/browse/KAFKA-14578 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14582) Move JmxTool to tools
Mickael Maison created KAFKA-14582: -- Summary: Move JmxTool to tools Key: KAFKA-14582 URL: https://issues.apache.org/jira/browse/KAFKA-14582 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14581) Move GetOffsetShell to tools
Mickael Maison created KAFKA-14581: -- Summary: Move GetOffsetShell to tools Key: KAFKA-14581 URL: https://issues.apache.org/jira/browse/KAFKA-14581 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14584) Move StateChangeLogMerger to tools
Mickael Maison created KAFKA-14584: -- Summary: Move StateChangeLogMerger to tools Key: KAFKA-14584 URL: https://issues.apache.org/jira/browse/KAFKA-14584 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14583) Move ReplicaVerificationTool to tools
Mickael Maison created KAFKA-14583: -- Summary: Move ReplicaVerificationTool to tools Key: KAFKA-14583 URL: https://issues.apache.org/jira/browse/KAFKA-14583 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14585) Move StorageTool to tools
Mickael Maison created KAFKA-14585: -- Summary: Move StorageTool to tools Key: KAFKA-14585 URL: https://issues.apache.org/jira/browse/KAFKA-14585 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14586) Move StreamsResetter to tools
Mickael Maison created KAFKA-14586: -- Summary: Move StreamsResetter to tools Key: KAFKA-14586 URL: https://issues.apache.org/jira/browse/KAFKA-14586 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14587) Move AclCommand to tools
Mickael Maison created KAFKA-14587: -- Summary: Move AclCommand to tools Key: KAFKA-14587 URL: https://issues.apache.org/jira/browse/KAFKA-14587 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14588) Move ConfigCommand to tools
Mickael Maison created KAFKA-14588: -- Summary: Move ConfigCommand to tools Key: KAFKA-14588 URL: https://issues.apache.org/jira/browse/KAFKA-14588 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14589) Move ConsumerGroupCommand to tools
Mickael Maison created KAFKA-14589: -- Summary: Move ConsumerGroupCommand to tools Key: KAFKA-14589 URL: https://issues.apache.org/jira/browse/KAFKA-14589 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14592) Move FeatureCommand to tools
Mickael Maison created KAFKA-14592: -- Summary: Move FeatureCommand to tools Key: KAFKA-14592 URL: https://issues.apache.org/jira/browse/KAFKA-14592 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14591) Move DeleteRecordsCommand to tools
Mickael Maison created KAFKA-14591: -- Summary: Move DeleteRecordsCommand to tools Key: KAFKA-14591 URL: https://issues.apache.org/jira/browse/KAFKA-14591 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14590) Move DelegationTokenCommand to tools
Mickael Maison created KAFKA-14590: -- Summary: Move DelegationTokenCommand to tools Key: KAFKA-14590 URL: https://issues.apache.org/jira/browse/KAFKA-14590 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14596) Move TopicCommand to tools
Mickael Maison created KAFKA-14596: -- Summary: Move TopicCommand to tools Key: KAFKA-14596 URL: https://issues.apache.org/jira/browse/KAFKA-14596 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14595) Move ReassignPartitionsCommand to tools
Mickael Maison created KAFKA-14595: -- Summary: Move ReassignPartitionsCommand to tools Key: KAFKA-14595 URL: https://issues.apache.org/jira/browse/KAFKA-14595 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14593) Move LeaderElectionCommand to tools
Mickael Maison created KAFKA-14593: -- Summary: Move LeaderElectionCommand to tools Key: KAFKA-14593 URL: https://issues.apache.org/jira/browse/KAFKA-14593 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Created] (KAFKA-14594) Move LogDirsCommand to tools
Mickael Maison created KAFKA-14594: -- Summary: Move LogDirsCommand to tools Key: KAFKA-14594 URL: https://issues.apache.org/jira/browse/KAFKA-14594 Project: Kafka Issue Type: Sub-task Reporter: Mickael Maison
[jira] [Commented] (KAFKA-14547) Be able to run kafka KRaft Server in tests without needing to run a storage setup script
[ https://issues.apache.org/jira/browse/KAFKA-14547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655063#comment-17655063 ] Ismael Juma commented on KAFKA-14547: - Related KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-785%3A+Automatic+storage+formatting > Be able to run kafka KRaft Server in tests without needing to run a storage > setup script > > > Key: KAFKA-14547 > URL: https://issues.apache.org/jira/browse/KAFKA-14547 > Project: Kafka > Issue Type: Improvement > Components: kraft >Affects Versions: 3.3.1 >Reporter: Natan Silnitsky >Priority: Major > > Currently kafka KRaft Server requires running kafka-storage.sh in order to > start properly. > This makes setup much more cumbersome for build tools like bazel to work > properly. > One way to mitigate this is to configure the paths via kafkaConfig...
[GitHub] [kafka] gharris1727 commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
gharris1727 commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1062726003 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: > if we want to surface producer errors to users ... which would involve changing the method signatures to accept callbacks This is not true; you can surface errors synchronously by throwing exceptions inside the `KafkaConfigBackingStore` and catching them in the caller. This is what I meant by terminating the producer callbacks: transform the control flow from asynchronous via a callback to synchronous results. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
gharris1727 commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1062744806 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) { try { fencableProducer.beginTransaction(); -fencableProducer.send(new ProducerRecord<>(topic, key, value)); +fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> { Review Comment: > should already be the case with the current changes It may be the case but not for the reason you specified. The guarantee from the producer is that the record callbacks are completed before the commitTransaction _returns_, while I was suggesting a control flow where we ensure that record callbacks are completed before the commitTransaction is _called_. If you still call commitTransaction, you will get both the callback's error and the synchronous error from commitTransaction, and would still need to make sure that the correct exception is shadowed. The control over this shadowing is handled by this code and whoever is terminating the callback in the REST layer. > we could simply reword the existing exception message This is certainly the easy way out, but I don't think we should settle for that. Vague error messages indicate that they aren't capturing and reporting the exception properly, and making them more vague to compensate doesn't help anyone diagnose the underlying issue. If we can make a control flow change that reports the errors faithfully, that is going to help someone down the line debugging it. > the worker logs will reveal the underlying TopicAuthorizationException Is this the case? The worker is using _this_ code path to write the session key; if we're hiding the result from the REST calls, are we not also hiding the error from the herder tick thread?
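The control flow proposed here — ensure every record callback has completed *before* `commitTransaction` is called, so a record-level error is not shadowed by the commit's own error — can be sketched with stand-in send/commit methods. These stubs are illustrative only; a real producer completes callbacks on a network thread rather than inline:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CommitOrdering {
    static final List<String> events = new ArrayList<>();

    // Stand-in for fencableProducer.send(record, callback): returns a future that
    // completes when the record's callback has fired.
    static CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        events.add("sent:" + record);
        f.complete(null); // in real code this completes asynchronously
        return f;
    }

    // Stand-in for fencableProducer.commitTransaction().
    static void commitTransaction() {
        events.add("commit");
    }

    // Wait for all record callbacks before committing, so any record error can be
    // reported faithfully instead of being shadowed by a commit-time exception.
    static void sendAllThenCommit(List<String> records) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String r : records) pending.add(send(r));
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        commitTransaction();
    }
}
```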
[jira] [Created] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
Atul Jain created KAFKA-14597: - Summary: [Streams] record-e2e-latency-max is not reporting correct metrics Key: KAFKA-14597 URL: https://issues.apache.org/jira/browse/KAFKA-14597 Project: Kafka Issue Type: Bug Reporter: Atul Jain Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg I was following this KIP documentation ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) and kafka streams documentation ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]) . Based on these documentations , the *record-e2e-latency-max* should monitor the full end to end latencies, which includes both *consumption latencies* and {*}processing delays{*}. However, based on my observations , record-e2e-latency-max seems to be only measuring the consumption latencies. processing delays can be measured using *process-latency-max* .I am checking all this using a simple topology consisting of source, processors and sink (code added). I have added some sleep time (of 3 seconds) in one of the processors to ensure some delays in the processing logic. These delays are not getting accounted in the record-e2e-latency-max but are accounted in process-latency-max. process-latency-max was observed to be 3002 ms which accounts for sleep time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, which does not account for 3 seconds of sleep time. 
Code describing my topology:
{code:java}
static Topology buildTopology(String inputTopic, String outputTopic) {
    log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
        .mapValues(s -> {
            try {
                System.out.println("sleeping for 3 seconds");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s.toUpperCase();
        })
        .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}
{code}
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Atul Jain updated KAFKA-14597: -- Description: I was following this KIP documentation ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) and kafka streams documentation ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]) . Based on these documentations , the *record-e2e-latency-max* should monitor the full end to end latencies, which includes both *consumption latencies* and {*}processing delays{*}. However, based on my observations , record-e2e-latency-max seems to be only measuring the consumption latencies. processing delays can be measured using *process-latency-max* .I am checking all this using a simple topology consisting of source, processors and sink (code added). I have added some sleep time (of 3 seconds) in one of the processors to ensure some delays in the processing logic. These delays are not getting accounted in the record-e2e-latency-max but are accounted in process-latency-max. process-latency-max was observed to be 3002 ms which accounts for sleep time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, which does not account for 3 seconds of sleep time. 
Code describing my topology: {code:java} static Topology buildTopology(String inputTopic, String outputTopic) { log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic); Serde stringSerde = Serdes.String(); StreamsBuilder builder = new StreamsBuilder(); builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde)) .peek((k,v) -> log.info("Observed event: key" + k + " value: " + v)) .mapValues(s -> { try { System.out.println("sleeping for 3 seconds"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return s.toUpperCase(); }) .peek((k,v) -> log.info("Transformed event: key" + k + " value: " + v)) .to(outputTopic, Produced.with(stringSerde, stringSerde)); return builder.build(); } {code} was: I was following this KIP documentation ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) and kafka streams documentation ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]) . Based on these documentations , the *record-e2e-latency-max* should monitor the full end to end latencies, which includes both *consumption latencies* and {*}processing delays{*}. However, based on my observations , record-e2e-latency-max seems to be only measuring the consumption latencies. processing delays can be measured using *process-latency-max* .I am checking all this using a simple topology consisting of source, processors and sink (code added). I have added some sleep time (of 3 seconds) in one of the processors to ensure some delays in the processing logic. These delays are not getting accounted in the record-e2e-latency-max but are accounted in process-latency-max. process-latency-max was observed to be 3002 ms which accounts for sleep time of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, which does not account for 3 seconds of sleep time. 
Code describing my topology:
{code:java}
static Topology buildTopology(String inputTopic, String outputTopic) {
    log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
        .mapValues(s -> {
            try {
                System.out.println("sleeping for 3 seconds");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s.toUpperCase();
        })
        .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}
{code}
> [Streams] record-e2e-latency-max is not reporting correct metrics > -- > > Key: KAFKA-14597 >
[jira] [Updated] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14597: Component/s: metrics streams > [Streams] record-e2e-latency-max is not reporting correct metrics > -- > > Key: KAFKA-14597 > URL: https://issues.apache.org/jira/browse/KAFKA-14597 > Project: Kafka > Issue Type: Bug > Components: metrics, streams >Reporter: Atul Jain >Priority: Major > Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg > > > I was following this KIP documentation > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) > and kafka streams documentation > ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]) > . Based on these documentations , the *record-e2e-latency-max* should > monitor the full end to end latencies, which includes both *consumption > latencies* and {*}processing delays{*}. > However, based on my observations , record-e2e-latency-max seems to be only > measuring the consumption latencies. processing delays can be measured using > *process-latency-max* .I am checking all this using a simple topology > consisting of source, processors and sink (code added). I have added some > sleep time (of 3 seconds) in one of the processors to ensure some delays in > the processing logic. These delays are not getting accounted in the > record-e2e-latency-max but are accounted in process-latency-max. > process-latency-max was observed to be 3002 ms which accounts for sleep time > of 3 seconds. However, record-e2e-latency-max observed in jconsole is 422ms, > which does not account for 3 seconds of sleep time. 
> > Code describing my topology: > {code:java} >static Topology buildTopology(String inputTopic, String outputTopic) { > log.info("Input topic: " + inputTopic + " and output topic: " + > outputTopic); > Serde stringSerde = Serdes.String(); > StreamsBuilder builder = new StreamsBuilder(); > builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde)) > .peek((k,v) -> log.info("Observed event: key" + k + " value: > " + v)) > .mapValues(s -> { > try { > System.out.println("sleeping for 3 seconds"); > Thread.sleep(3000); > } > catch (InterruptedException e) { > e.printStackTrace(); > } > return s.toUpperCase(); > }) > .peek((k,v) -> log.info("Transformed event: key" + k + " > value: " + v)) > .to(outputTopic, Produced.with(stringSerde, stringSerde)); > return builder.build(); > } {code}
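For context on how the two metrics in this report relate: per KIP-613, `record-e2e-latency` at a node is the wall-clock time when the node finishes processing a record minus the record's event timestamp, so at the sink it should be at least as large as any upstream processing delay. The arithmetic can be sketched with hypothetical timestamps (these numbers are illustrative, not the reporter's actual measurements):

```java
class E2eLatencySketch {
    // e2e latency at a node = system time when the record is processed - record timestamp
    static long e2eLatencyMs(long recordTimestampMs, long processedAtMs) {
        return processedAtMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTs = 1_000L;                            // event timestamp set at the source
        long arrivedAtProcessor = 1_400L;                  // 400 ms consumption delay
        long leftProcessor = arrivedAtProcessor + 3_000L;  // 3 s Thread.sleep in mapValues

        // If record-e2e-latency-max included processing delays, the sink-side value
        // here would be ~3400 ms, not ~400 ms as observed in the bug report.
        System.out.println(e2eLatencyMs(recordTs, leftProcessor)); // 3400
    }
}
```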
[GitHub] [kafka] jsancio commented on pull request #13076: KAFKA-14279; Add 3.3.x to core compatibility tests
jsancio commented on PR #13076: URL: https://github.com/apache/kafka/pull/13076#issuecomment-1372586923 Thanks for the review @ijuma. Here are the results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1672892122--jsancio--kafka-14279-3-3-sys-test--22b8a8e2c6/2023-01-04--001./2023-01-04--001./report.html None of the tests that I added failed. There are 13 system tests that mention 3.3.1 which matches what I expected. I am going to merge and cherry pick to the 3.4 branch.
[GitHub] [kafka] jsancio commented on pull request #13077: KAFKA-14545; Add 3.3.x streams system tests
jsancio commented on PR #13077: URL: https://github.com/apache/kafka/pull/13077#issuecomment-1372590378 Here are the system test results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1672893494--jsancio--kafka-14545-stream-3-3-test--a595bcfb9c/2023-01-04--001./2023-01-04--001./report.html Looks like this test failed: ``` Module: kafkatest.tests.streams.streams_upgrade_test Class: StreamsUpgradeTest Method: test_rolling_upgrade_with_2_bounces Arguments: { "from_version": "3.3.1", "to_version": "3.5.0-SNAPSHOT" } ``` I am going to rerun just this test.
[GitHub] [kafka] junrao commented on a diff in pull request #13049: KAFKA-14478: Move LogConfig/CleanerConfig and related to storage module
junrao commented on code in PR #13049: URL: https://github.com/apache/kafka/pull/13049#discussion_r1062797781 ## server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import static java.util.Arrays.asList; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.utils.Utils; + +public final class ServerTopicConfigSynonyms { +private static final String LOG_PREFIX = "log."; +private static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner."; + +/** + * Maps topic configurations to their equivalent broker configurations. + * + * Topics can be configured either by setting their dynamic topic configurations, or by + * setting equivalent broker configurations. For historical reasons, the equivalent broker + * configurations have different names. This table maps each topic configuration to its + * equivalent broker configurations. 
+ * + * In some cases, the equivalent broker configurations must be transformed before they + * can be used. For example, log.roll.hours must be converted to milliseconds before it + * can be used as the value of segment.ms. + * + * The broker configurations will be used in the order specified here. In other words, if + * both the first and the second synonyms are configured, we will use only the value of + * the first synonym and ignore the second. + */ +// Topic configs with no mapping to a server config can be found in `LogConfig.CONFIGS_WITH_NO_SERVER_DEFAULTS` +@SuppressWarnings("deprecation") +public static final Map<String, List<ConfigSynonym>> ALL_TOPIC_CONFIG_SYNONYMS = Collections.unmodifiableMap(Utils.mkMap( +sameNameWithLogPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), +listWithLogPrefix(TopicConfig.SEGMENT_MS_CONFIG, +new ConfigSynonym("roll.ms"), +new ConfigSynonym("roll.hours", ConfigSynonym.HOURS_TO_MILLISECONDS)), +listWithLogPrefix(TopicConfig.SEGMENT_JITTER_MS_CONFIG, +new ConfigSynonym("roll.jitter.ms"), +new ConfigSynonym("roll.jitter.hours", ConfigSynonym.HOURS_TO_MILLISECONDS)), +singleWithLogPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "index.size.max.bytes"), +singleWithLogPrefix(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, "flush.interval.messages"), +listWithLogPrefix(TopicConfig.FLUSH_MS_CONFIG, +new ConfigSynonym("flush.interval.ms"), +new ConfigSynonym("flush.scheduler.interval.ms")), +sameNameWithLogPrefix(TopicConfig.RETENTION_BYTES_CONFIG), +listWithLogPrefix(TopicConfig.RETENTION_MS_CONFIG, +new ConfigSynonym("retention.ms"), +new ConfigSynonym("retention.minutes", ConfigSynonym.MINUTES_TO_MILLISECONDS), +new ConfigSynonym("retention.hours", ConfigSynonym.HOURS_TO_MILLISECONDS)), +single(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "message.max.bytes"), +sameNameWithLogPrefix(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG), +sameNameWithLogCleanerPrefix(TopicConfig.DELETE_RETENTION_MS_CONFIG), +sameNameWithLogCleanerPrefix(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG), 
+sameNameWithLogCleanerPrefix(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG), +singleWithLogPrefix(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "segment.delete.delay.ms"), + singleWithLogCleanerPrefix(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "min.cleanable.ratio"), +sameNameWithLogPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), +sameName(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG), +sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), +sameName(TopicConfig.COMPRESSION_TYPE_CONFIG), +sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG), +sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG), +
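The two ideas in this class's Javadoc — "first configured synonym wins" and unit transforms such as `HOURS_TO_MILLISECONDS` — can be illustrated with a small self-contained resolver. The names below (`Synonym`, `resolve`) are illustrative stand-ins, not the actual `ServerTopicConfigSynonyms` API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;

class SynonymResolution {
    static class Synonym {
        final String brokerKey;
        final LongUnaryOperator transform;
        Synonym(String brokerKey, LongUnaryOperator transform) {
            this.brokerKey = brokerKey;
            this.transform = transform;
        }
    }

    static final LongUnaryOperator IDENTITY = v -> v;
    static final LongUnaryOperator HOURS_TO_MS = h -> h * 60 * 60 * 1000L;

    // segment.ms resolves from log.roll.ms first, then log.roll.hours (converted).
    static final List<Synonym> SEGMENT_MS = Arrays.asList(
        new Synonym("log.roll.ms", IDENTITY),
        new Synonym("log.roll.hours", HOURS_TO_MS));

    // Return the value of the first configured synonym, transformed to milliseconds;
    // later synonyms are ignored even if set.
    static Long resolve(Map<String, Long> brokerConfig, List<Synonym> synonyms) {
        for (Synonym s : synonyms) {
            Long raw = brokerConfig.get(s.brokerKey);
            if (raw != null) return s.transform.applyAsLong(raw);
        }
        return null;
    }
}
```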
[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
philipnee commented on code in PR #13021: URL: https://github.com/apache/kafka/pull/13021#discussion_r1062809110 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.stream.Collectors; + +public class CommitRequestManager implements RequestManager { +private final Queue stagedCommits; +// TODO: We will need to refactor the subscriptionState +private final SubscriptionState subscriptionState; Review Comment: We should finalize the plan for subscriptionState, whether we will reuse the same class or implement a new one (and deprecate the old one)
[GitHub] [kafka] vladimirdyuzhev opened a new pull request, #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used
vladimirdyuzhev opened a new pull request, #13081: URL: https://github.com/apache/kafka/pull/13081 When a keytab file is not used, and the necessary configuration data are provided by the SASL callback handler, the Kerberos TGT renewal fails because the code does not re-use the configured CallbackHandler in the re-login sequence. The error is: ``` javax.security.auth.login.LoginException: No CallbackHandler available to garner authentication information from the user ``` The change preserves the instance of the CallbackHandler that was used to log in to Kerberos and passes it to the LoginContext when the TGT needs to be renewed. The change is tested in DIT with live Kafka and AD KRB instances in our current project.
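The fix described in this PR — remembering the `CallbackHandler` supplied at initial login and handing it back to the `LoginContext` at re-login time — can be sketched with the JDK's `javax.security.auth.callback` types. This is an illustration of the pattern only, not the actual Kafka login code:

```java
import javax.security.auth.callback.CallbackHandler;

class ReloginSketch {
    // Remember the handler used for the initial login so re-login can reuse it.
    private CallbackHandler savedHandler;

    void login(CallbackHandler handler) {
        this.savedHandler = handler;
        // real code: new LoginContext(contextName, subject, handler, configuration).login();
    }

    // Before the fix, re-login effectively constructed a LoginContext with no handler
    // and failed with "No CallbackHandler available..." when no keytab was configured.
    CallbackHandler handlerForRelogin() {
        return savedHandler; // reuse the configured handler instead of passing nothing
    }
}
```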
[GitHub] [kafka] jsancio merged pull request #13076: KAFKA-14279; Add 3.3.x to core compatibility tests
jsancio merged PR #13076: URL: https://github.com/apache/kafka/pull/13076
[GitHub] [kafka] mjsax commented on a diff in pull request #13077: KAFKA-14279; Add 3.3.x streams system tests
mjsax commented on code in PR #13077: URL: https://github.com/apache/kafka/pull/13077#discussion_r1062838773 ## streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; + +import java.util.Properties; +import java.util.Random; + +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + + +public class StreamsUpgradeTest { + +@SuppressWarnings("unchecked") +public static void main(final String[] args) throws Exception { +if (args.length < 1) { +System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); +} +final String propFileName = args[0]; + +final Properties streamsProperties = Utils.loadProps(propFileName); + +System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.2)"); Review Comment: ```suggestion System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.3)"); ``` ## streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java: ## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; + +import java.time.Instant; + +public class SmokeTestUtil { + +final static int END = Integer.MAX_VALUE; + +static ProcessorSupplier printProcessorSupplier(final String topic) { +return printProcessorSupplier(topic, ""); +} + +static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { +return () -> new ContextualProcessor() { +private int numRecordsProcessed = 0; +private long smallestOffset = Long.MAX_VALUE; +private long largestOffset = Long.MIN_VALUE; + +@Override +public void init(final ProcessorContext context) { +super.init(context); +System.out.println("[3.2] initializing processor: topic=" + topic +
[GitHub] [kafka] jsancio commented on pull request #13077: KAFKA-14279; Add 3.3.x streams system tests
jsancio commented on PR #13077: URL: https://github.com/apache/kafka/pull/13077#issuecomment-1372724189 Same failure again: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1672950817--jsancio--kafka-14545-stream-3-3-test--a595bcfb9c/2023-01-05--001./2023-01-05--001./report.html `TimeoutError('Could not detect Kafka Streams version 3.5.0-SNAPSHOT on ubuntu@worker29')`
[GitHub] [kafka] jolshan commented on pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…
jolshan commented on PR #13075: URL: https://github.com/apache/kafka/pull/13075#issuecomment-1372876700 So the error occurs when the fetch offset is not the futureLog's log end offset. Just curious -- is this because the log's highwatermark is lower or higher than the futureLog's? It seems that the value we put into the InitialFetchState is the first offset we fetch from, so just wanted to clarify the race and why this fixes the issue. (Not doubting you, just trying to understand 😄 ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica
[ https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655153#comment-17655153 ] Jun Rao commented on KAFKA-9087: [~chia7712] : Thanks for the explanation. Great find! I agree that this is a bug and the fix that you suggested makes sense. In alterReplicaLogDirs(), we initialize the initial offset for ReplicaAlterLogDirsThread with futureLog.highWatermark. We should do the same thing when handling the LeaderAndIsrRequest. It seems that the bug was introduced in this PR [https://github.com/apache/kafka/pull/6841|https://github.com/apache/kafka/pull/6841.]. Do you plan to submit a PR? > ReplicaAlterLogDirs stuck and restart fails with > java.lang.IllegalStateException: Offset mismatch for the future replica > > > Key: KAFKA-9087 > URL: https://issues.apache.org/jira/browse/KAFKA-9087 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0 >Reporter: Gregory Koshelev >Priority: Major > > I've started multiple replica movements between log directories and some > partitions were stuck. 
After the restart of the broker I've got exception in > server.log: > {noformat} > [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to > (kafka.server.ReplicaAlterLogDirsThread) > org.apache.kafka.common.KafkaException: Error processing data for partition > metrics_timers-35 offset 4224887 > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131) > at scala.Option.foreach(Option.scala:274) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: java.lang.IllegalStateException: Offset mismatch for the future > replica metrics_timers-35: fetched offset = 4224887, log end offset = 0. 
> at > kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311) > ... 16 more > [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped > (kafka.server.ReplicaAlterLogDirsThread) > {noformat} > Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix > the problem. To fix it I've stopped the broker and remove all the stuck > future partitions. > Detailed log below > {noformat} > [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest > offset in the log is 4224886 (kafka.log.Log) > [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Loading producer state till offset 4224887 with > message format version 2 (kafka.log.Log) > [2019-06-11 12:21:34,980] INFO [ProducerStateManager > partition=metrics_timers-35] Loading producer state from snapshot file > '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' > (kafka.log.ProducerStateManager) > [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, > dir=/storage2/kafka/data] Completed load of log with 1 segments, log start > offset 4120720 and log end offset 4224887 in 70 ms (kafka.log.Log) > [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 > with initial high watermark 0 (kafka.cluster.Replica) > [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 > with initial high watermark 0 (kafka.cluster.Replica) > [2019-06-11 12:21:45,307] INFO Replica
[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica
[ https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655155#comment-17655155 ] Chia-Ping Tsai commented on KAFKA-9087: --- [~junrao] Please take a look at [https://github.com/apache/kafka/pull/13075] > ReplicaAlterLogDirs stuck and restart fails with > java.lang.IllegalStateException: Offset mismatch for the future replica > > Key: KAFKA-9087 > URL: https://issues.apache.org/jira/browse/KAFKA-9087 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.2.0 > Reporter: Gregory Koshelev > Priority: Major
[GitHub] [kafka] vladimirdyuzhev commented on pull request #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used
vladimirdyuzhev commented on PR #13081: URL: https://github.com/apache/kafka/pull/13081#issuecomment-1372892329 Some tests failed, but apparently this has nothing to do with Kerberos:
```
testSendNonCompressedMessageWithCreateTime(String) > kafka.api.PlaintextProducerSendTest.testSendNonCompressedMessageWithCreateTime(String)[1] FAILED [2023-01-05T21:14:03.931Z] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…
chia7712 commented on PR #13075: URL: https://github.com/apache/kafka/pull/13075#issuecomment-1372912104 > is this because the log's highwatermark is lower or higher than the futureLog's? higher, and thanks to @junrao for the great explanation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
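The invariant behind this fix can be sketched in a few lines of plain Java (illustrative names, not the actual ReplicaAlterLogDirsThread internals): the log-dir fetcher appends to the future log, so its initial fetch offset must come from the future log; seeding it from the current log's higher high watermark trips the offset-mismatch check.

```java
import java.util.Optional;

// Illustrative sketch of why the initial fetch offset for a log-dir
// fetcher must come from the *future* log, not the current log.
class FutureLogFetchSketch {
    // The future log is freshly created, so its log end offset is 0.
    static final long FUTURE_LOG_END_OFFSET = 0L;

    // Mirrors the invariant ReplicaAlterLogDirsThread enforces:
    // the fetched offset must equal the future log's end offset.
    static Optional<String> processPartitionData(long fetchedOffset) {
        if (fetchedOffset != FUTURE_LOG_END_OFFSET) {
            return Optional.of("Offset mismatch for the future replica: fetched offset = "
                    + fetchedOffset + ", log end offset = " + FUTURE_LOG_END_OFFSET);
        }
        return Optional.empty(); // the append to the future log would proceed here
    }

    public static void main(String[] args) {
        long currentLogHighWatermark = 4224887L; // higher than the future log's
        long futureLogHighWatermark = 0L;

        // Seeding the fetcher from the current log's HW reproduces the bug:
        System.out.println(processPartitionData(currentLogHighWatermark).orElse("ok"));
        // Seeding from the future log's HW satisfies the invariant:
        System.out.println(processPartitionData(futureLogHighWatermark).orElse("ok"));
    }
}
```

This matches the observed failure: the current log's high watermark (4224887) is fetched against a future log whose end offset is still 0.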
[GitHub] [kafka] littlehorse-eng opened a new pull request, #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.
littlehorse-eng opened a new pull request, #13082: URL: https://github.com/apache/kafka/pull/13082 Documentation only—Minor clarification on how max.warmup.replicas works; specifically, that one "warmup replica" corresponds to a Task that is restoring its state. Also clarifies how max.warmup.replicas interacts with probing.rebalance.interval.ms. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.
littlehorse-eng commented on code in PR #13082: URL: https://github.com/apache/kafka/pull/13082#discussion_r1062979323 ## docs/streams/developer-guide/config-streams.html: ## @@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas - The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping - the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker - traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time - for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + +The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping +the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker +traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time +for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1. + + +Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active Task during Review Comment: @ableegoldman could you please confirm this? @mjsax any chance you could also let me know how to preview the changes to the HTML? I tried opening the file in my browser it wouldn't render...this is somewhat embarrassingly my first time editing HTML. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
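The interaction the doc change describes can be illustrated with a minimal config sketch (plain java.util.Properties rather than the StreamsConfig constants, so it stands alone; the values are examples, not recommendations): each probing rebalance can place at most max.warmup.replicas tasks into warm-up, and a warmup is only promoted to active at a subsequent probing rebalance.

```java
import java.util.Properties;

// Illustrative Kafka Streams configuration for warmup-replica throttling.
public class WarmupConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "example-app");        // example value
        props.setProperty("bootstrap.servers", "localhost:9092");  // example value
        // Allow up to 4 extra warm-up tasks at once (default is 2, minimum 1).
        props.setProperty("max.warmup.replicas", "4");
        // Probe every 10 minutes (the default) for warmups that have caught up
        // and can be promoted to active tasks.
        props.setProperty("probing.rebalance.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.warmup.replicas"));
    }
}
```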
[GitHub] [kafka] chia7712 commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…
chia7712 commented on code in PR #13075: URL: https://github.com/apache/kafka/pull/13075#discussion_r1063007860 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -205,6 +205,61 @@ class ReplicaManagerTest { when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) } + @Test + def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = { Review Comment: @ijuma please take a look at this UT -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.
junrao commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1063005106 ## core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala: ## @@ -243,282 +245,287 @@ class LeaderEpochFileCacheTest { //Given val cache = new LeaderEpochFileCache(tp, checkpoint) -cache.assign(epoch = 2, startOffset = 6) +cache.assign(2, 6) //When val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath)) val cache2 = new LeaderEpochFileCache(tp, checkpoint2) //Then assertEquals(1, cache2.epochEntries.size) -assertEquals(EpochEntry(2, 6), cache2.epochEntries.toList(0)) +assertEquals(new EpochEntry(2, 6), cache2.epochEntries.get(0)) } @Test def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = { //Given -cache.assign(epoch = 1, startOffset = 5); +cache.assign(1, 5); var logEndOffset = 6 -cache.assign(epoch = 2, startOffset = 6); +cache.assign(2, 6); logEndOffset = 7 //When we update an epoch in the past with a different offset, the log has already reached //an inconsistent state. Our options are either to raise an error, ignore the new append, //or truncate the cached epochs to the point of conflict. We take this latter approach in //order to guarantee that epochs and offsets in the cache increase monotonically, which makes //the search logic simpler to reason about. 
-cache.assign(epoch = 1, startOffset = 7); +cache.assign(1, 7); logEndOffset = 8 //Then later epochs will be removed -assertEquals(Some(1), cache.latestEpoch) +assertEquals(Optional.of(1), cache.latestEpoch) //Then end offset for epoch 1 will have changed -assertEquals((1, 8), cache.endOffsetFor(1, logEndOffset)) +assertEquals((1, 8), toTuple(cache.endOffsetFor(1, logEndOffset))) //Then end offset for epoch 2 is now undefined -assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2, logEndOffset)) -assertEquals(EpochEntry(1, 7), cache.epochEntries(0)) +assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(2, logEndOffset))) +assertEquals(new EpochEntry(1, 7), cache.epochEntries.get(0)) + } + + def toTuple[K, V](entry: java.util.Map.Entry[K, V]): (K, V) = { Review Comment: Could this be private? ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1294,36 +1293,53 @@ class UnifiedLog(@volatile var logStartOffset: Long, // The first cached epoch usually corresponds to the log start offset, but we have to verify this since // it may not be true following a message format version bump as the epoch will not be available for // log entries written in the older format. 
-val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry) -val epochOpt = earliestEpochEntry match { - case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch) - case _ => Optional.empty[Integer]() +val earliestEpochEntry = leaderEpochCache match { + case Some(cache) => cache.earliestEntry() + case None => Optional.empty[EpochEntry]() } + +val epochOpt = if (earliestEpochEntry.isPresent && earliestEpochEntry.get().startOffset <= logStartOffset) { + Optional.of[Integer](earliestEpochEntry.get().epoch) +} else Optional.empty[Integer]() + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() -val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache => - cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry)) -val epochOpt = earliestLocalLogEpochEntry match { - case Some(entry) if entry.startOffset <= curLocalLogStartOffset => Optional.of[Integer](entry.epoch) - case _ => Optional.empty[Integer]() + +val earliestLocalLogEpochEntry: Optional[EpochEntry] = leaderEpochCache match { + case Some(cache) => +val value = cache.epochForOffset(curLocalLogStartOffset) +if (value.isPresent) cache.epochEntry(value.get) else Optional.empty[EpochEntry]() + case None => Optional.empty[EpochEntry]() } + +val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) + Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) +else Optional.empty[Integer]() + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { -val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).m
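The isPresent()/get() cascades introduced in the hunk above can usually collapse into a single Optional chain. A hypothetical, self-contained sketch of that pattern (the cache and EpochEntry types below are stand-ins, not the real kafka.server.epoch classes):

```java
import java.util.Optional;
import java.util.TreeMap;

// Stand-in for the epochForOffset(..) -> epochEntry(..) lookup, written as
// one Optional chain instead of nested isPresent()/get() checks.
public class EpochLookupSketch {
    record EpochEntry(int epoch, long startOffset) {}

    static final TreeMap<Long, EpochEntry> entries = new TreeMap<>();
    static {
        entries.put(6L, new EpochEntry(2, 6)); // sample cache content
    }

    static Optional<Integer> epochForOffset(long offset) {
        return Optional.ofNullable(entries.floorEntry(offset))
                .map(e -> e.getValue().epoch());
    }

    static Optional<EpochEntry> epochEntry(int epoch) {
        return entries.values().stream().filter(e -> e.epoch() == epoch).findFirst();
    }

    static Optional<Integer> earliestEpochAtOrBelow(long logStartOffset) {
        // flatMap/filter/map replaces the if (value.isPresent()) ... cascade.
        return epochForOffset(logStartOffset)
                .flatMap(EpochLookupSketch::epochEntry)
                .filter(e -> e.startOffset() <= logStartOffset)
                .map(EpochEntry::epoch);
    }

    public static void main(String[] args) {
        System.out.println(earliestEpochAtOrBelow(10L)); // prints Optional[2]
    }
}
```

The same shape applies to both the logStartOffset and localLogStartOffset branches in the patch; whether that reads better than the explicit match/if form is a style call for the reviewers.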
[GitHub] [kafka] jolshan commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…
jolshan commented on code in PR #13075: URL: https://github.com/apache/kafka/pull/13075#discussion_r1063012777 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -205,6 +205,61 @@ class ReplicaManagerTest { when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) } + @Test + def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = { Review Comment: You mentioned that the v2 format wasn't as easy to replicate. Is it too hard to have a test for? Or are the cases similar enough for v1 to cover both and v1 is less flaky? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…
jolshan commented on code in PR #13075: URL: https://github.com/apache/kafka/pull/13075#discussion_r1063013207 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -205,6 +205,61 @@ class ReplicaManagerTest { when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) } + @Test + def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = { +val dir1 = TestUtils.tempDir() +val dir2 = TestUtils.tempDir() +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +props.put("log.message.format.version", "0.10.2") +props.put("inter.broker.protocol.version", "2.8") +props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) +val config = KafkaConfig.fromProps(props) +val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) +val metadataCache: MetadataCache = mock(classOf[MetadataCache]) +mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) +val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = logManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) +val partition = rm.createPartition(new TopicPartition(topic, 0)) +partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + +rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() +.setTopicName(topic) +.setPartitionIndex(0) +.setControllerEpoch(0) +.setLeader(0) +.setLeaderEpoch(0) +.setIsr(Seq[Integer](0).asJava) +.setPartitionEpoch(0) +.setReplicas(Seq[Integer](0).asJava) +.setIsNew(false)).asJava, + 
Collections.singletonMap(topic, Uuid.randomUuid()), + Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ()) +appendRecords(rm, new TopicPartition(topic, 0), + MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes( +logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath) + +partition.createLogIfNotExists(isNew = true, isFutureReplica = true, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + +// this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error +rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None) +assertNotEquals(0, rm.replicaAlterLogDirsManager.fetcherThreadMap.size) +// wait for the log dir fetcher thread +TimeUnit.SECONDS.sleep(3) Review Comment: Is there any way we could mock this and/or confirm the log dir fetcher thread is started? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
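One common alternative to the fixed TimeUnit.SECONDS.sleep(3) is a condition-polling wait in the spirit of the waitUntilTrue helpers Kafka's test code already uses. A generic sketch in plain Java (not the actual kafka.utils.TestUtils API):

```java
import java.util.function.BooleanSupplier;

// Minimal condition-polling helper: returns as soon as the condition holds,
// fails fast with a message on timeout, instead of always sleeping a fixed time.
public class WaitUntil {
    public static void waitUntilTrue(BooleanSupplier condition, String message,
                                     long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        throw new AssertionError(message);
    }

    public static void main(String[] args) {
        // In the test above one could poll e.g. the fetcher thread's progress
        // rather than sleep; here the condition is simulated with a timestamp.
        long start = System.currentTimeMillis();
        waitUntilTrue(() -> System.currentTimeMillis() - start >= 50,
                "condition never became true", 1000, 10);
        System.out.println("done");
    }
}
```

Polling keeps the test deterministic in intent (it asserts the condition, not the elapsed time) and usually shortens the happy path well below the fixed sleep.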
[jira] [Updated] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard updated KAFKA-14565: Fix Version/s: 3.5.0 > Improving Interceptor Resource Leakage Prevention > - > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently within the Kafka Consumer and Kafka Producer constructors, the > AbstractConfig.getConfiguredInstances() is delegated responsibility for both > creating and configuring each interceptor listed in the interceptor.classes > property and returns a configured List> > interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors where at least one interceptor's > configure method implementation creates and/or depends on objects which > creates threads, connections or other resources which requires clean up and > the subsequent interceptor's configure method raises a runtime exception. > This raising of the runtime exception produces a resource leakage in the > first interceptor as the interceptor container i.e. > ConsumerInterceptors/ProducerInterceptors is never created and therefore the > first interceptor's and really any interceptor's close method are never > called. > To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource clean up, I propose > defining a default open method with no implementation and check exception on > the respective Consumer/Producer interceptor interfaces. 
This open method > will be responsible for creating threads and/or objects which utilizes > threads, connections or other resource which requires clean up. > Additionally, the default open method enables implementation optionality as > it's empty default behavior means it will do nothing when unimplemented. > Additionally, the Kafka Consumer/Producer Interceptor containers will > implement a corresponding maybeOpen method which throws a checked exception. > In order to maintain backwards compatibility with earlier developed > interceptors the maybeOpen will check whether the interceptor's interface > contains the newer open method before calling it accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
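A sketch of the shape this proposal describes (the method names open/maybeOpen come from the JIRA description; the implementation below is hypothetical, not the actual KafkaConsumer/KafkaProducer code): a no-op default open() on the interceptor interface, and a container maybeOpen() that closes the interceptors already opened if a later one fails, which prevents the resource leak.

```java
import java.util.ArrayList;
import java.util.List;

public class InterceptorLifecycleSketch {
    interface Interceptor extends AutoCloseable {
        // Default no-op keeps older interceptor implementations source-compatible.
        default void open() throws Exception { }
        @Override
        void close();
    }

    // Stand-in for the ConsumerInterceptors/ProducerInterceptors container.
    static class Interceptors implements AutoCloseable {
        private final List<Interceptor> interceptors;
        Interceptors(List<Interceptor> interceptors) { this.interceptors = interceptors; }

        // Open each interceptor; on failure, close the ones already opened
        // before rethrowing, so their threads/connections are released.
        void maybeOpen() throws Exception {
            List<Interceptor> opened = new ArrayList<>();
            try {
                for (Interceptor i : interceptors) {
                    i.open();
                    opened.add(i);
                }
            } catch (Exception e) {
                for (Interceptor i : opened) {
                    i.close();
                }
                throw e;
            }
        }

        @Override
        public void close() {
            interceptors.forEach(Interceptor::close);
        }
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        Interceptor first = new Interceptor() {
            @Override public void open() { /* allocate threads/connections here */ }
            @Override public void close() { closed.add("first"); }
        };
        Interceptor failing = new Interceptor() {
            @Override public void open() throws Exception { throw new Exception("open failed"); }
            @Override public void close() { closed.add("failing"); }
        };
        try {
            new Interceptors(List.of(first, failing)).maybeOpen();
        } catch (Exception e) {
            // first.open() succeeded, so maybeOpen closed it before rethrowing
            System.out.println("closed after failure: " + closed);
        }
    }
}
```

Moving resource acquisition out of configure() and into open() is what makes this cleanup possible: configure() failures today happen before any container exists that could call close().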
[jira] [Assigned] (KAFKA-14565) Improving Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard reassigned KAFKA-14565: --- Assignee: Terry Beard > Improving Interceptor Resource Leakage Prevention > - > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Assignee: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently within the Kafka Consumer and Kafka Producer constructors, the > AbstractConfig.getConfiguredInstances() is delegated responsibility for both > creating and configuring each interceptor listed in the interceptor.classes > property and returns a configured List> > interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors where at least one interceptor's > configure method implementation creates and/or depends on objects which > creates threads, connections or other resources which requires clean up and > the subsequent interceptor's configure method raises a runtime exception. > This raising of the runtime exception produces a resource leakage in the > first interceptor as the interceptor container i.e. > ConsumerInterceptors/ProducerInterceptors is never created and therefore the > first interceptor's and really any interceptor's close method are never > called. > To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource clean up, I propose > defining a default open method with no implementation and check exception on > the respective Consumer/Producer interceptor interfaces. 
This open method will be responsible for creating threads and/or objects that utilize threads, connections, or other resources which require cleanup. Additionally, the default open method enables implementation optionality, as its empty default behavior means it will do nothing when unimplemented. The Kafka Consumer/Producer Interceptor containers will also implement a corresponding maybeOpen method which throws a checked exception. In order to maintain backwards compatibility with interceptors developed earlier, maybeOpen will check whether the interceptor's interface contains the newer open method before calling it.
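The mechanism proposed in the issues above, a no-op default `open()` plus a container that closes the already-opened interceptors when a later `open()` fails, can be sketched in miniature. Everything below is illustrative (hypothetical names, not Kafka's actual ConsumerInterceptor/ProducerInterceptor API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the proposed interceptor lifecycle (hypothetical names,
// NOT Kafka's actual interfaces): resource acquisition moves from configure()
// into open(), which the container drives so it can clean up on failure.
public class InterceptorLifecycleSketch {

    interface Interceptor {
        // Proposed no-op default: existing interceptors keep working unchanged;
        // implementations that start threads/connections override it.
        default void open() throws Exception { }
        void close();
    }

    // Stand-in for ConsumerInterceptors/ProducerInterceptors: because the container
    // (not the client constructor) invokes open(), a failure in a later interceptor
    // no longer leaks the resources already acquired by earlier ones.
    static class Interceptors {
        private final List<Interceptor> all;
        Interceptors(List<Interceptor> all) { this.all = all; }

        void maybeOpenAll() throws Exception {
            List<Interceptor> opened = new ArrayList<>();
            try {
                for (Interceptor i : all) {
                    i.open();           // no-op for legacy implementations
                    opened.add(i);
                }
            } catch (Exception e) {
                for (Interceptor i : opened)
                    i.close();          // release what was already acquired
                throw e;
            }
        }
    }

    // Demo: the second interceptor's open() throws; returns which close() methods ran.
    static List<String> demoFailure() {
        List<String> closed = new ArrayList<>();
        Interceptor good = new Interceptor() {
            @Override public void open() { /* pretend to start a thread */ }
            @Override public void close() { closed.add("good"); }
        };
        Interceptor bad = new Interceptor() {
            @Override public void open() throws Exception { throw new Exception("boom"); }
            @Override public void close() { closed.add("bad"); }
        };
        try {
            new Interceptors(List.of(good, bad)).maybeOpenAll();
        } catch (Exception expected) {
            // the container has already closed "good"
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(demoFailure()); // prints [good]
    }
}
```

The proposal additionally has maybeOpen check reflectively whether a (possibly older, already-compiled) interceptor's interface actually declares the newer open method before invoking it; that binary-compatibility check is omitted here for brevity.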
[GitHub] [kafka] ableegoldman closed pull request #7824: MINOR: flush only the evicted dirty entry
ableegoldman closed pull request #7824: MINOR: flush only the evicted dirty entry URL: https://github.com/apache/kafka/pull/7824 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request, #13083: MINOR: bump year in NOTICE
ableegoldman opened a new pull request, #13083: URL: https://github.com/apache/kafka/pull/13083 It's 2023 now y'all
[GitHub] [kafka] ableegoldman merged pull request #13083: MINOR: bump year in NOTICE
ableegoldman merged PR #13083: URL: https://github.com/apache/kafka/pull/13083
[jira] [Commented] (KAFKA-14465) java.lang.NumberFormatException: For input string: "index"
[ https://issues.apache.org/jira/browse/KAFKA-14465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655220#comment-17655220 ]

jianbin.chen commented on KAFKA-14465: Can you help me with this question?

java.lang.NumberFormatException: For input string: "index"

Key: KAFKA-14465
URL: https://issues.apache.org/jira/browse/KAFKA-14465
Project: Kafka
Issue Type: Bug
Affects Versions: 1.1.0
Reporter: jianbin.chen
Priority: Major
Attachments: image-2022-12-15-12-26-24-718.png

{code:java}
[2022-12-13 07:12:20,369] WARN [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Found a corrupted index file corresponding to log file /home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.log due to Corrupt index found, index file (/home/admin/output/kafka-logs/fp_sg_flow_copy-1/0165.index) has non-zero size but the last offset is 165 which is no greater than the base offset 165.}, recovering segment and rebuilding index files... (kafka.log.Log)
[2022-12-13 07:12:20,369] ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "index" (kafka.log.LogManager)
[2022-12-13 07:12:20,374] INFO [ProducerStateManager partition=fp_sg_flow_copy-1] Writing producer snapshot at offset 165 (kafka.log.ProducerStateManager)
[2022-12-13 07:12:20,378] INFO [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Loading producer state from offset 165 with message format version 2 (kafka.log.Log)
[2022-12-13 07:12:20,381] INFO [Log partition=fp_sg_flow_copy-1, dir=/home/admin/output/kafka-logs] Completed load of log with 1 segments, log start offset 165 and log end offset 165 in 13 ms (kafka.log.Log)
[2022-12-13 07:12:20,389] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NumberFormatException: For input string: "index"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:306)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:306)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
    at kafka.log.Log$.offsetFromFile(Log.scala:1846)
    at kafka.log.Log.$anonfun$loadSegmentFiles$3(Log.scala:331)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
    at kafka.log.Log.loadSegmentFiles(Log.scala:320)
    at kafka.log.Log.loadSegments(Log.scala:403)
    at kafka.log.Log.<init>(Log.scala:216)
    at kafka.log.Log$.apply(Log.scala:1748)
    at kafka.log.LogManager.loadLog(LogManager.scala:265)
    at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:335)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
[2022-12-13 07:12:20,401] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
{code}

When I restart the broker, it becomes like this. I deleted the 000165.index file; after starting it, there are still other files with the same error. Please tell me how to fix it and what is causing it. I tried to open the file 0165.index but it failed. !image-2022-12-15-12-26-24-718.png!
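Some context on why the stack trace lands in Log$.offsetFromFile: Kafka names segment files after their base offset and recovers that offset by parsing the file name, so a stray file in the partition directory whose name before the first dot is not numeric (here, literally "index") aborts log loading. Below is a rough, illustrative re-implementation of that parsing under an assumed shape; it is not Kafka's exact code, which also handles known suffixes:

```java
// Illustrative sketch (assumed shape) of offset-from-file-name parsing as done by
// kafka.log.Log$.offsetFromFile: the digits before the first '.' are the base offset.
public class SegmentOffsetSketch {
    static long offsetFromFileName(String fileName) {
        int dot = fileName.indexOf('.');
        String offsetPart = dot >= 0 ? fileName.substring(0, dot) : fileName;
        // Throws NumberFormatException for stray, non-segment files in the log dir.
        return Long.parseLong(offsetPart);
    }

    public static void main(String[] args) {
        System.out.println(offsetFromFileName("00000000000000000165.index")); // prints 165
        try {
            offsetFromFileName("index"); // a stray file name, as in the report above
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage()); // For input string: "index"
        }
    }
}
```

This suggests the broker tripped over a file whose name does not match the segment naming scheme, rather than over the contents of the real index files.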
[GitHub] [kafka] ijuma commented on a diff in pull request #13075: KAFKA-9087 Replace log high watermark by future log high watermark wh…
ijuma commented on code in PR #13075: URL: https://github.com/apache/kafka/pull/13075#discussion_r1063058965

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -205,6 +205,61 @@ class ReplicaManagerTest {
     when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }

+  @Test
+  def testMaybeAddLogDirFetchersForV1MessageFormat(): Unit = {

Review Comment: V1 is deprecated and we intend to remove it (along with V0). We should not be writing new tests with that.
[GitHub] [kafka] showuon commented on pull request #13083: MINOR: bump year in NOTICE
showuon commented on PR #13083: URL: https://github.com/apache/kafka/pull/13083#issuecomment-1373098778 Happy new year! :)
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063067362

## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
## @@ -354,9 +356,10 @@ class LogSegmentTest {
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
     stateManager = newProducerStateManager()
-    stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
-      mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
-      0, RecordBatch.NO_TIMESTAMP, Some(75L)))
+    val batchMetadata = new util.ArrayList[BatchMetadata]()

Review Comment: The ArrayList constructor does not allow passing a single instance the way mutable.Queue does. Anyway, this is changed to Collections.singletonList in the latest code. I saw your suggestion to remove this List and take a single instance directly; I will reply to your comment there.
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063067653

## storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java:
## @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * The last written record for a given producer. The last data offset may be undefined
+ * if the only log entry for a producer is a transaction marker.
+ */
+public final class LastRecord {
+    public final OptionalLong lastDataOffset;
+    public final short producerEpoch;
+
+    public LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
+        this.lastDataOffset = lastDataOffset;
+        this.producerEpoch = producerEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        LastRecord that = (LastRecord) o;
+
+        if (producerEpoch != that.producerEpoch) return false;
+        return Objects.equals(lastDataOffset, that.lastDataOffset);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = lastDataOffset != null ? lastDataOffset.hashCode() : 0;
+        result = 31 * result + (int) producerEpoch;

Review Comment: Good catch! It was generated by IntelliJ. I do not think this null check is really needed, as you suggested.
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063068193 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
+ */ +public class ProducerStateEntry { +public static final int NUM_BATCHES_TO_RETAIN = 5; +public final long producerId; +private final List batchMetadata; +private short producerEpoch; +public int coordinatorEpoch; +public long lastTimestamp; +public OptionalLong currentTxnFirstOffset; + +public ProducerStateEntry(long producerId) { +this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); +} + +public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { +this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); +} + +public ProducerStateEntry(long producerId, List batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { +this.producerId = producerId; +this.batchMetadata = batchMetadata; +this.producerEpoch = producerEpoch; +this.coordinatorEpoch = coordinatorEpoch; +this.lastTimestamp = lastTimestamp; +this.currentTxnFirstOffset = currentTxnFirstOffset; +} + +public int firstSeq() { +return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(0).firstSeq(); +} + + +public long firstDataOffset() { +return isEmpty() ? -1L : batchMetadata.get(0).firstOffset(); +} + +public int lastSeq() { +return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(batchMetadata.size() - 1).lastSeq; +} + +public long lastDataOffset() { +return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 1).lastOffset; +} + +public int lastOffsetDelta() { +return isEmpty() ? 
0 : batchMetadata.get(batchMetadata.size() - 1).offsetDelta; +} + +public boolean isEmpty() { +return batchMetadata.isEmpty(); +} + +public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) { +maybeUpdateProducerEpoch(producerEpoch); +addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp)); +this.lastTimestamp = timestamp; +} + +public boolean maybeUpdateProducerEpoch(short producerEpoch) { +if (this.producerEpoch != producerEpoch) { +batchMetadata.clear(); +this.producerEpoch = producerEpoch; +return true; +} else { +return false; +} +} + +private void addBatchMetadata(BatchMetadata batch) { +if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.remove(0); +batchMetadata.add(batch); +} + +public void update(ProducerStateEntry nextEntry) { +maybeUpdateProducerEpoch(nextEntry.producerEpoch); +while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.remove(0)); +this.coordinatorEpoch = nextEntry.coordinatorEpoch; +this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset; +this.lastTimestamp = nextEntry.lastTimestamp; +} + +public Optional findDuplicateBatch(RecordBatch batc
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063070497 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
+ */ +public class ProducerStateEntry { +public static final int NUM_BATCHES_TO_RETAIN = 5; +public final long producerId; +private final List batchMetadata; +private short producerEpoch; +public int coordinatorEpoch; +public long lastTimestamp; +public OptionalLong currentTxnFirstOffset; + +public ProducerStateEntry(long producerId) { +this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); +} Review Comment: It was part of `ProducerStateEntry#empty()` method. Avoided creating a static method and introduced a constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
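The retention behavior described in the ProducerStateEntry javadoc above (keep at most NUM_BATCHES_TO_RETAIN batch metadata entries, evicting the lowest-sequence head entry when at capacity) can be exercised with a small standalone sketch; the class and method names below are simplified stand-ins, not the real storage-module types:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for ProducerStateEntry's bounded batch-metadata queue:
// at most NUM_BATCHES_TO_RETAIN entries, oldest (lowest sequence) evicted first.
public class BatchRetentionSketch {
    static final int NUM_BATCHES_TO_RETAIN = 5;
    private final Deque<Integer> lastSeqs = new ArrayDeque<>();

    void addBatch(int lastSeq) {
        if (lastSeqs.size() == NUM_BATCHES_TO_RETAIN)
            lastSeqs.removeFirst();   // make room: drop the lowest-sequence batch
        lastSeqs.addLast(lastSeq);
    }

    int firstSeq() { return lastSeqs.peekFirst(); }
    int lastSeq() { return lastSeqs.peekLast(); }

    public static void main(String[] args) {
        BatchRetentionSketch entry = new BatchRetentionSketch();
        for (int seq = 0; seq < 7; seq++)
            entry.addBatch(seq);      // push 7 batches through a 5-slot window
        System.out.println(entry.firstSeq() + ".." + entry.lastSeq()); // prints 2..6
    }
}
```

Retaining only a constant number of recent batches is what keeps duplicate-sequence detection at constant space per producer, which the ProducerAppendInfo constructor javadoc later in this thread also notes.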
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063071578 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + +"which is smaller than the la
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063072191 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. 
+ * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { +private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); +private final TopicPartition topicPartition; +public final long producerId; +private final ProducerStateEntry currentEntry; +private final AppendOrigin origin; + +private final List transactions = new ArrayList<>(); +private final ProducerStateEntry updatedEntry; + +/** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ +public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { +this.topicPartition = topicPartition; +this.producerId = producerId; +this.currentEntry = currentEntry; +this.origin = origin; + +updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), +currentEntry.coordinatorEpoch, +currentEntry.lastTimestamp, +currentEntry.currentTxnFirstOffset); +} + +private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { +checkProducerEpoch(producerEpoch, offset); +if (origin == AppendOrigin.CLIENT) { +checkSequence(producerEpoch, firstSeq, offset); +} +} + +private void checkProducerEpoch(short producerEpoch, long offset) { +if (producerEpoch < updatedEntry.producerEpoch()) { +String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + Review Comment: Good catch! These were left wh
[GitHub] [kafka] satishd commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
satishd commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063066837 ## storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java: ## @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public final class TxnMetadata {
+    public final long producerId;
+    public final LogOffsetMetadata firstOffset;
+    public OptionalLong lastOffset;
+
+    public TxnMetadata(long producerId,
+                       LogOffsetMetadata firstOffset,
+                       OptionalLong lastOffset) {
+        this.producerId = producerId;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
+
+    public TxnMetadata(long producerId, long firstOffset) {
+        this(producerId, new LogOffsetMetadata(firstOffset));
+    }
+
+    public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) {
+        this(producerId, firstOffset, OptionalLong.empty());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TxnMetadata that = (TxnMetadata) o;
+
+        if (producerId != that.producerId) return false;
+        if (!Objects.equals(firstOffset, that.firstOffset)) return false;
+        return Objects.equals(lastOffset, that.lastOffset);

Review Comment: I try to avoid using mutable (non-final) fields in equals/hashCode implementations. I should have left a comment in the code on why these methods were added: I tried to keep the same semantics as the earlier Scala case classes. Afaik, the default implementation does reference-equality (eq) checks on all the fields. `TxnMetadata` is used as a value of a `TreeMap` in ProducerStateManager, and I currently do not see any usages of map#containsValue there, so I prefer to avoid implementing it for now as you suggested. But the existing tests have equality assertions on `TxnMetadata`; one way to address them is to assert equality on the producer id and the LogOffsetMetadata instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
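The hazard discussed above — including a mutable field such as `TxnMetadata.lastOffset` in equals/hashCode — can be illustrated with a small, self-contained sketch. This is hypothetical code, not from Kafka: once an object whose hashCode depends on a mutable field is used as a hash-based map key and that field is then mutated, lookups silently fail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration (not Kafka code): why mutable fields in
// equals/hashCode are bug-prone when the object is used as a hash-map key.
public class MutableKeyDemo {
    static final class Txn {
        final long producerId;   // immutable, like TxnMetadata.producerId
        long lastOffset;         // mutable, like TxnMetadata.lastOffset

        Txn(long producerId, long lastOffset) {
            this.producerId = producerId;
            this.lastOffset = lastOffset;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Txn)) return false;
            Txn that = (Txn) o;
            return producerId == that.producerId && lastOffset == that.lastOffset;
        }

        @Override
        public int hashCode() {
            // Mutable field participates in the hash: the source of the bug.
            return Objects.hash(producerId, lastOffset);
        }
    }

    public static boolean lookupAfterMutation() {
        Map<Txn, String> index = new HashMap<>();
        Txn key = new Txn(1L, 100L);
        index.put(key, "open");
        key.lastOffset = 200L;           // mutate after insertion: hash changes
        return index.containsKey(key);   // the entry is now unreachable
    }

    public static void main(String[] args) {
        System.out.println(lookupAfterMutation()); // prints false
    }
}
```

This is one reason the reviewer above prefers restricting equality assertions to the immutable producer id and offset metadata.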
[GitHub] [kafka] ijuma commented on a diff in pull request #13043: KAFKA-14558: Move LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module
ijuma commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1063147960 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ## @@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to validate the records appended by a given producer before they are written to the log.
+ * It is initialized with the producer's state after the last successful append, and transitively validates the
+ * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata
+ * as the incoming records are validated.
+ */
+public class ProducerAppendInfo {
+    private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class);
+
+    private final TopicPartition topicPartition;
+    public final long producerId;
+    private final ProducerStateEntry currentEntry;
+    private final AppendOrigin origin;
+
+    private final List<TxnMetadata> transactions = new ArrayList<>();
+    private final ProducerStateEntry updatedEntry;
+
+    /**
+     * @param topicPartition topic partition
+     * @param producerId     The id of the producer appending to the log
+     * @param currentEntry   The current entry associated with the producer id which contains metadata for a fixed number of
+     *                       the most recent appends made by the producer. Validation of the first incoming append will
+     *                       be made against the latest append in the current entry. New appends will replace older appends
+     *                       in the current entry so that the space overhead is constant.
+     * @param origin         Indicates the origin of the append which implies the extent of validation. For example, offset
+     *                       commits, which originate from the group coordinator, do not have sequence numbers and therefore
+     *                       only producer epoch validation is done. Appends which come through replication are not validated
+     *                       (we assume the validation has already been done) and appends from clients require full validation.
+     */
+    public ProducerAppendInfo(TopicPartition topicPartition,
+                              long producerId,
+                              ProducerStateEntry currentEntry,
+                              AppendOrigin origin) {
+        this.topicPartition = topicPartition;
+        this.producerId = producerId;
+        this.currentEntry = currentEntry;
+        this.origin = origin;
+
+        updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(),
+                currentEntry.coordinatorEpoch,
+                currentEntry.lastTimestamp,
+                currentEntry.currentTxnFirstOffset);
+    }
+
+    private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) {
+        checkProducerEpoch(producerEpoch, offset);
+        if (origin == AppendOrigin.CLIENT) {
+            checkSequence(producerEpoch, firstSeq, offset);
+        }
+    }
+
+    private void checkProducerEpoch(short producerEpoch, long offset) {
+        if (producerEpoch < updatedEntry.producerEpoch()) {
+            String message = String.format("Epoch of producer %d at offset %d in %s is %d, " +
+                    "which is smaller than the last
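The `checkProducerEpoch` logic quoted above boils down to a monotonicity check on the producer epoch: an append tagged with an epoch older than the last epoch seen for that producer id is rejected as coming from a fenced (stale) producer. A minimal standalone sketch of that check, as a simplified assumption — the real code throws Kafka's `InvalidProducerEpochException` and builds a richer error message, while this sketch uses a plain `IllegalStateException` to stay dependency-free:

```java
// Hypothetical, dependency-free sketch of the producer-epoch monotonicity
// check; not the actual Kafka implementation.
public class EpochCheckDemo {
    public static void checkProducerEpoch(short lastEpoch, short incomingEpoch) {
        if (incomingEpoch < lastEpoch) {
            // Kafka throws InvalidProducerEpochException here.
            throw new IllegalStateException("Epoch " + incomingEpoch
                    + " is smaller than the last seen epoch " + lastEpoch);
        }
    }

    // Helper: true if the append passes the epoch check.
    public static boolean isAccepted(short lastEpoch, short incomingEpoch) {
        try {
            checkProducerEpoch(lastEpoch, incomingEpoch);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAccepted((short) 4, (short) 5)); // newer epoch: accepted
        System.out.println(isAccepted((short) 5, (short) 4)); // stale epoch: rejected
    }
}
```

Equal epochs are accepted, since a producer continues appending under its current epoch; only a strictly smaller epoch indicates a zombie producer.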