Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Thanks Philip, that sounds pretty good. Meanwhile I'll continue to study KIP-848; it is a bit too much to digest in one go. Do you have a rough timeline for when the new consumer implementation can be tried out in non-production environments? Kind regards, Erik.

On 14-10-2023 at 20:48, Philip Nee wrote: Hi Erik, Thanks for the KIP, again. I am also very much interested in the idea of this KIP, and I want to let you know that we are rewriting the Kafka consumer using an event-driven approach, so I think the new implementation would make this KIP much easier to implement. In a nutshell, the network IO will become completely asynchronous to the application thread, so that the blocking APIs won't stall the network send/receive. In the new implementation, the main roles of poll are: 1. check if there are any background events such as errors or callback invocations, 2. notify the background thread that the user is polling, and 3. check if there is any data to return to the user. Because the background thread and application thread inherently work in an async fashion, it is possible to continue to process and commit during the revocation period; however, we have to be very careful with the state of partition ownership, as you mentioned in the KIP. Please keep an eye on the new consumer implementation; in case you are interested in digging in, it is the PrototypeKafkaConsumer module. It is not fully functional but we are working at full speed to get it into a good state. Thanks - I am still reading the KIP and your previous KIP to see if I can make more constructive suggestions here. P

On Fri, Oct 13, 2023 at 11:54 PM Erik van Oosten wrote: Hello David, Thanks, I am happy to hear we agree on the problem. All the tiny details of an implementation are less important. I will read KIP-848 first to answer your question about its relation with KIP-983. But for sure it makes sense to complete the implementation of KIP-848 first. Kind regards, Erik.
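The event-driven design Philip describes — network IO on a background thread, with poll() on the application thread only draining events and returning data — can be sketched roughly as below. All names here (the class, the queues, the string events) are invented for illustration; this is not the actual PrototypeKafkaConsumer code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: the application thread and a background IO thread communicate
// only through queues, so a blocking application call never stalls
// network send/receive.
public class EventDrivenConsumerSketch {
    // Events from the background thread (errors, callback invocations, ...).
    private final BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();
    // Records fetched by the background thread, waiting for the application.
    private final BlockingQueue<String> fetchedRecords = new LinkedBlockingQueue<>();
    // Signals from the application thread to the background thread.
    private final BlockingQueue<String> applicationSignals = new LinkedBlockingQueue<>();

    // Called by the background IO thread when a fetch response arrives.
    public void onNetworkReceive(String record) { fetchedRecords.add(record); }

    // Called by the background thread to surface an error or callback.
    public void onBackgroundEvent(String event) { backgroundEvents.add(event); }

    // The three roles of poll() described in the thread.
    public List<String> poll() {
        // 1. Check for background events such as errors or callbacks.
        List<String> events = new ArrayList<>();
        backgroundEvents.drainTo(events);
        events.forEach(e -> System.out.println("handling event: " + e));
        // 2. Notify the background thread that the user is polling.
        applicationSignals.add("POLL");
        // 3. Return any data the background thread has already fetched.
        List<String> records = new ArrayList<>();
        fetchedRecords.drainTo(records);
        return records;
    }

    public static void main(String[] args) {
        EventDrivenConsumerSketch consumer = new EventDrivenConsumerSketch();
        consumer.onBackgroundEvent("partition-revoked callback");
        consumer.onNetworkReceive("record-1");
        System.out.println("poll returned: " + consumer.poll());
    }
}
```

Because the two threads only meet at the queues, a commit issued by the application during a revocation can still be sent by the background thread — which is exactly why partition-ownership state needs care.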
On 13-10-2023 at 21:00, David Jacot wrote: Hi Erik, Thanks for the KIP. I haven't fully read the KIP yet but I agree with the weaknesses that you point out in it. I will continue to read it. For your information, we are working at full speed on implementing KIP-848 while also changing the internal threading model of the consumer. Those changes are already extremely large so I would rather prefer to complete them before adding more on top of them. Moreover, I think that this KIP should build on top of KIP-848 now. Would you agree with this? Best, David

On Fri, Oct 13, 2023 at 20:44, Erik van Oosten wrote: Thanks Philip, No worries, I am not in a hurry. Knowing this is not forgotten is enough for me. If there is anything I can do to help the process please let me know. Kind regards, Erik.

On 13-10-2023 at 20:29, Philip Nee wrote: Hi Erik, Sorry for the delay, I have not finished reviewing the KIP, but I also have not forgotten about it! In general, the KIP review process can be lengthy, so I think the mailing list is the best bet to get the committers' attention. P

On Fri, Oct 13, 2023 at 10:55 AM Erik van Oosten wrote: Hi client developers, The text is updated so that it is clearer that you can only use auto-commit when doing synchronous processing (approach 1). I am assuming that auto-commit commits whatever was consumed in the previous poll. I am wondering why this KIP doesn't get more attention. Is async processing not something that the Kafka client wants to support? Kind regards, Erik.

On 25-09-2023 at 18:17, Erik van Oosten wrote: Hi Viktor, Good questions! 1. Auto-commits would only work with approach 1 in the KIP. Any async solution is incompatible with auto-commits. Do you think the text will improve when this is mentioned? 2. That is entirely correct. If you use async commits you can await completion by doing a single sync commit with an empty offsets Map (this will work as of Kafka 3.6.0). Is there anything I can do to make the text clearer?
Kind regards, Erik.

On 25-09-2023 at 17:04, Viktor Somogyi-Vass wrote: Hi Erik, I'm still trying to wrap my head around the KIP, however I have a few questions that weren't clear to me regarding offset commits: 1. Would auto-commits interfere with the behavior defined in your KIP or would it work the same as manual commits? 2. As I see it, you don't separate offset commits by whether they're sync or async. For sync commits timing isn't really a problem, but how would your change work in case of async offset commits? There can be a few caveats there, as you may not know whether a commit is finished until your callback is called. Thanks, Viktor

On Sat, Sep 23, 2023 at 4:00 PM Erik van Oosten wrote: Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious about your ideas and comments. Kind regards, Erik. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance
Re: [DISCUSS] KIP-983: Full speed async processing during rebalance
Hello David, Thanks, I am happy to hear we agree on the problem. All the tiny details of an implementation are less important. I will read KIP-848 first to answer your question about its relation with KIP-983. But for sure it makes sense to complete the implementation of KIP-848 first. Kind regards, Erik.

On 13-10-2023 at 21:00, David Jacot wrote: Hi Erik, Thanks for the KIP. I haven't fully read the KIP yet but I agree with the weaknesses that you point out in it. I will continue to read it. For your information, we are working at full speed on implementing KIP-848 while also changing the internal threading model of the consumer. Those changes are already extremely large so I would rather prefer to complete them before adding more on top of them. Moreover, I think that this KIP should build on top of KIP-848 now. Would you agree with this? Best, David

On Fri, Oct 13, 2023 at 20:44, Erik van Oosten wrote: Thanks Philip, No worries, I am not in a hurry. Knowing this is not forgotten is enough for me. If there is anything I can do to help the process please let me know. Kind regards, Erik.

On 13-10-2023 at 20:29, Philip Nee wrote: Hi Erik, Sorry for the delay, I have not finished reviewing the KIP, but I also have not forgotten about it! In general, the KIP review process can be lengthy, so I think the mailing list is the best bet to get the committers' attention. P

On Fri, Oct 13, 2023 at 10:55 AM Erik van Oosten wrote: Hi client developers, The text is updated so that it is clearer that you can only use auto-commit when doing synchronous processing (approach 1). I am assuming that auto-commit commits whatever was consumed in the previous poll. I am wondering why this KIP doesn't get more attention. Is async processing not something that the Kafka client wants to support? Kind regards, Erik.

On 25-09-2023 at 18:17, Erik van Oosten wrote: Hi Viktor, Good questions! 1. Auto-commits would only work with approach 1 in the KIP. Any async solution is incompatible with auto-commits. Do you think the text will improve when this is mentioned? 2. That is entirely correct. If you use async commits you can await completion by doing a single sync commit with an empty offsets Map (this will work as of Kafka 3.6.0). Is there anything I can do to make the text clearer? Kind regards, Erik.

On 25-09-2023 at 17:04, Viktor Somogyi-Vass wrote: Hi Erik, I'm still trying to wrap my head around the KIP, however I have a few questions that weren't clear to me regarding offset commits: 1. Would auto-commits interfere with the behavior defined in your KIP or would it work the same as manual commits? 2. As I see it, you don't separate offset commits by whether they're sync or async. For sync commits timing isn't really a problem, but how would your change work in case of async offset commits? There can be a few caveats there, as you may not know whether a commit is finished until your callback is called. Thanks, Viktor

On Sat, Sep 23, 2023 at 4:00 PM Erik van Oosten wrote: Hi all, I would like to start the discussion on KIP-983: Full speed async processing during rebalance [1]. The idea is that we can prevent the drop in throughput during a cooperative rebalance. I am curious about your ideas and comments. Kind regards, Erik. [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-983%3A+Full+speed+async+processing+during+rebalance -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [slf4j-user] Release of SLF4J version 2.0.9
Hi Ceki, Since startup time is pretty important for services running in a cloud environment, can you say more about roughly how much time could be won by this feature? Just an order of magnitude would be great. Kind regards, Erik.

On 03-09-2023 at 18:49, Ceki Gülcü via slf4j-user wrote: Moreover, the "slf4j.provider" system property bypasses the service loader mechanism for finding providers and may shorten SLF4J initialization. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com ___ slf4j-user mailing list slf4j-user@qos.ch https://mailman.qos.ch/cgi-bin/mailman/listinfo/slf4j-user
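For context, the property Ceki mentions names the provider class directly so SLF4J can skip the ServiceLoader classpath scan. A minimal sketch, assuming Logback as the backend (the provider class name below is Logback's and should be swapped for whichever backend is in use); in practice the property is usually passed as -Dslf4j.provider=... on the command line rather than set in code:

```java
// Sketch: naming the provider explicitly lets SLF4J 2.x skip the
// ServiceLoader scan during initialization. The value must be set before
// the first LoggerFactory.getLogger(...) call.
public class Slf4jProviderSketch {
    public static void main(String[] args) {
        System.setProperty("slf4j.provider",
            "ch.qos.logback.classic.spi.LogbackServiceProvider");
        // ... the first call into LoggerFactory now binds directly to the
        // named provider instead of scanning the classpath.
        System.out.println(System.getProperty("slf4j.provider"));
    }
}
```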
[jira] [Resolved] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten resolved KAFKA-14972. - Resolution: Won't Fix > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
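The check the issue refers to can be sketched as follows — a simplified, hypothetical version of the single-thread guard described for KafkaConsumer, not the actual implementation. The consumer remembers which thread is currently using it and rejects calls from any other thread, which is what makes it hard to drive from an async runtime where successive steps may land on different pool threads:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of an acquire()/release() style guard: the id of the
// thread currently inside the consumer is recorded, re-entrant calls from
// that same thread are allowed, and any other thread is rejected.
public class ThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private int refCount = 0; // supports re-entrant calls from the owner

    public void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException(
                "consumer is not safe for multi-threaded access");
        refCount++;
    }

    public void release() {
        if (--refCount == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }
}
```

An async runtime that guarantees "one logical caller at a time, but possibly on different threads" cannot satisfy such a guard, which is the gap KIP-944 tries to close.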
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751330#comment-17751330 ] Erik van Oosten commented on KAFKA-14972: - I am closing this task as won't fix, as the committers do not seem to be convinced it is needed to support async runtimes. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten reassigned KAFKA-14972: --- Assignee: (was: Erik van Oosten) > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{acquire}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Philip, Colin, Chris, Matthias, Kirk, David, Xiangyuan LI, KIP-944 was extended a bit more to explain why effect systems like Zio and Cats-effects make it impossible to run code on a specific thread. I understand that using an effect system is pretty far removed from writing Java in transaction script style, the style that is probably used by most Kafka committers. It took me quite some time to get comfortable with effects. It is not the academic fringe tool that many perceive it to be. For me it is a way to quickly and correctly write serious data processing applications. Even so, we both use the same Kafka ecosystem, and supporting different styles only makes it richer. IMHO it would be a shame if we could not live together using the same code base. Philip, thanks for your support. I hope I have convinced the others as well by now. If not, I am giving up and I will spend my energy elsewhere. Kind regards, Erik.

On 24-07-2023 at 18:12, Erik van Oosten wrote: Hello Xiangyuan LI, I am not familiar with coroutines, nor with Kotlin. You will have to work with the documentation: https://kotlinlang.org/docs/coroutines-overview.html However, I am familiar with Zio and Cats-effects (both Scala libraries). In both Zio and Cats-effects one creates effects (aka workflows), which are descriptions of a computation. For example, when executing the Scala code `val effect = ZIO.attempt(println("Hello world!"))` one creates only a description; it does not print anything yet. The language to describe these effects is rich enough to describe entire applications, including things like concurrency. In fact, the language is so rich that it is the most convenient way that I know to safely write highly concurrent and async applications. For many developer teams the performance penalty (which is real but not big) is worth it. To execute a Zio or Cats effect one gives it to the runtime. The runtime then schedules the work on one of the threads in its thread pool. Neither Zio nor Cats-effects supports running an effect on the thread that manages the thread pool. I hope this is clear enough. Kind regards, Erik.

On 24-07-2023 at 05:21, Xiangyuan LI wrote: Hi Erik: I read KIP-944 and the email list roughly; it seems most Java developers are not familiar with the concept of a "coroutine", so they cannot imagine why the code of one function may run on separate threads without Thread.start(), with the developer unable to control it. Maybe you need a more elaborate description to demonstrate how coroutine code runs. Erik van Oosten wrote on Sun, Jul 23, 2023 at 17:47: -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
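The "effect as description" idea above can be mimicked in plain Java, without ZIO or Cats-effects. In this sketch a Runnable plays the role of the effect and an ExecutorService plays the role of the runtime; creating the effect runs nothing, and when the runtime executes it, it does so on a pool thread the caller does not choose — which is the crux of why such runtimes cannot satisfy a consumer that insists on one particular calling thread. This is a conceptual analogy, not ZIO code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class EffectSketch {
    // Build an "effect": a description of a computation. Nothing runs here.
    static Runnable describe(AtomicReference<String> executedOn) {
        return () -> executedOn.set(Thread.currentThread().getName());
    }

    // Hand the description to a "runtime" (an executor): only now does it
    // run, on a pool thread the caller does not get to choose.
    static String run(Runnable effect, AtomicReference<String> executedOn) throws Exception {
        ExecutorService runtime = Executors.newFixedThreadPool(2);
        try {
            runtime.submit(effect).get();
        } finally {
            runtime.shutdown();
        }
        return executedOn.get();
    }

    public static void main(String[] args) throws Exception {
        AtomicReference<String> executedOn = new AtomicReference<>();
        Runnable effect = describe(executedOn);
        // Creating the effect executed nothing yet:
        System.out.println("before run: " + executedOn.get());
        // Running it happens on a pool thread, not on main.
        System.out.println("executed on: " + run(effect, executedOn));
    }
}
```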
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi David, > Could you elaborate a bit more on why the callbacks must be run in another thread vs in the invoker thread? I have been thinking about how to explain this for 2 months now and it is not easy. It has something to do with the fact that you cannot control what a thread is doing if you also have to run on that thread. But I just realized that /for me/ it really comes down to this: We want to use ZIO in the callback. ZIO does not support it. There are more reasons, as can be read in KAFKA-7143. But I do not know anything about Kotlin so I cannot elaborate on that. Kind regards, Erik. On 22-07-2023 at 21:39, David Jacot wrote: Hi Erik, Thanks for the KIP. I would like to better understand the motivation of this KIP. I am not familiar with async runtimes so please excuse me if I ask stupid questions. Could you elaborate a bit more on why the callbacks must be run in another thread vs in the invoker thread? This is not clear to me. In the example that you use with the ConsumerRebalanceListener, I would have thought that calling commitSync (without changing thread) would have achieved the same. The invoker has to wait anyway on the offset commit completion so using another thread does not bring any benefit here. I suppose that I am missing something here… Regarding Chris’ proposal, this feels like a hack to me. The issue with it is that we cannot guarantee it in the long term if it is not part of *the* Consumer interface. I second what Chris said. We are all trying to understand the motivation in order to find a good solution for Kafka. I apologize if this creates frustration. This is definitely not our goal. Best, David PS: I just saw that you opened a new KIP based on Chris’ idea. This is not necessary. You can just update the current KIP based on the discussion. On Sat, Jul 22, 2023 at 18:34, Erik van Oosten wrote: Colin, Matthias, Chris, I have expanded the use case description in KIP-944. I hope it is clearer what we're trying to achieve. 
https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. On 22-07-2023 at 17:23, Erik van Oosten wrote: Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. On 22-07-2023 at 16:39, Chris Egerton wrote: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. 
It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. On 21-07-2023 at 21:13, Matthias J. Sax wrote: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with.
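The restriction debated above can be illustrated with a small, self-contained model. This is not the real KafkaConsumer code; the guard below merely mimics the kind of thread-ownership check the private acquire/release methods perform, with all names invented. It shows why a rebalance callback that delegates a consumer call to another thread fails: the invoking thread still owns the consumer.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of a single-thread ownership guard, loosely inspired by the
// consumer's acquire/release check. Names and details are illustrative only.
public class ThreadGuardDemo {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private int refCount = 0;

    void acquire() {
        long id = Thread.currentThread().getId();
        if (id != owner.get() && !owner.compareAndSet(NO_OWNER, id))
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        refCount++;
    }

    void release() {
        if (--refCount == 0) owner.set(NO_OWNER);
    }

    // Simulates a rebalance callback that delegates work to a helper thread:
    // the helper fails the check because the callback thread still holds the guard.
    public static boolean delegatedCallFails() throws InterruptedException {
        ThreadGuardDemo guard = new ThreadGuardDemo();
        guard.acquire(); // the invoker thread, as inside poll / a callback
        final boolean[] failed = {false};
        Thread helper = new Thread(() -> {
            try {
                guard.acquire();
            } catch (ConcurrentModificationException e) {
                failed[0] = true;
            }
        });
        helper.start();
        helper.join();
        guard.release();
        return failed[0];
    }
}
```

An async runtime that resumes the callback's continuation on a different carrier thread hits exactly this situation.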
[DISCUSS] KIP-957 Support async runtimes in consumer
Hello developers of the Java consumer, This is a simpler alternative to KIP-944 as proposed by Chris Egerton. In this proposal we make the methods acquire and release of the KafkaConsumer class protected. This allows anyone to implement these methods as appropriate for their environment. The wiki page for KIP-957 contains more details https://cwiki.apache.org/confluence/x/lY6zDw This is a call for discussion. If possible, I would like to include this change in Kafka 3.6. Any questions, comments, ideas and other additions are welcome! Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
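As a rough sketch of what KIP-957 would enable, using a stand-in base class rather than the real KafkaConsumer (all names here are invented): a subclass that guarantees single-threaded-at-a-time access by other means could override the then-protected acquire and release methods as no-ops.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in base class modeling (not reproducing) the consumer's thread guard.
class GuardedClient {
    private final AtomicLong owner = new AtomicLong(-1L);

    protected void acquire() {
        long id = Thread.currentThread().getId();
        if (id != owner.get() && !owner.compareAndSet(-1L, id))
            throw new ConcurrentModificationException("not safe for multi-threaded access");
    }

    protected void release() { owner.set(-1L); }

    public String poll() { acquire(); try { return "records"; } finally { release(); } }
}

public class Kip957Sketch extends GuardedClient {
    // The subclass promises single-threaded-at-a-time access by other means
    // (e.g. an async runtime's semaphore), so the thread-id check is disabled.
    @Override protected void acquire() { }
    @Override protected void release() { }

    // The strict base class rejects a call from a second thread while the
    // guard is held; the overriding subclass accepts it.
    public static boolean overriddenGuardAllowsDelegation() throws InterruptedException {
        GuardedClient strict = new GuardedClient();
        strict.acquire(); // invoker thread holds the guard, as during a callback
        final boolean[] strictFailed = {false};
        Thread t1 = new Thread(() -> {
            try {
                strict.poll();
            } catch (ConcurrentModificationException e) {
                strictFailed[0] = true;
            }
        });
        t1.start();
        t1.join();

        Kip957Sketch relaxed = new Kip957Sketch();
        relaxed.acquire(); // no-op: the override removes the restriction
        final boolean[] relaxedOk = {false};
        Thread t2 = new Thread(() -> relaxedOk[0] = "records".equals(relaxed.poll()));
        t2.start();
        t2.join();
        return strictFailed[0] && relaxedOk[0];
    }
}
```

The design burden shifts to the subclass: it must provide its own mutual exclusion, which async runtimes typically already have.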
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Colin, Matthias, Chris, I have expanded the use case description in KIP-944. I hope it is clearer what we're trying to achieve. https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. On 22-07-2023 at 17:23, Erik van Oosten wrote: Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. > It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. On 22-07-2023 at 16:39, Chris Egerton wrote: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. 
It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. On 21-07-2023 at 21:13, Matthias J. Sax wrote: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user-facing API. ... 
KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. On 14-07-2023 at 00:36, Colin McCabe wrote: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads.
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi all, I have created https://github.com/apache/kafka/pull/14071 to implement Chris' idea. Kind regards, Erik. On 22-07-2023 at 16:39, Chris Egerton wrote: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. On 21-07-2023 at 21:13, Matthias J. Sax wrote: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. 
In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user-facing API. ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. On 14-07-2023 at 00:36, Colin McCabe wrote: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. 
It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;) best, Colin On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote: Hey Erik - Another thing I want to add to my comment: We are in the process of rewriting the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in the process of converting this into KIP-945. Thanks, P On Tue, Jul 11, 2023 at 10:33 AM Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something, therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access.
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hello Chris, Thanks for elaborating Matthias' words. Apparently the use case description is too terse. Indeed, that is not FUD and that is something I can work with. > It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. That is absolutely brilliant! Since I am pretty sure I am using the consumer correctly, I could replace acquire and release with an empty method body and be done. /Is making acquire and release protected something that other people can live with?/ If yes, I will create a new PR with just that change. Kind regards, Erik. On 22-07-2023 at 16:39, Chris Egerton wrote: Hi Erik, I don't think Matthias is bringing FUD to the discussion. Many of the people who maintain Kafka are familiar with Kafka client internals and the Java programming language, but not necessarily other JVM languages or asynchronous runtimes. I think it's reasonable to ask for a code snippet or two that demonstrates what you'd like to do with the consumer today that you can't because of restrictions around concurrent access, and this is not already addressed in the KIP. Linking to a docs page on Kotlin coroutines is helpful but still requires reviewers to gather a lot of context on their own that could more easily be provided in the KIP, and although the description of KAFKA-7143 is more detailed, I find it a little hard to follow as someone who isn't already familiar with the environment the user is working in. It's also worth mentioning that what's proposed in the KIP is only blocked by the private access modifier on the KafkaConsumer::acquire and KafkaConsumer::release methods. 
If we upgraded the visibility of these methods from private to protected, it would be possible for subclasses to implement the proposal in KIP-944, without any KIPs or other changes to the official Java clients library. Best, Chris On Sat, Jul 22, 2023 at 4:24 AM Erik van Oosten wrote: Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. On 21-07-2023 at 21:13, Matthias J. Sax wrote: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user-facing API. ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. 
In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. On 14-07-2023 at 00:36, Colin McCabe wrote: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;)
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Matthias, I am getting a bit frustrated here. All the concerns and questions I have seen so far are addressed in KIP-944. Please let me know if they are not clear enough, but please do not come with FUD. Kind regards, Erik. On 21-07-2023 at 21:13, Matthias J. Sax wrote: I am not a clients (or threading) expert, but I tend to agree with Colin's concerns. In particular, it would be nice to see an example of how you intend to use the API (I am not familiar with Kotlin or its coroutines), to better understand what this change helps to solve to begin with. Opening up the consumer sounds potentially dangerous and we should weigh opportunity and risk before making a decision. So far, I see risks but do not understand the opportunity you are after. -Matthias On 7/14/23 11:43 AM, Kirk True wrote: Hi Erik, Thanks for the KIP! I empathize with your frustration over the radio silence. It gets like that sometimes, and I apologize for my lack of feedback. I’d personally like to see this lively exchange move over to the DISCUSS thread you’d created before. Thanks, Kirk On Jul 14, 2023, at 1:33 AM, Erik van Oosten wrote: Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user-facing API. ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. 
On 14-07-2023 at 00:36, Colin McCabe wrote: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;) best, Colin On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote: Hey Erik - Another thing I want to add to my comment: We are in the process of rewriting the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in the process of converting this into KIP-945. Thanks, P On Tue, Jul 11, 2023 at 10:33 AM Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something, therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? 
It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now that I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly, this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work.
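The point about synchronized blocks providing the needed barriers can be illustrated with a minimal hand-off sketch (names invented; this is not the KIP-944 code). Under the Java Memory Model, unlocking a monitor happens-before a subsequent lock of the same monitor, so a value published inside a synchronized block is fully visible to the thread that reads it inside the same lock, with no volatile required.

```java
// Hands a value (standing in for KIP-944's "access key") from one thread to
// another through a synchronized block, relying on the monitor's
// happens-before edge for visibility.
public class HandoffDemo {
    private final Object lock = new Object();
    private Long accessKey; // guarded by lock

    void put(long key) {
        synchronized (lock) {
            accessKey = key;
            lock.notifyAll();
        }
    }

    long take() throws InterruptedException {
        synchronized (lock) {
            while (accessKey == null) lock.wait();
            return accessKey;
        }
    }

    public static long handOff() throws InterruptedException {
        HandoffDemo d = new HandoffDemo();
        // The "callback thread" publishes the key; the worker thread reads it.
        Thread callback = new Thread(() -> d.put(42L));
        callback.start();
        long key = d.take();
        callback.join();
        return key;
    }
}
```

This matches the observation in the email: because the hand-off already goes through a lock, no separate volatile or thread-local barrier argument is needed.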
Re: [DISCUSS] KIP-944 Support async runtimes in consumer
Hi Colin, Philip, Kirk, As far as I am aware, all concerns about KIP-944 have been addressed, including those about dirty reads between threads and green threads because of missing memory barriers. If you agree, I would like to open the vote again. If not, please let me know. I'll open another KIP with a proposal on how to improve the consumer API so that we don't need any thread trickery anymore. I would rather not wait for that one because there will be a lot of work before that can even be implemented. Once KIP-944 has been accepted, I'll work on adding the unit tests that are described in the KIP. Kind regards, Erik. On 30-06-2023 at 07:56, Erik van Oosten wrote: [This is a resend with the correct KIP number.] Hello developers of the Java-based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long-standing problem: the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API, I was requested to create a KIP. So here it is: KIP-944 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Colin, The way I understood Philip's message is that KIP-944 also plays nice with KIP-945. But I might be mistaken. Regardless, KIP-945 does /not/ resolve the underlying problem (the need for nested consumer invocations) because it has the explicit goal of not changing the user-facing API. > ... KIP-945 but haven't posted a DISCUSS thread yet There is a thread called 'KafkaConsumer refactor proposal', but indeed no official discussion yet. > I really don't want to be debugging complex interactions between Java thread-local variables and green threads. In that email thread, I proposed an API change in which callbacks are no longer needed. The proposal completely removes the need for such complex interactions. In addition, it gives clients the ability to process at full speed even while a cooperative rebalance is ongoing. Regards, Erik. On 14-07-2023 at 00:36, Colin McCabe wrote: Hi Philip & Erik, Hmm... if we agree that KIP-945 addresses this use case, I think it would be better to just focus on that KIP. Fundamentally it's a better and cleaner model than a complex scheme involving thread-local variables. I really don't want to be debugging complex interactions between Java thread-local variables and green threads. It also generally helps to have some use-cases in mind when writing these things. If we get feedback about what would be useful for async runtimes, that would probably help improve and focus KIP-945. By the way, I can see you have a draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I assume it's not ready for review yet ;) best, Colin On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote: Hey Erik - Another thing I want to add to my comment: We are in the process of rewriting the KafkaConsumer, and I think your proposal would work in the new consumer because we are going to separate the user thread and the background thread. Here is the 1-pager, and we are in the process of converting this into KIP-945. 
Thanks, P On Tue, Jul 11, 2023 at 10:33 AM Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something, therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now that I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly, this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. 
> Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, ZIO, Cats Effect) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread-pool, or it will grow the thread-pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long-running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though.
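The "mark a task as blocking" pattern described above can be sketched in plain Java with two executors. This is only a stand-in for what the async runtimes do internally (names invented): blocking work is shifted to an elastic pool so the small compute pool, which plays the role of the green-thread carrier threads, stays responsive.

```java
import java.util.concurrent.*;

// Blocking work goes to an unbounded cached pool; quick work stays on a
// single "compute" thread, mimicking how async runtimes shift blocking tasks.
public class BlockingShiftDemo {
    static final ExecutorService compute = Executors.newFixedThreadPool(1);
    static final ExecutorService blocking = Executors.newCachedThreadPool();

    static Future<String> run(Callable<String> task, boolean isBlocking) {
        return (isBlocking ? blocking : compute).submit(task);
    }

    public static boolean computePoolStaysResponsive() throws Exception {
        // A long blocking task is shifted to the elastic pool...
        Future<String> slow = run(() -> { Thread.sleep(500); return "slow"; }, true);
        // ...so a quick task on the single compute thread completes immediately,
        // instead of queueing behind the blocked one.
        Future<String> quick = run(() -> "quick", false);
        String q = quick.get(200, TimeUnit.MILLISECONDS);
        String s = slow.get();
        compute.shutdown();
        blocking.shutdown();
        return "quick".equals(q) && "slow".equals(s);
    }
}
```

If both tasks were submitted to the single compute thread, the quick task would have to wait out the 500 ms sleep; that is the starvation Colin describes.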
Re: KafkaConsumer refactor proposal
Hi Philip, I have been scanning through https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design and KIP-848 and from this I understand that the Kafka consumer API will not change. Perhaps the refactoring and/or KIP-848 is a good opportunity to improve the API somewhat. In this email I explain why and also give a rough idea what that could look like. In the current API, the rebalance listener callback gives the user a chance to commit all work in progress before a partition is actually revoked and assigned to another consumer. While the callback is doing all this, the main user thread is not able to process new incoming data. So the rebalance listener affects latency and throughput for non-revoked partitions during a rebalance. In addition, I feel that doing a lot of stuff /in/ a callback is always quite awkward. It is better to only use it to trigger some processing elsewhere. Therefore, I would like to propose a new API that does not have these problems and is easy to use (and I hope still easy to implement). In my ideal world, poll is the only method that you need. Let's call it poll2 (to do: come up with a less crappy name). Poll2 returns more than just the polled records: it will also contain newly assigned partitions, partitions that will be revoked during the next call to poll2, partitions that were lost, and perhaps it will even contain the offsets committed so far. The most important idea here is that partitions are not revoked immediately, but in the next call to poll2. With this API, a user can commit offsets at their own pace during a rebalance. Optionally, for the case that processing of data from the to-be-revoked partition is still ongoing, we allow the user to postpone the actual revocation in the next poll, so that polling can continue for other partitions. Since we are no longer blocking the main user thread, partitions that are not revoked can be processed at full speed. 
Removal of the rebalance listener also makes the API safer; there is no more need for the thread-id check (nor KIP-944) because concurrent invocations are simply no longer needed. (Of course, if backward compatibility is a goal, not all of these things can be done.) Curious about your thoughts. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
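The poll2 contract proposed above can be sketched with a toy model. All names and types here are invented (plain strings stand in for TopicPartition, and the broker side is simulated); it only demonstrates the key idea that a revocation is announced one poll ahead and applied on the following call, leaving the caller a window to commit.

```java
import java.util.*;

// Hypothetical poll2: revocations are reported first, revoked on the next call.
public class Poll2Sketch {
    public record PollResult(Set<String> assigned, Set<String> revokedOnNextPoll) { }

    private final Set<String> owned = new LinkedHashSet<>();
    private final Set<String> pendingRevoke = new LinkedHashSet<>();
    private final Deque<Set<String>> rebalanceEvents = new ArrayDeque<>();

    // Simulated group coordinator scheduling a revocation.
    void scheduleRevocation(Set<String> parts) { rebalanceEvents.add(parts); }

    void assign(Collection<String> parts) { owned.addAll(parts); }

    // First apply the revocations announced by the *previous* call, then
    // report any newly scheduled ones without revoking them yet.
    public PollResult poll2() {
        owned.removeAll(pendingRevoke);
        pendingRevoke.clear();
        if (!rebalanceEvents.isEmpty()) pendingRevoke.addAll(rebalanceEvents.poll());
        return new PollResult(Set.copyOf(owned), Set.copyOf(pendingRevoke));
    }
}
```

Between the poll that announces a revocation and the one that applies it, the caller can still commit for the to-be-revoked partition while processing the others at full speed, which is exactly the property the email argues for.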
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hi Colin, Philip, I have added a section to KIIP-944 to address your concerns around memory consistency over multiple threads. You can read them here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-944%3A+Support+async+runtimes+in+consumer#KIP944:Supportasyncruntimesinconsumer-Threadsafety Kind regards, Erik. Op 12-07-2023 om 13:24 schreef Erik van Oosten: Thanks Philip, > I think this can be demonstrated via diagrams and some code in the KIP. There are some diagrams in KIP-944. How can they be improved? I will add some code to address the concerns around memory barriers. > We are in-process of re-writing the KafkaConsumer Nice! I will read the KIP. Hopefully we don't need complex logic in callbacks after the rewrite. Kind regards, Erik. Op 11-07-2023 om 19:33 schreef Philip Nee: Hey Erik, Sorry for holding up this email for a few days since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something therefore I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. 
The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread-pool, or it will grow the thread-pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread. Then it will be able to handle these callbacks without involving the other threads. Please consider the context which is that we are running inside the callback of the rebalance listener. 
The only way to execute something and also put a timeout on it is to run it on another thread. Kind regards, Erik. On 08-07-2023 at 19:17, Colin McCabe wrote: On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: Hi Colin, Thanks for your thoughts and for taking the time to reply. Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944. Here it goes: > It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen. *The only design goal is to allow callbacks to call the consumer from another thread.* To make sure there are no more misunderstandings about this, I have added this goal to the KIP.
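Erik's point about timeouts can be illustrated with plain JDK concurrency primitives. The sketch below is not Kafka code; it only shows the general pattern of bounding a blocking operation by running it on another thread. The class and method names are invented for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    // Runs `task` on another thread and waits at most `millis` for its result.
    // The calling thread cannot put a timeout on work it is executing itself,
    // hence the hand-off to an executor.
    static String runWithTimeout(Callable<String> task, long millis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(task);
        try {
            return future.get(millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; we no longer want the result
            return "timed out";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithTimeout(() -> "committed", 1_000));
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(5_000); // stand-in for a commit that never completes
            return "unreachable";
        }, 50));
    }
}
```

The same shape applies inside a rebalance listener: submit the work, then bound the wait with `Future.get(timeout, unit)`.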
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Thanks Philip, > I think this can be demonstrated via diagrams and some code in the KIP. There are some diagrams in KIP-944. How can they be improved? I will add some code to address the concerns around memory barriers. > We are in the process of rewriting the KafkaConsumer Nice! I will read the KIP. Hopefully we won't need complex logic in callbacks after the rewrite. Kind regards, Erik. On 11-07-2023 at 19:33, Philip Nee wrote: Hey Erik, Sorry for holding up this email for a few days, since Colin's response includes some of my concerns. I'm in favor of this KIP, and I think your approach seems safe. Of course, I probably missed something; therefore, I think this KIP needs to cover different use cases to demonstrate it doesn't cause any unsafe access. I think this can be demonstrated via diagrams and some code in the KIP. Thanks, P On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten wrote: Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a thread-local variable, providing the barriers right there. > I don't see any documentation that accessing thread-local variables provides a total store or load barrier. Do you have such documentation? It seems that if this were the case, we could eliminate volatile variables from most of the code base. I was imprecise there. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which can then set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is not very relevant here since there is no need to pass the access key back when the other thread is done. Now that I think about it a bit more: the locking mechanism runs in a synchronized block. If I remember correctly, this should be enough to pass read and write barriers.
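Erik's recollection about synchronized blocks is correct: under the Java Memory Model, an unlock of a monitor happens-before every subsequent lock of that same monitor, so a value written inside one synchronized block is visible to any thread that later synchronizes on the same object. A minimal standalone illustration follows; the class and field names are invented for this sketch and are not from the KIP-944 code.

```java
public class HandOffDemo {
    private String accessKey; // guarded by `this`; deliberately not volatile

    // Writer thread stores the key inside a synchronized block.
    synchronized void publish(String key) {
        this.accessKey = key;
    }

    // A later lock of the same monitor guarantees the reader sees the write
    // (JMM rule: monitor unlock happens-before each subsequent lock of it).
    synchronized String take() {
        return accessKey;
    }

    public static void main(String[] args) throws InterruptedException {
        HandOffDemo demo = new HandOffDemo();
        Thread writer = new Thread(() -> demo.publish("key-42"));
        writer.start();
        writer.join(); // Thread.join also establishes happens-before
        System.out.println(demo.take());
    }
}
```

This is why no volatile is needed on the hand-off path as long as both sides go through the same lock.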
>> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. the method poll) is not from a thread managed by the consumer. This is what I assumed you meant with the term 'random thread'. > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, ZIO, Cats Effect) support this by letting you mark a task as blocking. The runtime will then either schedule it to another thread pool, or it will grow the thread pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long-running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread. Then it will be able to handle these callbacks without involving the other threads. Please consider the context, which is that we are running inside the callback of the rebalance listener. The only way to execute something and also put a timeout on it is to run it on another thread. Kind regards, Erik. On 08-07-2023 at 19:17, Colin McCabe wrote: On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: Hi Colin, Thanks for your thoughts and for taking the time to reply. Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944.
Here it goes: > It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen. *The only design goal is to allow callbacks to call the consumer from another thread.* To make sure there are no more misunderstandings about this, I have added this goal to the KIP. Hi Erik, Sorry, I spoke imprecisely. My concern is not concurrent access, but multithreaded access in general. Basically cache-line visibility issues. > This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores. In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a thread-local variable, providing the barriers right there.
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Hello Colin, >> In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. > I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. Now I was imprecise. The thread-locals are only somewhat involved. In the KIP proposal the callback thread reads an access key from a thread-local variable. It then needs to pass that access key to another thread, which then can set it on its own thread-local variable. The act of passing a value from one thread to another implies that a memory barrier needs to be passed. However, this is all not so relevant since there is no need to pass the access key back when the other thread is done. But now I think about it a bit more, the locking mechanism runs in a synchronized block. If I remember correctly this should be enough to pass read and write barriers. >> In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. > I'm not sure what you're referring to. Can you expand on this? Any invocation of the consumer (e.g. method poll) is not from a thread managed by the consumer. This is what I was assuming you meant with the term 'random thread'. > Hmm, not sure what you mean by "cooperate with blocking code." If you have 10 green threads you're multiplexing on to one CPU thread, and that CPU thread gets blocked because of what one green thread is doing, the other 9 green threads are blocked too, right? I guess it's "just" a performance problem, but it still seems like it could be a serious one. There are several ways to deal with this. All async runtimes I know (Akka, Zio, Cats-effects) support this by letting you mark a task as blocking. 
The runtime will then either schedule it to another thread pool, or it will grow the thread pool to accommodate. In any case 'the other 9 green threads' will simply be scheduled to another real thread. In addition, some of these runtimes detect long-running tasks and will reschedule waiting tasks to another thread. This is all a bit off topic though. > I don't see why this has to be "inherently multi-threaded." Why can't we have the other threads report back what messages they've processed to the worker thread. Then it will be able to handle these callbacks without involving the other threads. Please consider the context, which is that we are running inside the callback of the rebalance listener. The only way to execute something and also put a timeout on it is to run it on another thread. Kind regards, Erik. On 08-07-2023 at 19:17, Colin McCabe wrote: On Sat, Jul 8, 2023, at 02:41, Erik van Oosten wrote: Hi Colin, Thanks for your thoughts and for taking the time to reply. Let me take away your concerns. None of your worries are an issue with the algorithm described in KIP-944. Here it goes: > It's not clear to me that it's safe to access the Kafka consumer or producer concurrently from different threads. Concurrent access is /not/ a design goal of KIP-944. In fact, it goes to great lengths to make sure that this cannot happen. *The only design goal is to allow callbacks to call the consumer from another thread.* To make sure there are no more misunderstandings about this, I have added this goal to the KIP. Hi Erik, Sorry, I spoke imprecisely. My concern is not concurrent access, but multithreaded access in general. Basically cache-line visibility issues. > This is true even if the accesses happen at different times, because modern CPUs require memory barriers to guarantee inter-thread visibility of loads and stores.
In KIP-944, the callback thread can only delegate to another thread after reading from and writing to a threadlocal variable, providing the barriers right there. I don't see any documentation that accessing thread local variables provides a total store or load barrier. Do you have such documentation? It seems like if this were the case, we could eliminate volatile variables from most of the code base. > I know that there are at least a few locks in the consumer code now, due to our need to send heartbeats from a worker thread. I don't think those would be sufficient to protect a client that is making calls from random threads. In the current implementation the consumer is also invoked from random threads. If it works now, it should continue to work. I'm not sure what you're referring to. Can you expand on this? > There has been some discussion of moving to a more traditional model where people make calls to the client
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
messages to and from a background worker thread" model than to try to re-engineer the Kafka client to work from random threads. There is actually some good advice about how to handle multiple threads in the KafkaConsumer.java header file itself. Check the sections "One Consumer Per Thread" and "Decouple Consumption and Processing." What I'm recommending here is essentially the latter. I do understand that it's frustrating to not get a quick response. However, overall I think this one needs a lot more discussion before getting anywhere near a vote. I will leave a -1 just as a procedural step. Maybe some of the people working in the client area can also chime in. best, Colin On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote: Dear PMCs, So far there have been 0 responses to KIP-944. I understand this may not be something that keeps you busy, but this KIP is important to people that use async runtimes like ZIO, Cats and Kotlin. Is there anything you need to come to a decision? Kind regards, Erik. On 05-07-2023 at 11:38, Erik van Oosten wrote: Hello all, I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' is in quotes because there have been 0 comments so far. I hope the KIP is clear! KIP description: https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!
Dear PMCs, So far there have been 0 responses to KIP-944. I understand this may not be something that keeps you busy, but this KIP is important to people that use async runtimes like ZIO, Cats and Kotlin. Is there anything you need to come to a decision? Kind regards, Erik. On 05-07-2023 at 11:38, Erik van Oosten wrote: Hello all, I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' is in quotes because there have been 0 comments so far. I hope the KIP is clear! KIP description: https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[VOTE] KIP-944 Support async runtimes in consumer
Hello all, I'd like to call a vote on KIP-944 Support async runtimes in consumer. It has been 'under discussion' for 7 days now. 'Under discussion' is in quotes because there have been 0 comments so far. I hope the KIP is clear! KIP description: https://cwiki.apache.org/confluence/x/chw0Dw Kind regards, Erik.
[DISCUSS] KIP-944 Support async runtimes in consumer
[This is a resend with the correct KIP number.] Hello developers of the Java based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is: KIP-944 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated KAFKA-14972: Description: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{{}acquire{}}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and [https://github.com/apache/kafka/pull/13914] for an implementation. was: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{{}acquire{}}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See KIP-944 for a proposal and > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See > [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and > [https://github.com/apache/kafka/pull/13914] for an implementation. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
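The access-id mechanism described in the issue above can be sketched in isolation. The following is a hypothetical, simplified model of the proposal, not the actual KafkaConsumer code: an `acquire()` stand-in rejects callers whose thread-local key does not match the consumer's current key, and any thread can take over by setting that key on its own thread-local. All names here are invented.

```java
import java.util.concurrent.atomic.AtomicLong;

public class AccessKeyDemo {
    // Hypothetical names; the real KIP-944 PR differs in detail.
    private static final ThreadLocal<Long> ACCESS_KEY = new ThreadLocal<>();
    private static final AtomicLong KEY_GENERATOR = new AtomicLong();

    private volatile long currentKey = KEY_GENERATOR.incrementAndGet();

    // Stand-in for the acquire() check: instead of comparing thread ids,
    // compare the caller's thread-local key against the consumer's key.
    void acquire() {
        Long callerKey = ACCESS_KEY.get();
        if (callerKey == null || callerKey != currentKey) {
            throw new IllegalStateException("consumer accessed without the current access key");
        }
    }

    long key() {
        return currentKey;
    }

    // A thread picks up the capability by setting the key on its thread-local.
    void adoptKey(long key) {
        ACCESS_KEY.set(key);
    }

    public static void main(String[] args) throws InterruptedException {
        AccessKeyDemo consumer = new AccessKeyDemo();
        long key = consumer.key();

        Thread other = new Thread(() -> {
            consumer.adoptKey(key); // delegate access to a thread of our choosing
            consumer.acquire();     // succeeds: the key matches
            System.out.println("delegated thread accepted");
        });
        other.start();
        other.join();

        try {
            consumer.acquire();     // fails: this thread never adopted the key
        } catch (IllegalStateException e) {
            System.out.println("undelegated thread rejected");
        }
    }
}
```

In the actual proposal a callback would also generate a fresh key and restore the previous one on completion; that part is omitted here for brevity.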
[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated KAFKA-14972: Description: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{{}acquire{}}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. It should be possible for a thread to pass on its capability to access the consumer to another thread. See KIP-944 for a proposal and was: KafkaConsumer contains a check that rejects nested invocations from different threads (method {{{}acquire{}}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. We propose to replace the thread-id check with an access-id that is stored on a thread-local variable. Existing programs will not be affected. Developers that work in an async runtime can pick up the access-id and set it on the thread-local variable in a thread of their choosing. Every time a callback is invoked a new access-id is generated. When the callback completes, the previous access-id is restored. This proposal does not make it impossible to use the client incorrectly. However, we think it strikes a good balance between making correct usage from an async runtime possible while making incorrect usage difficult. Alternatives considered: # Configuration that switches off the check completely. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). 
For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > It should be possible for a thread to pass on its capability to access the > consumer to another thread. See KIP-944 for a proposal and -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738014#comment-17738014 ] Erik van Oosten commented on KAFKA-14972: - KIP-944 https://cwiki.apache.org/confluence/x/chw0Dw > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > We propose to replace the thread-id check with an access-id that is stored on > a thread-local variable. Existing programs will not be affected. Developers > that work in an async runtime can pick up the access-id and set it on the > thread-local variable in a thread of their choosing. > Every time a callback is invoked a new access-id is generated. When the > callback completes, the previous access-id is restored. > This proposal does not make it impossible to use the client incorrectly. > However, we think it strikes a good balance between making correct usage from > an async runtime possible while making incorrect usage difficult. > Alternatives considered: > # Configuration that switches off the check completely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-944 Support async runtimes in consumer
It seems KIP-941 was already taken. Updated to: KIP-944. On 28-06-2023 at 10:11, Erik van Oosten wrote: Hello developers of the Java based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long-standing problem: the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin coroutines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is: KIP-944 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[DISCUSS] KIP-941 Support async runtimes in consumer
Hello developers of the Java based consumer, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. However, since it extends the public API I was requested to create a KIP. So here it is: KIP-941 Support async runtimes in consumer https://cwiki.apache.org/confluence/x/chw0Dw Any questions, comments, ideas and other additions are welcome! The KIP should be complete except for the testing section. As far as I am aware there are no tests for the current behavior. Any help in this area would be appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
[ https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737711#comment-17737711 ] Erik van Oosten commented on KAFKA-14972: - I will complete the KIP tomorrow. > Make KafkaConsumer usable in async runtimes > --- > > Key: KAFKA-14972 > URL: https://issues.apache.org/jira/browse/KAFKA-14972 > Project: Kafka > Issue Type: Wish > Components: consumer > Reporter: Erik van Oosten > Assignee: Erik van Oosten >Priority: Major > Labels: needs-kip > > KafkaConsumer contains a check that rejects nested invocations from different > threads (method {{{}acquire{}}}). For users that use an async runtime, this > is an almost impossible requirement. Examples of async runtimes that are > affected are Kotlin co-routines (see KAFKA-7143) and Zio. > We propose to replace the thread-id check with an access-id that is stored on > a thread-local variable. Existing programs will not be affected. Developers > that work in an async runtime can pick up the access-id and set it on the > thread-local variable in a thread of their choosing. > Every time a callback is invoked a new access-id is generated. When the > callback completes, the previous access-id is restored. > This proposal does not make it impossible to use the client incorrectly. > However, we think it strikes a good balance between making correct usage from > an async runtime possible while making incorrect usage difficult. > Alternatives considered: > # Configuration that switches off the check completely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Permissions to contribute to Apache Kafka
Dear reader, I would like to create a KIP and understand I need to request permissions for that. my wiki username: e.vanoos...@chello.nl (note, this is /not/ my email address) my Jira username: erikvanoosten Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Review request for Java Kafka consumer
Dear Kafka developers, I submitted https://github.com/apache/kafka/pull/13914 to fix a long standing problem that the Kafka consumer on the JVM is not usable from asynchronous runtimes such as Kotlin co-routines and ZIO. Your review is much appreciated. Kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730665#comment-17730665 ] Erik van Oosten commented on KAFKA-10337: - Thanks for your PR [~thomaslee]. It has been merged now with only minor changes. > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Assignee: Erik van Oosten >Priority: Major > Fix For: 3.6.0 > > > The JavaDoc for commitSync() states the following: > {quote}Note that asynchronous offset commits sent previously with the > {@link #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {quote} > But should we happen to call the method with an empty offset map > (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete > async commits will not be invoked because of an early return in > ConsumerCoordinator.commitOffsetsSync() when the input map is empty. > If users are doing manual offset commits and relying on commitSync as a > barrier for in-flight async commits prior to a rebalance, this could be an > important (though somewhat implementation-dependent) detail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
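The contract restored by the merged PR can be modeled without any Kafka code. In this standalone sketch (all names invented, the "broker round trip" simulated with a sleep), commitSync acts as a barrier: even with an empty offset map it first waits for earlier async commits and lets their callbacks run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CommitBarrierDemo {
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    // Fire-and-forget commit; the callback runs when the "broker" responds.
    void commitAsync(Runnable callback) {
        inFlight.add(CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(20); // simulated broker round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            callback.run();
        }));
    }

    // Barrier semantics: drain in-flight async commits first, even when the
    // offset map is empty (the case KAFKA-10337 fixed).
    void commitSync(Map<String, Long> offsets) {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
        // ...a real client would now send the synchronous commit for `offsets`.
    }

    public static void main(String[] args) {
        CommitBarrierDemo client = new CommitBarrierDemo();
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        client.commitAsync(() -> completed.add("async-1"));
        client.commitAsync(() -> completed.add("async-2"));
        client.commitSync(Collections.emptyMap()); // waits for both callbacks
        System.out.println(completed.size());
    }
}
```

Before the fix, the empty-map call returned early and neither callback was guaranteed to have run; after it, the size printed above is always 2.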
[jira] [Created] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
Erik van Oosten created KAFKA-14972: --- Summary: Make KafkaConsumer usable in async runtimes Key: KAFKA-14972 URL: https://issues.apache.org/jira/browse/KAFKA-14972 Project: Kafka Issue Type: Wish Components: consumer Reporter: Erik van Oosten KafkaConsumer contains a check that rejects nested invocations from different threads (method {{{}acquire{}}}). For users that use an async runtime, this is an almost impossible requirement. Examples of async runtimes that are affected are Kotlin co-routines (see KAFKA-7143) and Zio. We propose to replace the thread-id check with an access-id that is stored on a thread-local variable. Existing programs will not be affected. Developers that work in an async runtime can pick up the access-id and set it on the thread-local variable in a thread of their choosing. Every time a callback is invoked a new access-id is generated. When the callback completes, the previous access-id is restored. This proposal does not make it impossible to use the client incorrectly. However, we think it strikes a good balance between making correct usage from an async runtime possible while making incorrect usage difficult. Alternatives considered: # Configuration that switches off the check completely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: Kafka client needs KAFKA-10337 to cover async commit use case
Thanks! Here is Tom Lee's PR recreated on trunk: https://github.com/apache/kafka/pull/13678 I believe that this PR might not be complete though. When we only call commitAsync (repeatedly) from the rebalance listener callback method, will the client's poll method ever be invoked? I suspect that no polling takes place in this scenario and that async commits will still not complete. With the changes of this PR, commitSync can be used as a workaround. I guess we can fix this by adding `client.pollNoWakeup()`, e.g. at the start of `ConsumerCoordinator.commitOffsetsAsync`. Is that an acceptable change? Kind regards, Erik. On 05-05-2023 at 20:20, Philip Nee wrote: Hey Erik, Maybe it's more straightforward to open a new PR. Thanks! P On Fri, May 5, 2023 at 9:36 AM Erik van Oosten wrote: If I were to rebase the old pull request and re-open KAFKA-10337, would it be considered for merging? Kind regards, Erik. On 03-05-2023 at 09:21, Erik van Oosten wrote: Hi Philip, Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Of course! Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime. Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed-out records. Together this information allows us to track when a stream has finished processing a partition and it is therefore safe to start the rebalance. All of this happens concurrently and asynchronously using the ZIO runtime. When calling commit inside the onPartitionsRevoked callback, the KafkaConsumer performs a thread-id check; we can only call it from the same thread that invoked us.
This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it will need careful programming since all other tasks need to wait. (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases, see for example KAFKA-7143.) Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Yes, that is correct. Sorry - I dug a bit into the old PR. Seems like the issue is that there's a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map. Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to get those callbacks. I would prefer commitAsync, but if only commitSync gets fixed we can use that as well. If there is an entirely different method for this task, that would be good as well. Kind regards, Erik. On 2023-05-02 at 21:49, Philip Nee wrote: Hey Erik, Just a couple of questions for you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question. Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Thanks, P On Tue, May 2, 2023 at 12:22 PM Erik van Oosten wrote: Dear developers of the Kafka Java client, It seems I have found a feature gap in the Kafka Java client. KAFKA-10337 and its associated pull request on GitHub (from 2020!) would solve this, but it was closed without merging. We would love to see it being reconsidered for merging.
This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating the comment outright. In addition, this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has two advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone.
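For concreteness, here is a rough sketch of how a rebalance listener could use an empty commitSync as a barrier for in-flight async commits. This is my own illustration, not code from the PR, and it assumes the behavior the PR proposes: that committing an empty offset map waits for earlier async commits and invokes their callbacks first.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: assumes the PR's proposed semantics for commitSync(emptyMap).
class DrainOnRevoke implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicInteger inFlight = new AtomicInteger();

    DrainOnRevoke(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Commit asynchronously, counting in-flight commits.
    void commitLater(Map<TopicPartition, OffsetAndMetadata> offsets) {
        inFlight.incrementAndGet();
        consumer.commitAsync(offsets, (done, err) -> inFlight.decrementAndGet());
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // With the PR applied, an empty commitSync would act as a barrier:
        // it waits for in-flight async commits and fires their callbacks.
        consumer.commitSync(Collections.emptyMap());
        // Only now is it safe to let the rebalance proceed.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
}
```

This runs on the consumer's polling thread (rebalance callbacks always do), so the thread-id check discussed above is not violated here.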
[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720114#comment-17720114 ] Erik van Oosten commented on KAFKA-10337: - [~thomaslee] when we use commitAsync from the rebalance listener (potentially with empty offsets), no polling takes place anymore. Shall I amend the PR so that it does polling from commitAsync as well? WDYT? > Wait for pending async commits in commitSync() even if no offsets are > specified > --- > > Key: KAFKA-10337 > URL: https://issues.apache.org/jira/browse/KAFKA-10337 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Tom Lee >Assignee: Kirk True >Priority: Major > > The JavaDoc for commitSync() states the following: > {quote}Note that asynchronous offset commits sent previously with the > {@link #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {quote} > But should we happen to call the method with an empty offset map > (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete > async commits will not be invoked because of an early return in > ConsumerCoordinator.commitOffsetsSync() when the input map is empty. > If users are doing manual offset commits and relying on commitSync as a > barrier for in-flight async commits prior to a rebalance, this could be an > important (though somewhat implementation-dependent) detail. -- This message was sent by Atlassian Jira (v8.20.10#820010)
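To make the early return concrete, here is a tiny self-contained toy model (not the real client code) of the behavior described in the issue: callbacks of earlier async commits are only drained when the offset map passed to the sync commit is non-empty.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

// Toy model of the contract violation in ConsumerCoordinator.commitOffsetsSync:
// an empty offset map returns early, so callbacks of earlier async commits
// are never invoked.
class ToyCoordinator {
    final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    void commitAsync(Map<String, Long> offsets, Runnable callback) {
        // In the real client the callback fires when the broker responds;
        // here we just park it until a sync commit drains it.
        pendingCallbacks.add(callback);
    }

    void commitOffsetsSync(Map<String, Long> offsets) {
        if (offsets.isEmpty())
            return; // the early return described in the issue
        while (!pendingCallbacks.isEmpty())
            pendingCallbacks.poll().run(); // callbacks invoked before returning
    }
}
```

The fix discussed here amounts to draining `pendingCallbacks` before (instead of after) the empty-map check.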
[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified
[ https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720113#comment-17720113 ] Erik van Oosten commented on KAFKA-10337: - Opened [~thomaslee] 's PR again: https://github.com/apache/kafka/pull/13678
Re: Kafka client needs KAFKA-10337 to cover async commit use case
If I were to rebase the old pull request and re-open KAFKA-10337, would it be considered for merging? Kind regards, Erik. Op 03-05-2023 om 09:21 schreef Erik van Oosten: Hi Philip, Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Of course! Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime. Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed out records. Together this information allows us to track when a stream is done processing a partition and it is safe to start the rebalance. All of this happens concurrently and asynchronously using the ZIO runtime. When calling commit inside the onPartitionsRevoked callback the library does a thread-id check; we can only call the KafkaConsumer from the same thread that invoked us. This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it will need careful programming since all other tasks need to wait. (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases, see for example KAFKA-7143.) Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Yes, that is correct. Sorry - I dug a bit into the old PR. Seems like the issue is a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map. Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to get those callbacks.
I would prefer commitAsync but if only commitSync gets fixed we can use that as well. If there is another method entirely for this task, that would be good as well. Kind regards, Erik. Philip Nee schreef op 2023-05-02 21:49: Hey Erik, Just a couple of questions for you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question. Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Thanks, P On Tue, May 2, 2023 at 12:22 PM Erik van Oosten wrote: Dear developers of the Kafka java client, It seems I have found a feature gap in the Kafka java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it reconsidered for merging. This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request.
The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating the comment outright. In addition, this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has two advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone. Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle. If you agree these arguments are sound, I would be happy to make the pull request mergeable again. Curious to hear your thoughts and kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com Committer of zio-kafka https://github.com/zio/zio-kafka
Re: Kafka client needs KAFKA-10337 to cover async commit use case
Hi Philip, Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Of course! Short answer: we prefer commitAsync because we want to handle multiple partitions concurrently using the ZIO runtime. Long answer: this is in the context of zio-kafka. In zio-kafka the user writes code for a stream that processes data and does commits. In the library we intercept those commits and pass them to the KafkaConsumer. We also keep track of the offsets of handed out records. Together this information allows us to track when a stream is done processing a partition and it is safe to start the rebalance. All of this happens concurrently and asynchronously using the ZIO runtime. When calling commit inside the onPartitionsRevoked callback the library does a thread-id check; we can only call the KafkaConsumer from the same thread that invoked us. This is unfortunate because it forces us to spin up a specialized single-threaded ZIO runtime inside the callback method. Though this runtime can run blocking methods like commitSync, it will need careful programming since all other tasks need to wait. (BTW, it would be great if there were an option to disable the thread-id check. It has more use cases, see for example KAFKA-7143.) Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Yes, that is correct. Sorry - I dug a bit into the old PR. Seems like the issue is a broken contract: commitSync won't wait for the previous async commits to complete if it tries to commit an empty offset map. Indeed! I am assuming the same is true for commitAsync. The important thing is that we need something to get those callbacks. I would prefer commitAsync but if only commitSync gets fixed we can use that as well. If there is another method entirely for this task, that would be good as well.
Kind regards, Erik. Philip Nee schreef op 2023-05-02 21:49: Hey Erik, Just a couple of questions for you: Firstly, could you explain the situation in which you would prefer to invoke commitAsync over commitSync in the rebalance listener? Typically we would use the synchronous method to ensure the commits are completed before moving on with the rebalancing, which leads to my second comment/question. Is it your concern that we currently don't have a way to invoke the callback, and the user won't be able to correctly handle these failed/successful async commits? Thanks, P On Tue, May 2, 2023 at 12:22 PM Erik van Oosten wrote: Dear developers of the Kafka java client, It seems I have found a feature gap in the Kafka java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it reconsidered for merging. This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say.
First of all, the method does have special handling for this situation, negating the comment outright. In addition, this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has two advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone. Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle. If you agree these arguments are sound, I would be happy to make the pull request mergeable again. Curious to hear your thoughts and kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com Committer of zio-kafka https://github.com/zio/zio-kafka
Kafka client needs KAFKA-10337 to cover async commit use case
Dear developers of the Kafka java client, It seems I have found a feature gap in the Kafka java client. KAFKA-10337 and its associated pull request on Github (from 2020!) would solve this, but it was closed without merging. We would love to see it reconsidered for merging. This mail has the arguments for doing so. The javadoc of `ConsumerRebalanceListener` method `onPartitionsRevoked` recommends you commit all offsets within the method, thereby holding up the rebalance until those commits are done. The (perceived) feature gap is when the user is trying to do async commits from the rebalance listener; there is nothing available to trigger the callbacks of completed commits. Without these callbacks, there is no way to know when it is safe to return from onPartitionsRevoked. (We cannot call `poll` because the rebalance listener is already called from inside a poll.) Calling `commitAsync` with an empty offsets parameter seems a perfect candidate for triggering callbacks of earlier commits. Unfortunately, commitAsync doesn't behave that way. This is fixed by the mentioned pull request. The pull request conversation has a comment saying that calling `commit` with an empty offsets parameter is not something that should happen. I found this a strange thing to say. First of all, the method does have special handling for this situation, negating the comment outright. In addition, this special handling violates the contract of the method (as specified in the javadoc section about ordering). Therefore, this pull request has two advantages: 1. KafkaConsumer.commitAsync will be more in line with its javadoc, 2. the feature gap is gone. Of course, it might be that I missed something and that there are other ways to trigger the commit callbacks. I would be very happy to hear about that because it means I do not have to wait for a release cycle. If you agree these arguments are sound, I would be happy to make the pull request mergeable again.
Curious to hear your thoughts and kind regards, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com Committer of zio-kafka https://github.com/zio/zio-kafka
How to get async commit callbacks in the rebalance listener?
Hi, I am trying to do async commits from the rebalance listener, to be precise, in method onPartitionsRevoked. The idea is to wait until all commits for the current epoch are done before the rebalance barrier, and by doing so prevent duplicate processing. It is not so hard to call commitAsync(offsets, callback), but what method should be used so that the Kafka client gets a chance to call the callback? I tried the following:

1. Call commitAsync(Collections.emptyMap(), callback). Unfortunately, when you call commitAsync with an empty offsets map, it doesn't call the callbacks of previous commits. There is a PR from 2020 that would fix this issue: https://github.com/apache/kafka/pull/9111. This PR was closed without merging. Should this PR be reconsidered?

2. Pause all partitions and call poll(0). Doesn't work; you'll get a "KafkaConsumer is not safe for multi-threaded access" exception.

3. Call commitSync(Collections.emptyMap()). Behaves the same as under point 1.

4. Repeated calls to commitAsync(offsets, callback). This time we keep calling commitAsync with the same offsets until these offsets are committed. Unfortunately, this never ends. Either because commitAsync doesn't call the callbacks, or because this just stacks up more commits to complete.

I looked at the other methods on the consumer API but I didn't find anything that looked suitable for this use case. So to repeat the question: what method should I invoke (from the onPartitionsRevoked callback) to make the Kafka client invoke the callback of earlier async commits? Some context: I am working on zio-kafka, a completely async library that provides a concurrent streaming layer on top of the Java client. Thanks, Erik. -- Erik van Oosten e.vanoos...@grons.nl https://day-to-day-stuff.blogspot.com
Re: [scala-functional] Re: java.lang.NullPointerException with JDK 11
If you want to run on JDK 11 you will need to make sure you have a reasonably recent Scala version installed. These work: 2.11.12, 2.12.11 and of course 2.13.2. It wouldn't hurt to use a recent sbt version as well. Kind regards, Erik. -- Erik van Oosten https://day-to-day-stuff.blogspot.com/ Op 02-06-20 om 23:39 schreef Jeremiah Malina: I experienced this as well on Xubuntu 20.04 where I had JDK 11 installed. I found this and was able to install JDK 8 and switch to it: https://linuxconfig.org/how-to-install-java-on-ubuntu-20-04-lts-focal-fossa-linux sbt runs and code compiles now On Monday, November 4, 2019 at 3:03:49 PM UTC-5, Gabe Lafontant wrote: I think I _know_ what the source of my issue is but want to ask if anyone else has seen this problem. Prior to purchasing the book, I had installed Scala on my local machine. My JDK version is 11 and the Scala version I have installed is 2.12.7. I followed the steps in the wiki: ``` $ chmod a+x ./sbt $ ./sbt ``` When I execute the last command I receive the NullPointerException. I think this is due to my machine having JDK 11 installed, and that I should uninstall it and revert to JDK 8. I think the version of Scala in the book is not supported by the JDK version I have. My analysis is based on these findings: https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html https://github.com/ProjectSidewalk/SidewalkWebpage/issues/1346 https://stackoverflow.com/questions/32152431/sbt-gives-java-lang-nullpointerexception-when-trying-to-run-simple-hello-exam Once I get back to an older version of the JDK I think I should be fine. Does that make sense? -- You received this message because you are subscribed to the Google Groups "scala-functional" group.
[jira] [Resolved] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten resolved SPARK-27025. - Resolution: Incomplete > Speed up toLocalIterator > > > Key: SPARK-27025 > URL: https://issues.apache.org/jira/browse/SPARK-27025 > Project: Spark > Issue Type: Wish > Components: Spark Core >Affects Versions: 2.3.3 > Reporter: Erik van Oosten >Priority: Major > > Method {{toLocalIterator}} fetches the partitions to the driver one by one. > However, as far as I can see, any required computation for the > yet-to-be-fetched-partitions is not kicked off until it is fetched. > Effectively only one partition is being computed at the same time. > Desired behavior: immediately start calculation of all partitions while > retaining the download-a-partition at a time behavior. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16784854#comment-16784854 ] Erik van Oosten commented on SPARK-27025: - If there is no obvious way to improve Spark, then it's probably better to close this issue until someone finds a better angle. BTW, the cache/count/iterate/unpersist cycle did not make it faster for my use case. I will try the 2-partition implementation of toLocalIterator. [~srowen], [~hyukjin.kwon], thanks for your input!
[jira] [Comment Edited] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783220#comment-16783220 ] Erik van Oosten edited comment on SPARK-27025 at 3/4/19 10:36 AM: -- [~hyukjin.kwon] maybe I misunderstood Sean's comment. I understood that every invocation of toLocalIterator will either benefit, or not have any negative side effect. Under this assumption, it would be better to put the cache/count/iterate/unpersist logic directly in toLocalIterator. I cannot make any assumptions on the number of use cases. was (Author: erikvanoosten): [~hyukjin.kwon] maybe I misunderstood Sean's comment. I understood that every invocation of toLocalIterator will either benefit, or not have any negative side effect. Under this assumption, it would be better to put the cache/count/iterate/unpersist logic directly in toLocalIterator.
[jira] [Commented] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783220#comment-16783220 ] Erik van Oosten commented on SPARK-27025: - [~hyukjin.kwon] maybe I misunderstood Sean's comment. I understood that every invocation of toLocalIterator will either benefit, or not have any negative side effect. Under this assumption, it would be better to put the cache/count/iterate/unpersist logic directly in toLocalIterator.
[jira] [Commented] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16783110#comment-16783110 ] Erik van Oosten commented on SPARK-27025: - Thanks Sean, that is very useful. In my use case the entire data set is too big for the driver, but I can easily fit 1/10th of it. So even with as little as 20 partitions, 2 partitions on the driver would be fine. In the use case there are 2 joins and a groupby/count, so this is probably a wide transformation. So it seems that the cache/count/toLocalIterator/unpersist approach is applicable. The ergonomics of this approach are way worse, so I don't agree that it is 'better' to do this in application code.
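The cache/count/toLocalIterator/unpersist approach referred to above can be sketched as follows. This is an illustration, not code from the discussion; it uses the Java RDD API and assumes the cached data fits on the executors.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import org.apache.spark.api.java.JavaRDD;

// Sketch of the workaround: force parallel computation of all partitions
// up front, then let toLocalIterator stream them to the driver one at a time.
class LocalIteratorWorkaround {
    static <T> void forEach(JavaRDD<T> rdd, Consumer<T> action) {
        rdd.cache();     // keep the computed partitions on the executors
        rdd.count();     // triggers computation of every partition in parallel
        Iterator<T> it = rdd.toLocalIterator(); // now only transfers data
        while (it.hasNext())
            action.accept(it.next());
        rdd.unpersist(); // release executor memory when done
    }
}
```

The count() forces a full job over all partitions at once, so only the transfer to the driver remains serial.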
[jira] [Comment Edited] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782322#comment-16782322 ] Erik van Oosten edited comment on SPARK-27025 at 3/2/19 8:43 AM: - The point is to _not_ fetch pro-actively. I have a program in which several steps need to be executed before anything can be transferred to the driver. So why can't the executors start executing immediately, and only transfer the results to the driver when they are ready? was (Author: erikvanoosten): I have a program in which several steps need to be executed before anything can be transferred to the driver. So why can't the executors start executing immediately, and only transfer the results to the driver when they are ready?
[jira] [Commented] (SPARK-27025) Speed up toLocalIterator
[ https://issues.apache.org/jira/browse/SPARK-27025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782322#comment-16782322 ] Erik van Oosten commented on SPARK-27025: - I have a program in which several steps need to be executed before anything can be transferred to the driver. So why can't the executors start executing immediately, and only transfer the results to the driver when they are ready?
[jira] [Created] (SPARK-27025) Speed up toLocalIterator
Erik van Oosten created SPARK-27025: --- Summary: Speed up toLocalIterator Key: SPARK-27025 URL: https://issues.apache.org/jira/browse/SPARK-27025 Project: Spark Issue Type: Wish Components: Spark Core Affects Versions: 2.3.3 Reporter: Erik van Oosten Method {{toLocalIterator}} fetches the partitions to the driver one by one. However, as far as I can see, any required computation for the yet-to-be-fetched-partitions is not kicked off until it is fetched. Effectively only one partition is being computed at the same time. Desired behavior: immediately start calculation of all partitions while retaining the download-a-partition at a time behavior.
Opportunity to speed up toLocalIterator?
Hi, This might be an opportunity to give a huge speed bump to toLocalIterator. Method toLocalIterator fetches the partitions to the driver one by one. This is great. What is not so great is that any required computation for the yet-to-be-fetched partitions is not kicked off until it is fetched. Effectively only one partition is being computed at a time, leaving resources idle and increasing wait time. Is this observation correct? Is it possible to have concurrent computation on all partitions while retaining the download-a-partition-at-a-time behavior? Kind regards, Erik. -- Erik van Oosten http://www.day-to-day-stuff.blogspot.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
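The idea in the message above can be modeled outside Spark with plain Scala Futures: kick off the computation for every "partition" eagerly, then still hand the results to the caller one at a time, in order. This is only an illustrative sketch of the desired behavior, not Spark's actual toLocalIterator implementation; the `EagerPartitions.eager` helper is hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerPartitions {
  // Start the work for every "partition" immediately (the Futures are all
  // created up front), but consume the results one at a time, in order --
  // mirroring the download-a-partition-at-a-time behavior of toLocalIterator.
  def eager(n: Int)(compute: Int => Int): Seq[Int] = {
    val running = (1 to n).map(i => Future(compute(i))) // all computations kicked off here
    running.map(f => Await.result(f, 10.seconds))       // fetched sequentially
  }
}
```

With Spark itself, a similar effect can sometimes be had by forcing computation up front (for example persisting the RDD and triggering an action such as `count`) before calling `toLocalIterator`, at the cost of extra storage.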
Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart
Hi sohimankotia,

My advice, from also having had to sub-class BucketingSink:
* rebase your changes on the BucketingSink that comes with the Flink version you are using
* use the same super completely ugly hack I had to deploy as described here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-BucketingSink-when-cancelling-Flink-job-td15856.html#a16168
* consider using the successor of BucketingSink: StreamingFileSink

Good luck, Erik.

On 27-01-19 at 10:13, sohimankotia wrote: Hi Team, Any help/update on this? This is still an issue where I am using bucketing sink in production. Thanks Sohi -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585713#comment-16585713 ] Erik van Oosten commented on KAFKA-960: --- Metrics 4.x was released not so long ago. The core is binary compatible with Metrics 3.x. However, many modules were split from the core and these got a different package name (and are therefore not compatible). For just collecting, you're probably fine. Please also know that Metrics 5.x has been on standby for more than half a year. Metrics 5 will support tags. Metrics 5 is not binary compatible. I recommend upgrading to Metrics 4. > Upgrade Metrics to 3.x > -- > > Key: KAFKA-960 > URL: https://issues.apache.org/jira/browse/KAFKA-960 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 0.8.1 >Reporter: Cosmin Lehene >Priority: Major > > Now that metrics 3.0 has been released > (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245474#comment-16245474 ] Erik van Oosten commented on FLINK-5633: bq. Just curious, why are you creating a new reader for each record? It's just a bit easier than caching a reader for each writer/reader schema combination. > ClassCastException: X cannot be cast to X when re-submitting a job. > --- > > Key: FLINK-5633 > URL: https://issues.apache.org/jira/browse/FLINK-5633 > Project: Flink > Issue Type: Bug > Components: Job-Submission, YARN >Affects Versions: 1.1.4 >Reporter: Giuliano Caliari >Priority: Minor > > I’m running a job on my local cluster and the first time I submit the job > everything works but whenever I cancel and re-submit the same job it fails > with: > {quote} > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634) > at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at 
scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21) > at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) > at > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutor
[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241701#comment-16241701 ] Erik van Oosten commented on FLINK-5633: [~StephanEwen] We need to process 130K msg/s, I guess that can be called often :) . Our process is CPU bound and parsing Avro is ±15% of that. Any improvement means we can run with fewer machines. For every message we create a new SpecificDatumReader. If I follow the code correctly that should _not_ give a large overhead. The Schema instances we pass to it _are_ cached. Then we call {{SpecificDatumReader.read}} to parse each Avro message. In that call you eventually end up in {{SpecificData.newInstance}} to create a new instance of the target class. The constructor of that class is looked up in a cache. That cache is declared as {{static}}. I do not understand how instantiating a new {{SpecificData}} for every call to {{read}} helps because it would still use the same cache. The code I pasted above also uses a constructor cache but the cache is not {{static}}. Reversing the class loader order should also work. > ClassCastException: X cannot be cast to X when re-submitting a job. > --- > > Key: FLINK-5633 > URL: https://issues.apache.org/jira/browse/FLINK-5633 > Project: Flink > Issue Type: Bug > Components: Job-Submission, YARN >Affects Versions: 1.1.4 >Reporter: Giuliano Caliari >Priority: Minor > > I’m running a job on my local cluster and the first time I submit the job > everything works but whenever I cancel and re-submit the same job it fails > with: > {quote} > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. 
> at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634) > at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at > scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) > at scala.App$class.main(App.scala:76) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21) > at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339) > at > 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117) > at > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at >
Re: Exception in BucketingSink when cancelling Flink job
Hi Wangsan,

We were struggling with this for many days as well. In the end we found a work-around. Well, work-around: this for sure qualifies as one of the ugliest hacks I have ever contemplated. Our work-around for Flink immediately interrupting the close is to continue closing on another thread! Here is an example in Scala:

class MyBucketingSink[A](basePath: String) extends BucketingSink[A](basePath) {
  override def close(): Unit = {
    //
    // Unfortunately, Flink closes very very very very eagerly. So eagerly in fact that it will try to kill us by
    // interrupting the current thread immediately. Let's try to continue on a different thread :evil-grin:
    //
    def superClose(): Unit = super.close()
    new Thread(
      new Runnable {
        override def run(): Unit = {
          logger.info("Close invoked on MyBucketingSink on task " + getRuntimeContext.getTaskNameWithSubtasks)
          try {
            superClose()
          } catch {
            case e: Throwable => logger.error(e)("Failed to close task " + getRuntimeContext.getTaskNameWithSubtasks)
          }
        }
      },
      "Closing task " + getRuntimeContext.getTaskNameWithSubtasks
    ).start()
  }
}

Obviously, if the close hangs, the entire job will hang and Flink will need to be fully restarted. Please let us know if you see any other problems with this approach.

Kind regards, Erik.

> On 27 Sep 2017, at 07:33, wangsan wrote:
>
> After digging into the source code, we found that when a Flink job is canceled, a TaskCanceler thread is created.
>
> The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the task thread until it has terminated.
>
>     try {
>       invokable.cancel();
>     } catch (Throwable t) {
>       logger.error("Error while canceling the task {}.", taskName, t);
>     }
>     // ...
>     executer.interrupt();
>     try {
>       executer.join(interruptInterval);
>     } catch (InterruptedException e) {
>       // we can ignore this
>     }
>     // ...
>
> Notice that TaskCanceler first sends an interrupt signal to the task thread, followed by the join method.
> And since the task thread is now trying to close DFSOutputStream, which is waiting for an ack, an InterruptedException is thrown in the task thread.
>
>     synchronized (dataQueue) {
>       while (!streamerClosed) {
>         checkClosed();
>         if (lastAckedSeqno >= seqno) {
>           break;
>         }
>         try {
>           dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
>         } catch (InterruptedException ie) {
>           throw new InterruptedIOException(
>               "Interrupted while waiting for data to be acknowledged by pipeline");
>         }
>       }
>     }
>
> I was confused why TaskCanceler calls executer.interrupt() before executer.join(interruptInterval). Can anyone help?
>
> Hi,
>
> We are currently using BucketingSink to save data into HDFS in parquet format. But when the Flink job was cancelled, we always got an Exception in BucketingSink's close method. The detailed exception info is as below:
>
> [ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
> at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
> at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
> at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
> at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
> at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
> ...
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
>
> It seems that DFSOutputStream hasn't been closed before the task thread is force terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
[jira] [Commented] (AVRO-2076) Combine already serialized Avro records to an Avro file
[ https://issues.apache.org/jira/browse/AVRO-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167422#comment-16167422 ] Erik van Oosten commented on AVRO-2076: --- Awesome! Thanks Doug. Somehow I missed that method. > Combine already serialized Avro records to an Avro file > --- > > Key: AVRO-2076 > URL: https://issues.apache.org/jira/browse/AVRO-2076 > Project: Avro > Issue Type: Wish > Reporter: Erik van Oosten > > In some use cases Avro events arrive already serialized (e.g. when listening > to a Kafka topic). It would be great if there would an API that allows > writing an Avro file without the need for deserializing and serializing these > Avro records. > Providing such an API allows for very efficient creation of Avro files: given > that these Avro records are written with the same schema, an Avro file would > write will the exact same bytes anyway (before block compression). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (AVRO-2076) Combine already serialized Avro records to an Avro file
[ https://issues.apache.org/jira/browse/AVRO-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten resolved AVRO-2076. --- Resolution: Not A Problem > Combine already serialized Avro records to an Avro file > --- > > Key: AVRO-2076 > URL: https://issues.apache.org/jira/browse/AVRO-2076 > Project: Avro > Issue Type: Wish > Reporter: Erik van Oosten > > In some use cases Avro events arrive already serialized (e.g. when listening > to a Kafka topic). It would be great if there would an API that allows > writing an Avro file without the need for deserializing and serializing these > Avro records. > Providing such an API allows for very efficient creation of Avro files: given > that these Avro records are written with the same schema, an Avro file would > write will the exact same bytes anyway (before block compression). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166443#comment-16166443 ] Erik van Oosten commented on FLINK-4796: I am not sure why this is marked as a duplicate. The problem here is inconsistent handling of the runtime context inside the different layers under FlinkKafkaProducer: method {{getRuntimeContext}} gives {{null}} even though {{setRuntimeContext}} was called. How does that relate to the addition of a new interface? > Add new Sink interface with access to more meta data > > > Key: FLINK-4796 > URL: https://issues.apache.org/jira/browse/FLINK-4796 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Aljoscha Krettek > > The current {{SinkFunction}} cannot access the timestamps of elements which > resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other > limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} > and not a {{SinkFunction}}. > We should add a new interface for sinks that takes a context parameter, > similar to {{ProcessFunction}}. This will allow sinks to query additional > meta data about the element that they're receiving. > This is one ML thread where a user ran into a problem caused by this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635 > h3. Original Text (that is still valid but not general) > The Kafka 0.10 connector supports writing event timestamps to Kafka. > Currently, the regular DataStream APIs don't allow user code to access the > event timestamp easily. That's why the Kafka connector is using a custom > operator ({{transform()}}) to access the event time. > With this JIRA, I would like to provide the event timestamp in the regular > DataStream APIs. 
> Once I'll look into the issue, I'll post some proposals how to add the > timestamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AVRO-2076) Combine already serialized Avro records to an Avro file
Erik van Oosten created AVRO-2076: - Summary: Combine already serialized Avro records to an Avro file Key: AVRO-2076 URL: https://issues.apache.org/jira/browse/AVRO-2076 Project: Avro Issue Type: Wish Reporter: Erik van Oosten In some use cases Avro events arrive already serialized (e.g. when listening to a Kafka topic). It would be great if there were an API that allows writing an Avro file without the need for deserializing and serializing these Avro records. Providing such an API allows for very efficient creation of Avro files: given that these Avro records were written with the same schema, the Avro file would contain the exact same bytes anyway (before block compression). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4796) Add new Sink interface with access to more meta data
[ https://issues.apache.org/jira/browse/FLINK-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160810#comment-16160810 ] Erik van Oosten commented on FLINK-4796: A workaround is to override {{setRuntimeContext}} (make sure to call {{super.setRuntimeContext}}), and use the passed in context. Possibly store it in a private field for later access. > Add new Sink interface with access to more meta data > > > Key: FLINK-4796 > URL: https://issues.apache.org/jira/browse/FLINK-4796 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Robert Metzger > > The current {{SinkFunction}} cannot access the timestamps of elements which > resulted in the (somewhat hacky) {{FlinkKafkaProducer010}}. Due to other > limitations {{GenericWriteAheadSink}} is currently also a {{StreamOperator}} > and not a {{SinkFunction}}. > We should add a new interface for sinks that takes a context parameter, > similar to {{ProcessFunction}}. This will allow sinks to query additional > meta data about the element that they're receiving. > This is one ML thread where a user ran into a problem caused by this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Why-I-am-getting-Null-pointer-exception-while-accessing-RuntimeContext-in-FlinkKafkaProducer010-td12633.html#a12635 > h3. Original Text (that is still valid but not general) > The Kafka 0.10 connector supports writing event timestamps to Kafka. > Currently, the regular DataStream APIs don't allow user code to access the > event timestamp easily. That's why the Kafka connector is using a custom > operator ({{transform()}}) to access the event time. > With this JIRA, I would like to provide the event timestamp in the regular > DataStream APIs. > Once I'll look into the issue, I'll post some proposals how to add the > timestamp. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
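The workaround described above can be sketched with minimal stand-ins for the Flink classes. Note that `RichSinkLike` and the `String` context type below are hypothetical models, not the real Flink API; the pattern is what matters: override `setRuntimeContext`, call `super.setRuntimeContext`, and keep the passed-in context in a private field for later access.

```scala
// Hypothetical stand-in for Flink's rich-function base class, only to
// illustrate the override pattern (the real class lives in Flink).
class RichSinkLike {
  private var ctx: String = _
  def setRuntimeContext(c: String): Unit = { ctx = c }
  def getRuntimeContext: String = ctx
}

class MySink extends RichSinkLike {
  // Capture the context at the moment it is handed to us, instead of
  // relying on a getter that may return null inside a wrapped producer.
  private var capturedCtx: String = _

  override def setRuntimeContext(c: String): Unit = {
    super.setRuntimeContext(c) // make sure to call super, as the comment advises
    capturedCtx = c            // store it in a private field for later access
  }

  def context: String = capturedCtx
}
```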
[jira] [Commented] (FLINK-1390) java.lang.ClassCastException: X cannot be cast to X
[ https://issues.apache.org/jira/browse/FLINK-1390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058708#comment-16058708 ] Erik van Oosten commented on FLINK-1390: See https://issues.apache.org/jira/browse/FLINK-5633?focusedCommentId=16058706&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16058706 for a proper solution. > java.lang.ClassCastException: X cannot be cast to X > > > Key: FLINK-1390 > URL: https://issues.apache.org/jira/browse/FLINK-1390 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 0.8.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > > A user is affected by an issue, which is probably caused by different > classloaders being used for loading user classes. > Current state of investigation: > - the error happens in yarn sessions (there is only a YARN environment > available) > - the error doesn't happen on the first time the job is being executed. It > only happens on subsequent executions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.
[ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058706#comment-16058706 ] Erik van Oosten commented on FLINK-5633: In case you need throughput (like we do), the caching is indispensable. In those cases you can use the following {{SpecificData}} implementation. Simply instantiate it once and then pass that singleton instance to every {{SpecificDatumReader}}.

{code:scala|title=LocalCachingSpecificData.scala}
import java.lang.reflect.Constructor
import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificData

import scala.collection.JavaConverters._

/**
 * This can be used instead of [[SpecificData]] in multi-classloader environments like Flink.
 * This variation removes the JVM singleton constructor cache and replaces it with a
 * cache that is local to the current class loader.
 *
 * If two Flink jobs use the same generated Avro code, they will still have separate instances of the classes because
 * they live in separate class loaders.
 * However, a JVM-wide singleton cache keeps a reference to the class in the first class loader that was loaded. Any
 * subsequent jobs will fail with a [[ClassCastException]] because they will get incompatible classes.
 */
class LocalCachingSpecificData extends SpecificData {
  private val NO_ARG: Array[Class[_]] = Array.empty
  private val SCHEMA_ARG: Array[Class[_]] = Array(classOf[Schema])
  private val CTOR_CACHE: scala.collection.concurrent.Map[Class[_], Constructor[_]] =
    new ConcurrentHashMap[Class[_], Constructor[_]]().asScala

  /** Create an instance of a class.
   * If the class implements [[org.apache.avro.specific.SpecificData.SchemaConstructable]], call a constructor with a
   * [[org.apache.avro.Schema]] parameter, otherwise use a no-arg constructor.
   */
  private def newInstance(c: Class[_], s: Schema): AnyRef = {
    val useSchema = classOf[SpecificData.SchemaConstructable].isAssignableFrom(c)
    val constructor = CTOR_CACHE.getOrElseUpdate(c, {
      val ctor = c.getDeclaredConstructor((if (useSchema) SCHEMA_ARG else NO_ARG): _*)
      ctor.setAccessible(true)
      ctor
    })
    if (useSchema) constructor.newInstance(s).asInstanceOf[AnyRef]
    else constructor.newInstance().asInstanceOf[AnyRef]
  }

  override def createFixed(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.createFixed(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }

  override def newRecord(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.newRecord(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }
}
{code}

> ClassCastException: X cannot be cast to X when re-submitting a job. > --- > > Key: FLINK-5633 > URL: https://issues.apache.org/jira/browse/FLINK-5633 > Project: Flink > Issue Type: Bug > Components: Job-Submission, YARN >Affects Versions: 1.1.4 >Reporter: Giuliano Caliari >Priority: Minor > > I’m running a job on my local cluster and the first time I submit the job > everything works but whenever I cancel and re-submit the same job it fails > with: > {quote} > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. 
> at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634) > at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22) > at > au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21) > at scala.Function0$class.apply$mcV$sp(Function0.scala:34) > at > scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.App$$anonfun$main$1.apply(App.scala:76) > at scala.collection.immutable.List.foreach(List.scala:381) > at
[jira] [Comment Edited] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050925#comment-16050925 ] Erik van Oosten edited comment on FLINK-6928 at 6/15/17 6:39 PM: - In my ideal world method {{getTargetTopic}} would be removed from {{*SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}. Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{*SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic. was (Author: erikvanoosten): In my ideal world method {{getTargetTopic}} would be removed from {{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}. Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in a implementation of {{DestinationTopic}} that always returns the same topic. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. > In addition, it would be nice if we could signal that the defaultTopic is not > needed by passing {{null}}. 
Currently, a value for the defaultTopic is > required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050925#comment-16050925 ] Erik van Oosten commented on FLINK-6928: In my ideal world method {{getTargetTopic}} would be removed from {{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}. Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. > In addition, it would be nice if we could signal that the defaultTopic is not > needed by passing {{null}}. Currently, a value for the defaultTopic is > required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
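The split proposed in the comment above could look roughly like this. Only the name `DestinationTopic` comes from the comment; the method name and the constant-topic wrapper are hypothetical, not actual Flink code:

```scala
// Sketch of the suggested API split (hypothetical, not Flink's real API).
trait DestinationTopic[A] {
  def topicFor(element: A): String
}

// The fixed-topic constructor variant could wrap its String argument in an
// implementation that always returns the same topic, keeping the internals uniform.
final class ConstantTopic[A](topic: String) extends DestinationTopic[A] {
  def topicFor(element: A): String = topic
}
```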
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated FLINK-6928: --- Description: When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. It would be nice if we could signal that the defaultTopic is not needed by passing {{null}}. Currently, a value for the defaultTopic is required. was: When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. > It would be nice if we could signal that the defaultTopic is not needed by > passing {{null}}. Currently, a value for the defaultTopic is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated FLINK-6928: --- Description: When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. In addition, it would be nice if we could signal that the defaultTopic is not needed by passing {{null}}. Currently, a value for the defaultTopic is required. was: When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. It would be nice if we could signal that the defaultTopic is not needed by passing {{null}}. Currently, a value for the defaultTopic is required. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. > In addition, it would be nice if we could signal that the defaultTopic is not > needed by passing {{null}}. Currently, a value for the defaultTopic is > required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated FLINK-6928: --- Summary: Kafka sink: default topic should not need to exist (was: Kafka source: default topic should not need to exist) > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka source, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated FLINK-6928: --- Description: When using a Kafka sink, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. was: When using a Kafka source, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6928) Kafka source: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated FLINK-6928: --- Summary: Kafka source: default topic should not need to exist (was: Kafka source: default topic needs to exist) > Kafka source: default topic should not need to exist > > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 > Reporter: Erik van Oosten > > When using a Kafka source, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-6928) Kafka source: default topic needs to exist
Erik van Oosten created FLINK-6928: -- Summary: Kafka source: default topic needs to exist Key: FLINK-6928 URL: https://issues.apache.org/jira/browse/FLINK-6928 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.2.1, 1.3.0 Reporter: Erik van Oosten When using a Kafka source, the defaultTopic needs to exist even when it is never used. It would be nice if fetching partition information for the default topic would be delayed until the moment a topic is actually used. Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the default topic. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (AVRO-2022) IDL does not allow `schema` as identifier
[ https://issues.apache.org/jira/browse/AVRO-2022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021085#comment-16021085 ] Erik van Oosten commented on AVRO-2022: --- After working with this change for some time we decided to abandon this idea and change the schema after all. The problem is that many code generation tools assume they can create a method {{Schema getSchema()}}. Unfortunately this collides with the value we want it to return. > IDL does not allow `schema` as identifier > - > > Key: AVRO-2022 > URL: https://issues.apache.org/jira/browse/AVRO-2022 > Project: Avro > Issue Type: Bug > Components: java >Affects Versions: 1.7.7, 1.8.1 > Reporter: Erik van Oosten > > The keyword {{schema}} is not allowed as an escaped identifier in IDL. E.g. the > following does not compile: > {noformat} > record { >string `schema`; > } > {noformat} > Patches are available for the master branch: > https://github.com/apache/avro/pull/209 and 1.7 branch: > https://github.com/apache/avro/pull/211 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
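The collision can be made concrete: Avro's {{SpecificRecord}} already exposes {{Schema getSchema()}} for the record's own schema, so a record field named {{schema}} would generate a getter with the very same name but a different return type. A minimal sketch of the (hypothetical, simplified) JavaBeans getter-naming rule that such generators typically follow:

```java
public class GetterCollision {
    // Simplified JavaBeans-style getter name derivation (hypothetical helper,
    // standing in for what a code generator does internally).
    static String getterFor(String fieldName) {
        return "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    }

    public static void main(String[] args) {
        // A record field named `schema` produces getSchema() -- the same name
        // as the getSchema() that Avro's SpecificRecord already declares for
        // returning the record's own Schema, but with a different return type.
        System.out.println(getterFor("schema")); // prints "getSchema"
    }
}
```

Java forbids two methods that differ only in return type, which is why the idea had to be abandoned.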
[jira] [Resolved] (AVRO-2022) IDL does not allow `schema` as identifier
[ https://issues.apache.org/jira/browse/AVRO-2022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten resolved AVRO-2022. --- Resolution: Invalid > IDL does not allow `schema` as identifier > - > > Key: AVRO-2022 > URL: https://issues.apache.org/jira/browse/AVRO-2022 > Project: Avro > Issue Type: Bug > Components: java >Affects Versions: 1.7.7, 1.8.1 > Reporter: Erik van Oosten > > The keyword {{schema}} is not allowed as an escaped identifier in IDL. E.g. the > following does not compile: > {noformat} > record { >string `schema`; > } > {noformat} > Patches are available for the master branch: > https://github.com/apache/avro/pull/209 and 1.7 branch: > https://github.com/apache/avro/pull/211 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AVRO-2022) IDL does not allow `schema` as identifier
[ https://issues.apache.org/jira/browse/AVRO-2022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated AVRO-2022: -- Description: The keyword {{schema}} is not allowed as an escaped identifier in IDL. E.g. the following does not compile: {noformat} record { string `schema`; } {noformat} Patches are available for the master branch: https://github.com/apache/avro/pull/209 and 1.7 branch: https://github.com/apache/avro/pull/211 was: The keyword {{schema}} is not allowed as an escaped identifier in IDL. E.g. the following does not compile: {noformat} record { string `schema`; } {noformat} Patches are available for the master and 1.7 branches here: (todo) > IDL does not allow `schema` as identifier > - > > Key: AVRO-2022 > URL: https://issues.apache.org/jira/browse/AVRO-2022 > Project: Avro > Issue Type: Bug > Components: java >Affects Versions: 1.7.7, 1.8.1 > Reporter: Erik van Oosten > > The keyword {{schema}} is not allowed as an escaped identifier in IDL. E.g. the > following does not compile: > {noformat} > record { >string `schema`; > } > {noformat} > Patches are available for the master branch: > https://github.com/apache/avro/pull/209 and 1.7 branch: > https://github.com/apache/avro/pull/211 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AVRO-2022) IDL does not allow `schema` as identifier
Erik van Oosten created AVRO-2022: - Summary: IDL does not allow `schema` as identifier Key: AVRO-2022 URL: https://issues.apache.org/jira/browse/AVRO-2022 Project: Avro Issue Type: Bug Components: java Affects Versions: 1.8.1, 1.7.7 Reporter: Erik van Oosten The keyword {{schema}} is not allowed as an escaped identifier in IDL. E.g. the following does not compile: {noformat} record { string `schema`; } {noformat} Patches are available for the master and 1.7 branches here: (todo) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
Pulling out my pull request
Hi Thrift devs, Due to lack of interest from this community, I am pulling out my PR https://github.com/apache/thrift/pull/1036. Rather than just silently publishing this documentation elsewhere, I am sending this email to hopefully initiate a discussion that will lead to a better situation. Happy coding and kind regards, Erik. -- Erik van Oosten https://day-to-day-stuff.blogspot.nl/
[jira] [Issue Comment Deleted] (THRIFT-3867) Specify BinaryProtocol and CompactProtocol
[ https://issues.apache.org/jira/browse/THRIFT-3867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated THRIFT-3867: Comment: was deleted (was: Pull request in https://github.com/apache/thrift/pull/1036.) > Specify BinaryProtocol and CompactProtocol > -- > > Key: THRIFT-3867 > URL: https://issues.apache.org/jira/browse/THRIFT-3867 > Project: Thrift > Issue Type: Documentation > Components: Documentation > Reporter: Erik van Oosten > > It would be nice when the protocol(s) would be specified somewhere. This > should improve communication between developers, but also opens the way for > alternative implementations so that Thrift can thrive even better. > I have a fairly complete description of the BinaryProtocol and > CompactProtocol which I will submit as a patch for further review and > discussion. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (THRIFT-3867) Specify BinaryProtocol and CompactProtocol
[ https://issues.apache.org/jira/browse/THRIFT-3867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355043#comment-15355043 ] Erik van Oosten commented on THRIFT-3867: - Pull request in https://github.com/apache/thrift/pull/1036. > Specify BinaryProtocol and CompactProtocol > -- > > Key: THRIFT-3867 > URL: https://issues.apache.org/jira/browse/THRIFT-3867 > Project: Thrift > Issue Type: Documentation > Components: Documentation > Reporter: Erik van Oosten > > It would be nice when the protocol(s) would be specified somewhere. This > should improve communication between developers, but also opens the way for > alternative implementations so that Thrift can thrive even better. > I have a fairly complete description of the BinaryProtocol and > CompactProtocol which I will submit as a patch for further review and > discussion. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (THRIFT-3867) Specify BinaryProtocol and CompactProtocol
Erik van Oosten created THRIFT-3867: --- Summary: Specify BinaryProtocol and CompactProtocol Key: THRIFT-3867 URL: https://issues.apache.org/jira/browse/THRIFT-3867 Project: Thrift Issue Type: Documentation Components: Documentation Reporter: Erik van Oosten It would be nice if the protocol(s) were specified somewhere. This should improve communication between developers, but it also opens the way for alternative implementations so that Thrift can thrive even better. I have a fairly complete description of the BinaryProtocol and CompactProtocol which I will submit as a patch for further review and discussion. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
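As one concrete example of the kind of detail such a specification pins down: CompactProtocol encodes signed integers as zigzag varints, so small negative numbers stay small on the wire. A minimal sketch of the zigzag step for i32 (the subsequent varint byte encoding is omitted; this illustrates the published encoding, not the patch itself):

```java
public class ZigZag {
    // CompactProtocol's zigzag mapping interleaves negatives and positives:
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    static int zigzag32(int n) {
        return (n << 1) ^ (n >> 31);
    }

    public static void main(String[] args) {
        System.out.println(zigzag32(-1)); // prints 1
        System.out.println(zigzag32(1));  // prints 2
    }
}
```

The arithmetic right shift spreads the sign bit across all 32 bits, so XOR-ing with it flips the magnitude bits only for negative inputs.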
[jira] [Updated] (SPARK-6878) Sum on empty RDD fails with exception
[ https://issues.apache.org/jira/browse/SPARK-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Erik van Oosten updated SPARK-6878: --- Flags: Patch > Sum on empty RDD fails with exception > - > > Key: SPARK-6878 > URL: https://issues.apache.org/jira/browse/SPARK-6878 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.0 > Reporter: Erik van Oosten >Priority: Minor > > {{Sum}} on an empty RDD throws an exception. Expected result is {{0}}. > A simple fix is the replace > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.reduce(_ + _) > {noformat} > with: > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.aggregate(0.0)(_ + _, _ + _) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6878) Sum on empty RDD fails with exception
[ https://issues.apache.org/jira/browse/SPARK-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492336#comment-14492336 ] Erik van Oosten commented on SPARK-6878: Pull request: https://github.com/apache/spark/pull/5489 > Sum on empty RDD fails with exception > - > > Key: SPARK-6878 > URL: https://issues.apache.org/jira/browse/SPARK-6878 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.0 > Reporter: Erik van Oosten >Priority: Minor > > {{Sum}} on an empty RDD throws an exception. Expected result is {{0}}. > A simple fix is the replace > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.reduce(_ + _) > {noformat} > with: > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.aggregate(0.0)(_ + _, _ + _) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6878) Sum on empty RDD fails with exception
[ https://issues.apache.org/jira/browse/SPARK-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492302#comment-14492302 ] Erik van Oosten commented on SPARK-6878: Ah, yes. I now see that fold also first reduces per partition. > Sum on empty RDD fails with exception > - > > Key: SPARK-6878 > URL: https://issues.apache.org/jira/browse/SPARK-6878 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.0 > Reporter: Erik van Oosten >Priority: Minor > > {{Sum}} on an empty RDD throws an exception. Expected result is {{0}}. > A simple fix is the replace > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.reduce(_ + _) > {noformat} > with: > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.aggregate(0.0)(_ + _, _ + _) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-6878) Sum on empty RDD fails with exception
[ https://issues.apache.org/jira/browse/SPARK-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492282#comment-14492282 ] Erik van Oosten edited comment on SPARK-6878 at 4/13/15 11:16 AM: -- The answer is only defined because the RDD is an {{RDD[Double]}} :) Sure, I'll make a PR. Is the proposed solution acceptable? was (Author: erikvanoosten): The answer is only defined because the RDD is an {{RDD[Double]}} :) Sure, I'll make a PR. > Sum on empty RDD fails with exception > - > > Key: SPARK-6878 > URL: https://issues.apache.org/jira/browse/SPARK-6878 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.0 >Reporter: Erik van Oosten >Priority: Minor > > {{Sum}} on an empty RDD throws an exception. Expected result is {{0}}. > A simple fix is the replace > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.reduce(_ + _) > {noformat} > with: > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.aggregate(0.0)(_ + _, _ + _) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6878) Sum on empty RDD fails with exception
[ https://issues.apache.org/jira/browse/SPARK-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492282#comment-14492282 ] Erik van Oosten commented on SPARK-6878: The answer is only defined because the RDD is an {{RDD[Double]}} :) Sure, I'll make a PR. > Sum on empty RDD fails with exception > - > > Key: SPARK-6878 > URL: https://issues.apache.org/jira/browse/SPARK-6878 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.2.0 >Reporter: Erik van Oosten >Priority: Minor > > {{Sum}} on an empty RDD throws an exception. Expected result is {{0}}. > A simple fix is the replace > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.reduce(_ + _) > {noformat} > with: > {noformat} > class DoubleRDDFunctions { > def sum(): Double = self.aggregate(0.0)(_ + _, _ + _) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-6878) Sum on empty RDD fails with exception
Erik van Oosten created SPARK-6878: -- Summary: Sum on empty RDD fails with exception Key: SPARK-6878 URL: https://issues.apache.org/jira/browse/SPARK-6878 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.2.0 Reporter: Erik van Oosten Priority: Minor {{Sum}} on an empty RDD throws an exception. Expected result is {{0}}. A simple fix is to replace {noformat} class DoubleRDDFunctions { def sum(): Double = self.reduce(_ + _) {noformat} with: {noformat} class DoubleRDDFunctions { def sum(): Double = self.aggregate(0.0)(_ + _, _ + _) {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
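The proposed fix swaps {{reduce}} for {{aggregate}} with a zero element. The same failure mode is visible in plain Java streams, where a reduce without an identity has no value for an empty source; a small sketch of the analogy (Java streams stand in for Spark's RDD API here, this is not Spark code):

```java
import java.util.OptionalDouble;
import java.util.stream.DoubleStream;

public class EmptySum {
    public static void main(String[] args) {
        // reduce with no identity: an empty input has no answer at all
        OptionalDouble r = DoubleStream.empty().reduce((a, b) -> a + b);
        System.out.println(r.isPresent()); // prints false

        // reduce with a zero element: an empty input sums to 0.0, just as
        // the proposed self.aggregate(0.0)(_ + _, _ + _) would
        double sum = DoubleStream.empty().reduce(0.0, (a, b) -> a + b);
        System.out.println(sum); // prints 0.0
    }
}
```

Supplying the zero element up front is what makes the empty case well defined.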
[Bug 1388955] Re: POODLE vulnerability because SSL3 can not be disabled
The problem has not gone away. I guess this package could use a new maintainer. -- You received this bug notification because you are a member of Ubuntu Bugs, which is subscribed to Ubuntu. https://bugs.launchpad.net/bugs/1388955 Title: POODLE vulnerability because SSL3 can not be disabled To manage notifications about this bug go to: https://bugs.launchpad.net/ubuntu/+source/pound/+bug/1388955/+subscriptions -- ubuntu-bugs mailing list ubuntu-bugs@lists.ubuntu.com https://lists.ubuntu.com/mailman/listinfo/ubuntu-bugs
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14194990#comment-14194990 ] Erik van Oosten commented on KAFKA-960: --- If 2.2.0 and 2.1.5 are indeed binary compatible (how do you test that?), _all existing_ releases could be patched by simply replacing a jar :) > Upgrade Metrics to 3.x > -- > > Key: KAFKA-960 > URL: https://issues.apache.org/jira/browse/KAFKA-960 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1 >Reporter: Cosmin Lehene > > Now that metrics 3.0 has been released > (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[Bug 1388955] [NEW] POODLE vulnerability because SSL3 can not be disabled
Public bug reported: Please upgrade pound package to the latest mainstream release (2.7d) so that SSL3 can be disabled. SSL3 needs to be disabled because of the POODLE vulnerability. More details around other options on http://www.apsis.ch/pound/pound_list/archive/2014/2014-10/1414097953000 Thanks, Erik. ** Affects: pound (Ubuntu) Importance: Undecided Status: New ** Information type changed from Private Security to Public -- You received this bug notification because you are a member of Ubuntu Bugs, which is subscribed to Ubuntu. https://bugs.launchpad.net/bugs/1388955 Title: POODLE vulnerability because SSL3 can not be disabled To manage notifications about this bug go to: https://bugs.launchpad.net/ubuntu/+source/pound/+bug/1388955/+subscriptions -- ubuntu-bugs mailing list ubuntu-bugs@lists.ubuntu.com https://lists.ubuntu.com/mailman/listinfo/ubuntu-bugs
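For reference, with pound 2.7 the mitigation would presumably be a one-line config change. This is a hedged sketch: the `Disable SSLv3` directive name is taken from the 2.7 release notes, the certificate path is hypothetical, and both should be verified against the packaged version.

```
ListenHTTPS
    Address 0.0.0.0
    Port    443
    Cert    "/etc/ssl/private/example.pem"   # hypothetical certificate path
    Disable SSLv3                            # not available before pound 2.7
End
```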
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14194883#comment-14194883 ] Erik van Oosten commented on KAFKA-960: --- The issue where MBean names are wrapped in quotes is easily solved by downgrading to Metrics 2.1.5. The quotes are a regression in 2.2.0. Version 2.1.5 is probably even binary compatible with 2.2.0. A major benefit of Metrics 3 is that the max and min values are based on sampling and are no longer absolute since the start of the application. > Upgrade Metrics to 3.x > -- > > Key: KAFKA-960 > URL: https://issues.apache.org/jira/browse/KAFKA-960 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1 >Reporter: Cosmin Lehene > > Now that metrics 3.0 has been released > (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: MBeans, dashes, underscores, and KAFKA-1481
Hi Jun, The quotes are because of a regression in Metrics 2.2.0. IMHO Metrics 2.2.0 should not be used because of this. Just downgrade to Metrics 2.1.5 and you are good. Of course, upgrading to Metrics 3 would do the trick also. Kind regards, Erik. Jun Rao wrote on 17-10-14 at 20:54: Hi, everyone, We are fixing the mbean names in kafka-1482, by adding separate explicit tags in the name for things like clientId and topic. Another thing that some people have complained before is that we use quotes in the jmx name. Should we also just get rid of the quotes as part of kafka-1482? So, instead of "kafka.server":type="BrokerTopicMetrics",name="topic-1-BytesInPerSec" we will have kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1 Thanks, Jun -- Erik van Oosten http://www.day-to-day-stuff.blogspot.com/
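The unquoted form Jun proposes parses as a plain JMX ObjectName with key properties, so consumers can query individual tags instead of string-matching quoted names. A small sketch using the standard javax.management API (the name string is the one quoted in the email above):

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class MBeanNames {
    public static void main(String[] args) throws MalformedObjectNameException {
        // The proposed unquoted MBean name with explicit tags:
        ObjectName name = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1");
        System.out.println(name.getDomain());              // prints kafka.server
        System.out.println(name.getKeyProperty("topic"));  // prints topic-1
    }
}
```

With the quoted form, the topic is fused into the quoted name value and is not addressable as a key property.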
Re: How to produce and consume events in 2 DCs?
Hi Steven, That doesn't work. In your proposal mirrormaker in one DC would copy messages from topic A to the other DC in topic A. However, in the other DC there is a mirrormaker which does the same, creating a loop. Messages will be duplicated, triplicated, etc. in a never-ending loop. Mirroring to another topic would work (mirrormaker doesn't support that), and so would mirroring to another cluster. Neha's proposal would also work, but I assume it's a lot more work for the Kafka internals and therefore IMHO wouldn't meet the KISS principle. Kind regards, Erik. Steven Wu wrote on 22-10-14 at 01:48: I think it doesn't have to be two more clusters. can be just two more topics. MirrorMaker can copy from source topics in both regions into one aggregate topic. On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten < e.vanoos...@grons.nl.invalid> wrote: Thanks Neha, Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to us. Would you accept a pull request on mirror maker that would rename topics on the fly? For example by accepting the parameter rename: --rename src1/dest1,src2/dest2 or, extended with RE support: --rename old_(.*)/new_\1 Kind regards, Erik. On 20 Oct 2014, at 16:43, Neha Narkhede wrote: Another way to set up this kind of mirroring is by deploying 2 clusters in each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror maker copies data from both the DC's local clusters into the aggregate clusters. So if you want access to a topic with data from both DC's, you subscribe to the aggregate cluster. Thanks, Neha On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten < e.vanoos...@grons.nl.invalid> wrote: Hi, We have 2 data centers that produce events. Each DC has to process events from both DCs. 
I had the following in mind: [Diagram: in each DC, events flow into a Receiver topic; mirroring copies each Receiver topic into the Consumer topics of both DCs; consumers in each DC read from their local Consumer topic.] As each DC has a single Kafka cluster, on each DC the receiver topic and consumer topic need to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or, is there another approach for producing and consuming from 2 DCs? Kind regards, Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.nl/ -- Erik van Oosten http://www.day-to-day-stuff.blogspot.com/
Re: How to produce and consume events in 2 DCs?
Thanks Neha, Unfortunately, the maintenance overhead of 2 more clusters is not acceptable to us. Would you accept a pull request on mirror maker that would rename topics on the fly? For example by accepting the parameter rename: --rename src1/dest1,src2/dest2 or, extended with RE support: --rename old_(.*)/new_\1 Kind regards, Erik. On 20 Oct 2014, at 16:43, Neha Narkhede wrote: > Another way to set up this kind of mirroring is by deploying 2 clusters in > each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror > maker copies data from both the DC's local clusters into the aggregate > clusters. So if you want access to a topic with data from both DC's, you > subscribe to the aggregate cluster. > > Thanks, > Neha > > On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten < > e.vanoos...@grons.nl.invalid> wrote: > >> Hi, >> >> We have 2 data centers that produce events. Each DC has to process events >> from both DCs. >> >> I had the following in mind: >> >> [Diagram: in each DC, events flow into a Receiver topic; mirroring copies each Receiver topic into the Consumer topics of both DCs; consumers in each DC read from their local Consumer topic.] >> >> As each DC has a single Kafka cluster, on each DC the receiver topic and >> consumer topic needs to be on the same cluster. >> Unfortunately, mirror maker does not seem to support mirroring to a topic >> with another name. >> >> Is there another tool we could use? >> Or, is there another approach for producing and consuming from 2 DCs? >> >> Kind regards, >>Erik. >> >> — >> Erik van Oosten >> http://www.day-to-day-stuff.blogspot.nl/ >> >>
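The proposed flag (hypothetical; mirror maker has no such option) boils down to one regex substitution per topic. A sketch of that mapping in Java, whose replacement syntax uses $1 where sed-style examples write \1:

```java
public class TopicRename {
    // Apply one "pattern/replacement" rename spec to a topic name.
    // The spec format mirrors the proposed flag, e.g. "old_(.*)/new_$1".
    // Hypothetical helper for illustration; not part of mirror maker.
    static String rename(String spec, String topic) {
        int slash = spec.indexOf('/');
        String pattern = spec.substring(0, slash);
        String replacement = spec.substring(slash + 1);
        // Only rename topics that match the whole pattern; pass others through.
        return topic.matches(pattern) ? topic.replaceAll(pattern, replacement) : topic;
    }

    public static void main(String[] args) {
        System.out.println(rename("old_(.*)/new_$1", "old_events")); // prints new_events
        System.out.println(rename("src1/dest1", "src1"));            // prints dest1
    }
}
```

Applied on the mirroring path, such a mapping would let one cluster hold both the receiver and consumer topics without creating a mirroring loop.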
How to produce and consume events in 2 DCs?
Hi, We have 2 data centers that produce events. Each DC has to process events from both DCs. I had the following in mind: [Diagram: in each DC, events flow into a Receiver topic; mirroring copies each Receiver topic into the Consumer topics of both DCs; consumers in each DC read from their local Consumer topic.] As each DC has a single Kafka cluster, on each DC the receiver topic and consumer topic need to be on the same cluster. Unfortunately, mirror maker does not seem to support mirroring to a topic with another name. Is there another tool we could use? Or, is there another approach for producing and consuming from 2 DCs? Kind regards, Erik. — Erik van Oosten http://www.day-to-day-stuff.blogspot.nl/