Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2717

2024-03-12 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-939: Support Participation in 2PC

2024-03-12 Thread Justine Olshan
Hey Artem,
I took another look at the KIP and the discussion on the mailing list.

+1 (binding) from me :)

Justine

On Wed, Mar 6, 2024 at 10:23 AM Martijn Visser 
wrote:

> Hi all,
>
> It is so exciting to see this KIP and to know that it will greatly benefit
> Flink and other technologies.
>
>  +1
>
> Best regards,
>
> Martijn
>
> On Fri, 1 Dec 2023 at 11:07 Artem Livshits
> wrote:
>
> > Hello,
> >
> > This is a voting thread for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > .
> >
> > The KIP proposes extending Kafka transaction support (which already uses
> 2PC
> > under the hood) to enable atomicity of dual writes to Kafka and an
> external
> > database, and helps to fix a long-standing Flink issue.
> >
> > An example of code that uses the dual-write recipe with JDBC and should
> > work for most SQL databases is here
> > https://github.com/apache/kafka/pull/14231.
> >
> > The FLIP for the sister fix in Flink is here
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> >
> > -Artem
> >
>
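For readers skimming the thread: the dual-write recipe boils down to preparing the Kafka transaction first, persisting the prepared-transaction token in the external database atomically with the business writes, and then completing the Kafka transaction. A minimal sketch, assuming the producer API names proposed in KIP-939 (initTransactions(boolean), prepareTransaction(), PreparedTxnState, completeTransaction()) and hypothetical JDBC helpers:

{code:java}
// Minimal sketch of the dual-write recipe. The methods prepareTransaction()
// and completeTransaction(), and the token type PreparedTxnState, are the API
// *proposed* in KIP-939; final names and packages may differ.
import java.sql.Connection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualWriteSketch {

    static void dualWrite(KafkaProducer<String, String> producer, Connection db,
                          String key, String value) throws Exception {
        // At startup the app would call producer.initTransactions(true):
        // keepPreparedTxn = true keeps an in-doubt prepared transaction alive
        // instead of aborting it, so it can be resolved after a crash.
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("events", key, value));

        // Phase 1: prepare the Kafka transaction and obtain a token for it.
        PreparedTxnState prepared = producer.prepareTransaction();

        // Commit the business writes and the token in one DB transaction; on
        // recovery the token read back from the DB decides commit vs. abort.
        db.setAutoCommit(false);
        updateBusinessTables(db);
        saveKafkaTxnState(db, prepared.toString());
        db.commit();

        // Phase 2: complete (commit) the Kafka transaction.
        producer.completeTransaction(prepared);
    }

    // Hypothetical stand-ins for application code.
    static void updateBusinessTables(Connection db) { }
    static void saveKafkaTxnState(Connection db, String token) { }
}
{code}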


[jira] [Resolved] (KAFKA-9690) MemoryLeak in JMX Reporter

2024-03-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-9690.
---
Resolution: Duplicate

Closed as a duplicate of https://issues.apache.org/jira/browse/KAFKA-9306

> MemoryLeak in JMX Reporter
> --
>
> Key: KAFKA-9690
> URL: https://issues.apache.org/jira/browse/KAFKA-9690
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.4.0
>Reporter: Kaare Nilsen
>Priority: Major
> Attachments: image-2020-03-10-12-37-49-259.png, 
> image-2020-03-10-12-44-11-688.png
>
>
> We use Kafka in a streaming HTTP application that creates a new consumer for 
> each incoming request. In version 2.4.0 we see memory build up for each new 
> consumer. A memory dump revealed the problem was in the JMX subsystem: one of 
> the JMX beans (kafka.consumer) accumulates the consumer-metrics metric 
> without releasing it when the consumer is closed.
> What we found is that in metricRemoval
> {code:java}
> public void metricRemoval(KafkaMetric metric) {
> synchronized (LOCK) {
> MetricName metricName = metric.metricName();
> String mBeanName = getMBeanName(prefix, metricName);
> KafkaMbean mbean = removeAttribute(metric, mBeanName);
> if (mbean != null) {
> if (mbean.metrics.isEmpty()) {
> unregister(mbean);
> mbeans.remove(mBeanName);
> } else
> reregister(mbean);
> }
> }
> }
> {code}
> The check mbean.metrics.isEmpty() for this particular mbean never yields 
> true, so the mbean is never removed and the mbeans HashMap keeps growing.
> The metrics that are not released are:
> {code:java}
> last-poll-seconds-ago
> poll-idle-ratio-avg")
> time-between-poll-avg
> time-between-poll-max
> {code}
> I have a workaround in my code now: a modified JMXReporter in my own 
> project with the following close method
> {code:java}
> public void close() {
> synchronized (LOCK) {
> for (KafkaMbean mbean : this.mbeans.values()) {
> mbean.removeAttribute("last-poll-seconds-ago");
> mbean.removeAttribute("poll-idle-ratio-avg");
> mbean.removeAttribute("time-between-poll-avg");
> mbean.removeAttribute("time-between-poll-max");
> unregister(mbean);
> }
> }
> }
> {code}
> This removes the attributes that are not cleaned up and prevents the 
> memory leak, but I have not found the root cause.
> Another workaround is to use Kafka client 2.3.1.
>  
> This is how it looks in the JMX console after a couple of clients have 
> connected and disconnected. You can see that the one metric builds up, 
> and the old mbeans keep the four attributes that make the unregister fail.
>  
> !image-2020-03-10-12-37-49-259.png!
>  
> This is how it looks after a while with Kafka client 2.3.1:
> !image-2020-03-10-12-44-11-688.png!
> As you can see, there is no leakage here.
> I suspect the pull request for this KIP to be the one that introduced the leak: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior]
> https://issues.apache.org/jira/browse/KAFKA-8874
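Not part of the original report, but one quick way to see the lingering attributes for yourself, using only the standard JMX API:

{code:java}
// Minimal sketch: list the kafka.consumer MBeans that survive
// consumer.close() and print their remaining attributes. On an affected
// 2.4.0 client, the four poll metrics listed above should show up here.
import java.lang.management.ManagementFactory;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class LeakInspector {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName name : server.queryNames(new ObjectName("kafka.consumer:*"), null)) {
            System.out.println(name);
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                System.out.println("  " + attr.getName());
            }
        }
    }
}
{code}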



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2024-03-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-9504.
---
Resolution: Duplicate

Closed as a duplicate of https://issues.apache.org/jira/browse/KAFKA-9306

> Memory leak in KafkaMetrics registered to MBean
> ---
>
> Key: KAFKA-9504
> URL: https://issues.apache.org/jira/browse/KAFKA-9504
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: Andreas Holmén
>Priority: Major
>
> After close() is called on a KafkaConsumer, some registered MBeans are not 
> unregistered, causing a leak.
>  
>  
> {code:java}
> import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class Leaker {
>   private static String bootstrapServers = "hostname:9092";
>
>   public static void main(String[] args) throws InterruptedException {
>     MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>     Map<String, Object> props = new HashMap<>();
>     props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>
>     int beans = mBeanServer.getMBeanCount();
>     for (int i = 0; i < 100; i++) {
>       KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props,
>           new ByteArrayDeserializer(), new ByteArrayDeserializer());
>       consumer.close();
>     }
>     int newBeans = mBeanServer.getMBeanCount();
>     System.out.println("\nbeans delta: " + (newBeans - beans));
>   }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2716

2024-03-12 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16367) Full ConsumerGroupHeartbeat response must be sent when full request is received

2024-03-12 Thread David Jacot (Jira)
David Jacot created KAFKA-16367:
---

 Summary: Full ConsumerGroupHeartbeat response must be sent when 
full request is received
 Key: KAFKA-16367
 URL: https://issues.apache.org/jira/browse/KAFKA-16367
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-12 Thread Lucas Brutschy
@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than I do.

One other alternative is to allow the user to provide a state
transformer `Function<ConsumerRecord<byte[], byte[]>,
ConsumerRecord<byte[], byte[]>>` to adapt the state before loading it,
defaulting to identity. This would provide the ability to do
efficient, non-deserializing transformations on the raw records.
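
A rough sketch of that idea (the interface name and the identity default are illustrative assumptions, not an agreed API):

{code:java}
// Rough sketch of the suggested state transformer: a function applied to the
// raw records on the restore path before they are put into the global store,
// defaulting to identity so existing users are unaffected. Name and placement
// are assumptions for illustration only.
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface RestoreTransformer
        extends Function<ConsumerRecord<byte[], byte[]>, ConsumerRecord<byte[], byte[]>> {

    // Default: load each record unchanged, with no deserialization involved.
    static RestoreTransformer identity() {
        return record -> record;
    }
}
{code}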

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax  wrote:
>
> @Bruno:
>
> (1), I think you are spot on about the ts-extractor: on the restore code
> path, we only support record-ts, and there is no need for a custom-ts
> because for regular changelog topics KS sets the ts; thus, the
> optimization this KIP proposes requires that the global topic follow the
> changelog format, i.e., the ts must be in the record-ts.
>
> However, for the regular processing path, I am not sure if we can omit
> deserializers. The way the PAPI is wired up seems to require that we
> give proper types to _other_ Processors that read from the global state
> store. For this reason, the store (which takes `Serdes` with proper
> types) is wrapped with a `MeteredStore` (like all others) to do the
> Serde work, and this MeteredStore is also exposed to the
> global-Processor. Might be good for Walker to dig into this to find out
> the details?
>
> It would of course be nice if we could avoid the unnecessary
> deserialization on topic read and re-serialization on global-store put
> for this case, but it seems not to be straightforward to do...
>
>
> (2). Is this about the PAPI/Topology? For this case, we don't have any
> config object across the board; we only do this in the DSL. Hence, I
> would propose to just follow the existing pattern in this KIP to keep
> the API consistent. For the DSL, it could make sense, of course. -- If
> we think the PAPI could be improved with config objects, we could do
> this in a dedicated KIP.
>
>
> @Lucas:
>
> The PAPI is unfortunately (by design) much more open and less
> restrictive. If a user has a custom state store, we need some
> `Processor` code from them, because we cannot provide a built-in
> processor for an unknown store. The overload which won't take a
> processor would only work for the built-in key-value store, which I
> assume would cover most use-cases; however, we should keep the door open
> for other use cases. Otherwise, we disallow this optimization for custom
> stores. PAPI is really about flexibility, and yes, with great power
> comes great responsibility for the users :)
>
> But this actually highlights a different aspect: should the overload
> that does not accept a custom `Processor` but uses the built-in
> processor not accept a generic `StoreBuilder`, but instead restrict the
> type to `StoreBuilder<KeyValueStore<...>>`?
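
A sketch of the restriction being suggested (an illustrative shape, not the KIP's final signature):

{code:java}
// Illustrative only: the processor-less overload could narrow its parameter
// from a generic StoreBuilder to a key-value StoreBuilder, since the built-in
// restore processor only works for key-value stores. The parameter list here
// is an assumption, not the KIP's final signature.
public synchronized <K, V> Topology addGlobalStore(
        final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
        final String sourceName,
        final Deserializer<K> keyDeserializer,
        final Deserializer<V> valueDeserializer,
        final String topic);
{code}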
>
>
> -Matthias
>
>
>
> On 3/6/24 1:14 PM, Lucas Brutschy wrote:
> > Hey Walker
> >
> > Thanks for the KIP, and congrats on the KiBiKIP ;)
> >
> > My main point is that I'd vote against introducing
> > `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
> > just incorrect and should be removed or deprecated. If we think we
> > need to keep the old behavior around, renaming the methods, e.g., to
> > `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
> > behavior. But at first glance, the old behavior just looks like a
> > bug to me and should simply be removed.
> >
> > So for this KIP, I'd keep two variants as you propose and drop the
> > boolean parameter, but the two variants will be
> >   1) a copy-restore variant without custom processing, as you propose.
> >   2) a process-restore variant with custom processing (parameters the
> > same as before). This should be combined with a clear warning in the
> > Javadoc about the performance downside of this approach.
> >
> > Presentation:
> > 1) I wonder if you could make another pass on the motivation section.
> > I was lacking some context on this problem, and I think the nature of
> > the restore issue only became clear to me when I read through the
> > comments in the JIRA ticket you linked.
> > 2) If we decide to keep the parameter `reprocessOnRestore`, the
> > Javadoc on it should be extended. This is a somewhat subtle issue, and
> > I don't think `restore by reprocessing` is enough of an explanation.
> >
> > Nits:
> >
> > `{@link ValueTransformer ValueTransformer}` -> `{@link
> > ValueTransformer ValueTransformers}`
> > `user defined` -> `user-defined`
> >
> > Cheers,
> > Lucas
> >
> > On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna  wrote:
> >>
> >> Hi Walker,
> >>
> >> Thanks for the KIP!
> >>
> >> Great that you are going to fix this long-standing issue!
> >>
> >> 1.
> >> I was wondering if we need the timestamp extractor as well as the key
> >> and value deserializers in the Topology#addGlobalStore() variants that
> >> do not take a ProcessorSupplier? What about Consumed in
> >> StreamsBuilder#addGlobalStore()?
> >> Since those methods