I've had a look at KIP-396, and so far only one binding vote has arrived. I hope others will consider it a good solution...
G

On Tue, Aug 13, 2019 at 11:52 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I had concerns about calling AdminClient.listTopics because on big clusters
> I've seen OOMs caused by too many TopicPartitions. On the other hand, this
> problem already exists in the current implementation because, as Colin said,
> the Consumer does the same on the client side. All in all this part is fine.
>
> I've checked all the actual use cases on the Spark side which have to be
> covered, and it looks doable.

On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <kabh...@gmail.com> wrote:

> So overall, AdminClient covers what is necessary to retrieve up-to-date
> topic-partitions, whereas KIP-396 will cover what is necessary to retrieve
> offsets (EARLIEST, LATEST, timestamp) per partition.
>
> Gabor, could you please add your input if I'm missing something? I'd like
> to double-check this.
>
> Assuming I'm not missing anything, what would be the preferred next action?
> Personally I'd keep this KIP as it is until KIP-396 passes its vote (the
> vote for KIP-396 opened in January and still hasn't passed - seven months -
> which worries me a bit about whether it will pass at all), but I also
> respect the lifecycle of KIPs in the Kafka community.

On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com> wrote:

> On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
>>> Thanks for the feedback, Colin and Matthias.
>>>
>>> I agree with you regarding getting topics and partitions via
>>> AdminClient; I'm just curious how much the overhead would be. Would it
>>> be lighter or heavier? We may not want to list topics at regular
>>> intervals - in the plan phase we want up-to-date information so that
>>> the calculation on the Spark side makes sense.
>>
>> It would be lighter. The consumer will periodically refresh metadata for
>> any topic you are subscribed to. AdminClient doesn't have the concept of
>> subscriptions, and won't refresh topic metadata until you request it.
>
> Sounds great! Happy to hear that.
>
>>> On the other hand, I'm not seeing any offset-related information in the
>>> current AdminClient, which is also one of the reasons we leverage a
>>> consumer and call poll(0). Colin, you mentioned there are KIPs
>>> addressing this - could you point to them so we can see whether they
>>> would work for our case? Without this we cannot replace our usage of
>>> consumer/poll with AdminClient.
>>
>> KIP-396 is the one for listing offsets in AdminClient.
>
> KIP-396 seems to fit Spark's needs for getting offset information, even by
> timestamp. Thanks! I'd wish there were a way to get a range of (EARLIEST,
> LATEST) in one call, but that's not a big deal as it just requires two
> calls.
>
>>> ps. IMHO it would be helpful to have an overloaded `listTopics` which
>>> accepts a regex, the same as consumer subscription via pattern. We
>>> would like to provide the same behavior that Kafka itself provides as
>>> a source.
>>
>> We don't have a regex listTopics at the moment, though we could add it.
>> Currently, the regex is applied on the client side anyway (although we'd
>> really like to change this in the future). So just listing everything
>> and filtering locally would give the same performance and behavior as
>> the Consumer.
>
> I see. Good to know the regex is applied on the client side - I've just
> searched the code, and it applies the filter to all topics retrieved from
> the metadata fetch. Then there would be mostly no difference here. Thanks
> for confirming.
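[Editor's note: Colin's point - that pattern subscription is just a client-side filter over the full topic list - could be sketched roughly as below. This is a hypothetical helper, not Kafka API code; in a real driver the topic names would come from something like `AdminClient.listTopics().names().get()` rather than a hard-coded list.]

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicFilter {
    // Client-side equivalent of pattern subscription: list everything,
    // then keep only the names matching the regex.
    static List<String> matchTopics(List<String> allTopics, Pattern pattern) {
        return allTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // In a real driver these names would come from AdminClient.listTopics().
        List<String> topics =
                List.of("events-a", "events-b", "metrics", "__consumer_offsets");
        System.out.println(matchTopics(topics, Pattern.compile("events-.*")));
        // prints [events-a, events-b]
    }
}
```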
>> Best,
>> Colin

On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the details, Jungtaek!
>
> I tend to agree with Colin that using the AdminClient seems to be the
> better choice.
>
> You can get all topics via `listTopics()` (and refresh this information
> at regular intervals) and match any pattern against the list of available
> topics in the driver.
>
> As you use `assignment()` and store offsets in the Spark checkpoint, it
> seems that consumer group management is not a good fit for the use case.
>
> Thoughts?
>
> -Matthias

On 8/12/19 8:22 AM, Colin McCabe wrote:

> Hi,
>
> If there's no need to consume records in the Spark driver, then the
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that would
> add the necessary APIs for managing offsets.
>
> Best,
> Colin

On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:

> My feeling is that I didn't explain the use case for Spark properly and
> hence failed to explain the needs. Sorry about this.
>
> Spark leverages a single instance of KafkaConsumer in the driver, which
> is the sole member registered in the consumer group.
> This is used in the plan phase for each micro-batch to calculate the
> overall topic-partitions with their offset ranges for the batch, and to
> split and assign (topicpartition, fromOffset, untilOffset) to each input
> partition. After planning is done and tasks are distributed to executors,
> a consumer per input partition is initialized on some executor (assigned
> to that single topic-partition) and pulls the actual records. (Consumer
> pooling is applied, of course.) As the plan phase only determines the
> overall topic-partitions and offset ranges to process, Spark is never
> interested in pulling records on the driver side.
>
> Spark mainly leverages poll(0) to get the latest assigned partitions and
> adopt the changes or validate its expectation. That's not the only use
> case for poll(0): Spark also seeks the offset per topic-partition to the
> earliest, the latest, or a specific one (either provided by the end user
> or the last committed offset) so that Spark can obtain the actual offset
> or validate the provided one. According to the javadoc (if I understand
> correctly), to get the offset immediately it is required to call `poll`
> or `position`.
>
> The way Spark interacts with Kafka in this plan phase in the driver is
> synchronous, as the phase should finish ASAP to run the next phase.
> Registering a ConsumerRebalanceListener and tracking the changes would
> require some asynchronous handling, which sounds like unnecessary
> complexity.
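[Editor's note: the plan-phase step described above - splitting a partition's (fromOffset, untilOffset) range into input partitions - could be sketched as below. This is a deliberately simplified, hypothetical illustration (class and method names invented here), not the actual Spark planner logic in KafkaOffsetReader.]

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRangePlanner {
    // A (topicpartition, fromOffset, untilOffset) triple, as described above.
    record OffsetRange(String topicPartition, long fromOffset, long untilOffset) {}

    // Split one partition's offset range into `slices` contiguous,
    // near-equal input partitions covering [from, until).
    static List<OffsetRange> split(String tp, long from, long until, int slices) {
        List<OffsetRange> result = new ArrayList<>();
        long total = until - from;
        long start = from;
        for (int i = 0; i < slices; i++) {
            // Spread the remainder across the first (total % slices) slices.
            long size = total / slices + (i < total % slices ? 1 : 0);
            result.add(new OffsetRange(tp, start, start + size));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 offsets split 3 ways -> sizes 4, 3, 3
        System.out.println(split("topic-0", 100, 110, 3));
    }
}
```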
> Spark may be OK with dealing with synchronous calls with a timeout
> (that's what the methods in KafkaConsumer have been providing - they're
> not asynchronous, at least for callers), but dealing with asynchrony is
> another level of concern. I can see the benefit where a continuous thread
> runs and the consumer is continuously busy with something, relying on the
> listener to hear the news about reassignment. Unfortunately that's not
> our case.
>
> Unit tests in Spark have similar needs: it looks like Kafka test code
> also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
> places where a blocking (+timeout) call is preferred - so I can see
> similar needs there as well.

On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Guys,
>
> Please see the actual implementation; I'm pretty sure it explains the
> situation well:
>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>
> To answer one question/assumption which popped up from all of you: Spark
> not only uses KafkaConsumer#subscribe but pattern subscribe +
> KafkaConsumer#assign as well.
> Please see here:
>
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>
> BR,
> G

On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

> Hi Jungtaek,
> Thanks for the KIP. I have a couple of questions here.
>
> Is Spark not using Kafka's consumer group management across multiple
> consumers?
>
> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> ConsumerRebalanceListener listener) only to get all the topics for a
> pattern-based subscription, while Spark manually assigns those
> topic-partitions across consumers on workers?
>
> Thanks,
> Satish.

On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io> wrote:

> I am not sure I fully understand yet.
>
> The fact that Spark does not store offsets in Kafka but as part of its
> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> something here.
>
> As you are using subscribe(), you use the Kafka consumer group mechanism,
> which takes care of the assignment of partitions to clients within the
> group. Therefore, I am not sure what you mean by:
>
>> which Spark needs to know to coordinate multiple consumers to pull
>> correctly.
> Multiple thoughts that may help:
>
> - if Spark needs more control over the partition assignment, you can
>   provide a custom `ConsumerPartitionAssignor` (via the consumer
>   configuration)
>
> - you may also want to register a `ConsumerRebalanceListener` via
>   `subscribe()` to get informed when the group rebalances
>
> As you pointed out, with pattern subscription the metadata can change if
> topics are added/deleted. However, each metadata change will trigger a
> rebalance, and thus you would get corresponding calls to your rebalance
> listener to learn about it and react accordingly.
>
> Maybe you can explain why neither approach works and what gap the new
> API would close?
>
> -Matthias

On 8/11/19 5:11 AM, Jungtaek Lim wrote:

> Let me elaborate on my explanation a bit more. Here we talk about Apache
> Spark, but this applies to everything which wants to control the offsets
> of Kafka consumers.
>
> Spark manages the committed offsets and the offsets which should be
> polled next, as well as the topics and partitions. This is required as
> Spark itself has its own general checkpoint mechanism, and Kafka is just
> one of its sources/sinks (though it's considered a very important one).
> To pull records from Kafka, Spark tells Kafka which topics and partitions
> it wants to subscribe to (and does seek and poll), but Spark can also
> provide "patterns" of topics, and the subscription can change on the
> Kafka side (topics added/dropped, partitions added), which Spark needs to
> know to coordinate multiple consumers to pull correctly.
>
> It looks like assignment() doesn't update the assignment information in
> the consumer; it just returns the known one. There's only one known
> approach for updating it: calling `poll`. But Spark is not interested in
> the returned records, so it needs the hack `poll(0)` - and Kafka has
> deprecated that API. This KIP proposes to support this as an official
> approach.

On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:

> Sorry, I didn't realize you were also asking it here. I'm in favor of
> describing it in this discussion thread so the discussion itself can go
> forward. So, copying my answer here:
>
> We have some use cases where we don't rely only on what the Kafka
> consumer provides. We want to know the current assignment of the
> consumer, and to get the latest assignment, we called the hack `poll(0)`.
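[Editor's note: whichever way the assignment metadata gets refreshed - today via the `poll(0)` hack, or via an explicit API like the one this KIP proposes - a driver would then diff the old and new assignment to react to added/removed partitions. A minimal sketch of that diff step, with invented names and plain strings standing in for TopicPartition:]

```java
import java.util.HashSet;
import java.util.Set;

public class AssignmentDiff {
    // What changed between two assignment snapshots: a driver would plan
    // new offset ranges for "added" and drop state for "removed".
    record Diff(Set<String> added, Set<String> removed) {}

    static Diff diff(Set<String> previous, Set<String> current) {
        Set<String> added = new HashSet<>(current);
        added.removeAll(previous);
        Set<String> removed = new HashSet<>(previous);
        removed.removeAll(current);
        return new Diff(added, removed);
    }

    public static void main(String[] args) {
        // e.g. partition t-2 appeared and t-0 disappeared between refreshes
        System.out.println(diff(Set.of("t-0", "t-1"), Set.of("t-1", "t-2")));
    }
}
```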
> That said, we don't want to pull any records here, and if I'm not
> mistaken, there's no way to accomplish this. Please guide me if I'm
> missing something.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)

On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the KIP.
>
> Can you elaborate a little bit more on the use case for this feature?
> Why would a consumer need to update its metadata explicitly?
>
> -Matthias

On 8/8/19 8:46 PM, Jungtaek Lim wrote:

> Hi devs,
>
> I'd like to initiate discussion around KIP-505, exposing a new public
> method to only update assignment metadata in the consumer.
>
> `poll(0)` has been misused: according to the Kafka docs it doesn't
> guarantee that it won't pull any records, and the new method
> `poll(Duration)` doesn't have the same semantics, so I would like to
> propose a new public API which does only the desired behavior.
>
> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>
> Please feel free to suggest any improvements to the proposal, as I'm new
> to the Kafka community and may not catch the preferences (like
> TimeoutException vs boolean, etc.) of the Kafka project.
>
> Thanks in advance!
> Jungtaek Lim (HeartSaVioR)

--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior