I've had concerns about calling AdminClient.listTopics because on big clusters I've seen OOMs caused by too many TopicPartitions. On the other hand, this problem already exists in the current implementation because, as Colin said, the Consumer does the same on the client side. All in all this part is fine.
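For reference, a rough sketch of how a caller could keep this light: list only topic names and filter them locally, the same way the consumer's pattern subscription filters on the client side. This is illustrative Java, not the actual consumer code; the input set is assumed to come from `AdminClient.listTopics().names().get()`.

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Client-side topic filtering, mirroring what the consumer's pattern
// subscription does internally. The input set would come from
// AdminClient.listTopics().names().get(); topic names alone are far
// lighter than the full per-partition metadata that describeTopics()
// would pull for every topic on a big cluster.
public final class TopicFilter {
    public static Set<String> matchTopics(Set<String> allTopics, Pattern pattern) {
        return allTopics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toSet());
    }
}
```

Since the broker-side regex isn't available anyway, filtering the name list locally should give the same behavior and performance as the Consumer's pattern subscription.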
I've checked all the actual use-cases on Spark side which has to be covered and it looks doable. On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <kabh...@gmail.com> wrote: > So in overall, AdminClient covers the necessary to retrieve up-to-date > topic-partitions, whereas KIP-396 will cover the necessary to retrieve > offset (EARLIEST, LATEST, timestamp) on partition. > > Gabor, could you please add the input if I'm missing something? I'd like to > double-check on this. > > Assuming I'm not missing something, what would be preferred next action? > Personally I'd keep this as it is until KIP-396 passes the vote (the vote > for KIP-396 opened at January and it still doesn't pass - 7 months - which > worries me a bit if it's going to pass the vote or not), but I also respect > the lifecycle of KIP in Kafka community. > > On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com> wrote: > > > > > > > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org> > wrote: > > > >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote: > >> > Thanks for the feedbacks Colin and Matthias. > >> > > >> > I agree with you regarding getting topics and partitions via > >> AdminClient, > >> > just curious how much the overhead would be. Would it be lighter, or > >> > heavier? We may not want to list topics in regular intervals - in plan > >> > phase we want to know up-to-date information so that the calculation > >> from > >> > Spark itself makes sense. > >> > >> It would be lighter. The consumer will periodically refresh metadata for > >> any topic you are subscribed to. AdminClient doesn’t have the concept of > >> subscriptions, and won’t refresh topic metadata until you request it. > >> > > > > Sounds great! Happy to hear about that. > > > > > >> > >> > > >> > On the other hands I'm not seeing any information regarding offset in > >> > current AdminClient, which is also one of reason we leverage consumer > >> and > >> > call poll(0). 
Colin, as you mentioned there're KIPs addressing this, > >> could > >> > you refer KIPs so that we can see whether it would work for our case? > >> > Without support of this we cannot replace our usage of consumer/poll > >> with > >> > AdminClient. > >> > >> KIP-396 is the one for listing offsets in AdminClient. > >> > > > > KIP-396 seems to fit to the needs on Spark's purpose to get offset > > information, even for timestamp. Thanks! > > I'd wish there's a way to get a range of (EARLIEST, LATEST) in one call, > > but not a big deal as it just requires two calls. > > > > > > >> > ps. IMHO it seems to be helpful if there's overloaded `listTopics` > which > >> > receives regex same as consumer subscription via pattern. We would > like > >> to > >> > provide same behavior what Kafka is basically providing as a source. > >> > >> We don’t have a regex listTopics at the moment, though we could add > this. > >> Currently, the regex is done on the client side anyway (although we’d > >> really like to change this in the future). So just listing everything > and > >> filtering locally would be the same performance and behavior as the > >> Consumer. > >> > > > > I see. Good to know regex is done on the client side - I've just searched > > some code and it applies filter for all topics retrieved from metadata > > fetch. Then it would be mostly no difference on this. Thanks for > confirming. > > > > > >> > >> best, > >> Colin > >> > >> > > >> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax < > matth...@confluent.io> > >> > wrote: > >> > > >> > > Thanks for the details Jungtaek! > >> > > > >> > > I tend to agree with Colin, that using the AdminClient seems to be > the > >> > > better choice. > >> > > > >> > > You can get all topics via `listTopics()` (and you can refresh this > >> > > information on regular intervals) and match any pattern against the > >> list > >> > > of available topics in the driver. 
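As noted above, a KIP-396-style AdminClient API would need two calls to cover Spark's case, one with an EARLIEST spec and one with a LATEST spec, which the caller then zips into a (from, until) range per partition. A broker-free sketch of that zipping step; the `earliest` and `latest` maps stand in for the results of the two hypothetical listOffsets calls, and plain `String` keys stand in for `TopicPartition`:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Collectors;

// Combines the results of two offset-listing calls (earliest, latest)
// into one (from, until) range per partition. String keys stand in for
// TopicPartition; the two input maps stand in for the two KIP-396-style
// listOffsets results.
public final class OffsetRanges {
    public static Map<String, Map.Entry<Long, Long>> ranges(
            Map<String, Long> earliest, Map<String, Long> latest) {
        return earliest.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> new SimpleEntry<>(e.getValue(), latest.get(e.getKey()))));
    }
}
```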
> >> > > > >> > > As you use `assignment()` and store offsets in the Spark checkpoint, > >> it > >> > > seems that using consumer group management is not a good fit for the > >> use > >> > > case. > >> > > > >> > > > >> > > Thoughts? > >> > > > >> > > > >> > > > >> > > -Matthias > >> > > > >> > > On 8/12/19 8:22 AM, Colin McCabe wrote: > >> > > > Hi, > >> > > > > >> > > > If there’s no need to consume records in the Spark driver, then > the > >> > > Consumer is probably the wrong thing to use. Instead, Spark should > use > >> > > AdminClient to find out what partitions exist and where, manage > their > >> > > offsets, and so on. There are some KIPs under discussion now that > >> would add > >> > > the necessary APIs for managing offsets. > >> > > > > >> > > > Best, > >> > > > Colin > >> > > > > >> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote: > >> > > >> My feeling is that I didn't explain the use case for Spark > >> properly and > >> > > >> hence fail to explain the needs. Sorry about this. > >> > > >> > >> > > >> Spark leverages the single instance of KafkaConsumer in the > driver > >> > > which is > >> > > >> registered solely on the consumer group. This is used in the plan > >> phase > >> > > for > >> > > >> each micro-batch to calculate the overall topicpartitions with > its > >> > > offset > >> > > >> ranges for this batch, and split and assign (topicpartition, > >> fromOffset, > >> > > >> untilOffset) to each input partition. After the planning is done > >> and > >> > > tasks > >> > > >> are being distributed to executors, consumer per each input > >> partition > >> > > will > >> > > >> be initialized from some executor (being assigned to the single > >> > > >> topicpartition), and pull the actual records. (Pooling consumers > is > >> > > applied > >> > > >> for sure.) 
As plan phase is to determine the overall > >> topicpartitions and > >> > > >> offset ranges to process, Spark is never interested on pulling > the > >> > > records > >> > > >> in driver side. > >> > > >> > >> > > >> Spark mainly leverages poll(0) to get the latest assigned > >> partitions and > >> > > >> adopt the changes or validate the expectation. That's not only > use > >> case > >> > > for > >> > > >> poll(0). Spark is also seeking the offset per topicpartition to > the > >> > > >> earliest or the latest, or specific one (either provided by end > >> user or > >> > > the > >> > > >> last committed offset) so that Spark can have actual offset or > >> validate > >> > > the > >> > > >> provided offset. According to the javadoc (if I understand > >> correctly), > >> > > to > >> > > >> get the offset immediately it seems to be required to call `poll` > >> or > >> > > >> `position`. > >> > > >> > >> > > >> The way Spark interacts with Kafka in this plan phase in driver > is > >> > > >> synchronous, as the phase should finish ASAP to run the next > phase. > >> > > >> Registering ConsumerRebalanceListener and tracking the change > will > >> > > require > >> > > >> some asynchronous handling which sounds to add unnecessary > >> complexity. > >> > > >> Spark may be OK with deal with synchronous with timeout (that's > >> what > >> > > >> methods in KafkaConsumer have been providing - they're not > >> > > asynchronous, at > >> > > >> least for callers) but dealing with asynchronous is another level > >> of > >> > > >> interest. I can see the benefit where continuous thread runs and > >> the > >> > > >> consumer is busy with something continuously, relying on listener > >> to > >> > > hear > >> > > >> the news on reassignment. Unfortunately that's not the case. 
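The plan-phase splitting described above, turning one topic-partition's (fromOffset, untilOffset) into per-input-partition slices, could look roughly like this. This is a hypothetical helper for illustration, not Spark's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Splits one topic-partition's (fromOffset, untilOffset) range into at
// most `chunks` contiguous sub-ranges, one per input partition, as the
// plan phase would before handing work to executors. Hypothetical
// helper, not Spark's actual implementation.
public final class OffsetRangeSplitter {
    public record Range(long from, long until) {}

    public static List<Range> split(long from, long until, int chunks) {
        List<Range> out = new ArrayList<>();
        long total = until - from;
        long start = from;
        for (int i = 0; i < chunks && start < until; i++) {
            // Spread the remainder over the first (total % chunks) slices.
            long size = total / chunks + (i < total % chunks ? 1 : 0);
            out.add(new Range(start, start + size));
            start += size;
        }
        return out;
    }
}
```

Each resulting slice would become one (topicpartition, fromOffset, untilOffset) task; no records ever need to be pulled on the driver to compute this.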
> >> > > >> > >> > > >> Unit tests in Spark have similar needs: looks like Kafka test > code > >> also > >> > > >> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in > many > >> > > places > >> > > >> as it's appropriate to the place which blocking (+timeout) call > is > >> > > >> preferred - so I can see the similar needs from here as well. > >> > > >> > >> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi < > >> > > gabor.g.somo...@gmail.com> > >> > > >> wrote: > >> > > >> > >> > > >>> Hi Guys, > >> > > >>> > >> > > >>> Please see the actual implementation, pretty sure it explains > the > >> > > situation > >> > > >>> well: > >> > > >>> > >> > > >>> > >> > > > >> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala > >> > > >>> > >> > > >>> To answer one question/assumption which popped up from all of > you > >> > > Spark not > >> > > >>> only uses KafkaConsumer#subscribe but pattern subscribe + > >> > > >>> KafkaConsumer#assign as well. > >> > > >>> Please see here: > >> > > >>> > >> > > >>> > >> > > > >> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala > >> > > >>> > >> > > >>> BR, > >> > > >>> G > >> > > >>> > >> > > >>> > >> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana < > >> > > satish.dugg...@gmail.com> > >> > > >>> wrote: > >> > > >>> > >> > > >>>> Hi Jungtaek, > >> > > >>>> Thanks for the KIP. I have a couple of questions here. > >> > > >>>> Is not Spark using Kafka's consumer group management across > >> multiple > >> > > >>>> consumers? > >> > > >>>> > >> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern, > >> > > >>>> ConsumerRebalanceListener listener) only to get all the topics > >> for a > >> > > >>>> pattern based subscription and Spark manually assigns those > >> > > >>>> topic-partitions across consumers on workers? 
> >> > > >>>> > >> > > >>>> Thanks, > >> > > >>>> Satish. > >> > > >>>> > >> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax < > >> > > matth...@confluent.io> > >> > > >>>> wrote: > >> > > >>>> > >> > > >>>>> If am not sure if I fully understand yet. > >> > > >>>>> > >> > > >>>>> The fact, that Spark does not stores offsets in Kafka but as > >> part of > >> > > >>> its > >> > > >>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am > >> missing > >> > > >>>>> something here. > >> > > >>>>> > >> > > >>>>> As you are using subscribe(), you use Kafka consumer group > >> mechanism, > >> > > >>>>> that takes care of the assignment of partitions to clients > >> within the > >> > > >>>>> group. Therefore, I am not sure what you mean by: > >> > > >>>>> > >> > > >>>>>> which Spark needs to > >> > > >>>>>>> know to coordinate multiple consumers to pull correctly. > >> > > >>>>> > >> > > >>>>> Multiple thoughts that may help: > >> > > >>>>> > >> > > >>>>> - if Spark needs more control about the partition assignment, > >> you can > >> > > >>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer > >> > > >>>>> configuration) > >> > > >>>>> > >> > > >>>>> - you may also want to register `ConsumerRebalanceListener` > via > >> > > >>>>> `subscribe()` to get informed when the group rebalances > >> > > >>>>> > >> > > >>>>> As you pointed out, using pattern subscription metadata can > >> change if > >> > > >>>>> topic are added/deleted. However, each metadata change will > >> > > triggering > >> > > >>> a > >> > > >>>>> rebalance and thus you would get corresponding calls to you > >> rebalance > >> > > >>>>> listener to learn about it and react accordingly. > >> > > >>>>> > >> > > >>>>> Maybe you can explain why neither of both approaches works and > >> what > >> > > gap > >> > > >>>>> the new API would close? 
> >> > > >>>>> > >> > > >>>>> > >> > > >>>>> -Matthias > >> > > >>>>> > >> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote: > >> > > >>>>>> Let me elaborate my explanation a bit more. Here we say about > >> Apache > >> > > >>>>> Spark, > >> > > >>>>>> but this will apply for everything which want to control > >> offset of > >> > > >>>> Kafka > >> > > >>>>>> consumers. > >> > > >>>>>> > >> > > >>>>>> Spark is managing the committed offsets and the offsets which > >> should > >> > > >>> be > >> > > >>>>>> polled now. Topics and partitions as well. This is required > as > >> Spark > >> > > >>>>> itself > >> > > >>>>>> has its own general checkpoint mechanism and Kafka is just a > >> one of > >> > > >>>>>> source/sink (though it's considered as very important). > >> > > >>>>>> > >> > > >>>>>> To pull records from Kafka, Spark provides to Kafka which > >> topics and > >> > > >>>>>> partitions it wants to subscribe(, and do seek and poll), but > >> as > >> > > >>> Spark > >> > > >>>>> can > >> > > >>>>>> also provide "patterns" of topics, as well as subscription > can > >> be > >> > > >>>> changed > >> > > >>>>>> in Kafka side (topic added/dropped, partitions added) which > >> Spark > >> > > >>> needs > >> > > >>>>> to > >> > > >>>>>> know to coordinate multiple consumers to pull correctly. > >> > > >>>>>> > >> > > >>>>>> Looks like assignment() doesn't update the assignment > >> information in > >> > > >>>>>> consumer. It just returns known one. There's only one known > >> approach > >> > > >>>>> doing > >> > > >>>>>> this, calling `poll`, but Spark is not interested on returned > >> > > >>> records, > >> > > >>>> so > >> > > >>>>>> there's a need for a hack `poll(0)`, and Kafka deprecated the > >> API. > >> > > >>> This > >> > > >>>>> KIP > >> > > >>>>>> proposes to support this as official approach. 
> >> > > >>>>>> > >> > > >>>>>> > >> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim < > >> kabh...@gmail.com> > >> > > >>>> wrote: > >> > > >>>>>> > >> > > >>>>>>> Sorry I didn't recognize you're also asking it here as well. > >> I'm in > >> > > >>>>> favor > >> > > >>>>>>> of describing it in this discussion thread so the discussion > >> itself > >> > > >>>> can > >> > > >>>>> go > >> > > >>>>>>> forward. So copying my answer here: > >> > > >>>>>>> > >> > > >>>>>>> We have some use case which we don't just rely on everything > >> what > >> > > >>>> Kafka > >> > > >>>>>>> consumer provides. We want to know current assignment on > this > >> > > >>>> consumer, > >> > > >>>>> and > >> > > >>>>>>> to get the latest assignment, we called the hack `poll(0)`. > >> > > >>>>>>> > >> > > >>>>>>> That said, we don't want to pull any records here, and if > I'm > >> not > >> > > >>>>> missing > >> > > >>>>>>> here, there's no way to accomplish this. Please guide me if > >> I'm > >> > > >>>> missing > >> > > >>>>>>> something. > >> > > >>>>>>> > >> > > >>>>>>> Thanks, > >> > > >>>>>>> Jungtaek Lim (HeartSaVioR) > >> > > >>>>>>> > >> > > >>>>>>> > >> > > >>>>>>> > >> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax < > >> > > >>>> matth...@confluent.io> > >> > > >>>>>>> wrote: > >> > > >>>>>>> > >> > > >>>>>>>> Thanks for the KIP. > >> > > >>>>>>>> > >> > > >>>>>>>> Can you elaborate a little bit more on the use case for > this > >> > > >>> feature? > >> > > >>>>>>>> Why would a consumer need to update it's metadata > explicitly? > >> > > >>>>>>>> > >> > > >>>>>>>> > >> > > >>>>>>>> -Matthias > >> > > >>>>>>>> > >> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote: > >> > > >>>>>>>>> Hi devs, > >> > > >>>>>>>>> > >> > > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing > new > >> > > >>> public > >> > > >>>>>>>> method > >> > > >>>>>>>>> to only update assignment metadata in consumer. 
> >> > > >>>>>>>>> > >> > > >>>>>>>>> `poll(0)` has been misused as according to Kafka doc it > >> doesn't > >> > > >>>>>>>> guarantee > >> > > >>>>>>>>> that it doesn't pull any records, and new method > >> `poll(Duration)` > >> > > >>>>>>>> doesn't > >> > > >>>>>>>>> have same semantic, so would like to propose new public > API > >> which > >> > > >>>> only > >> > > >>>>>>>> does > >> > > >>>>>>>>> the desired behavior. > >> > > >>>>>>>>> > >> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw > >> > > >>>>>>>>> > >> > > >>>>>>>>> Please feel free to suggest any improvements on proposal, > >> as I'm > >> > > >>> new > >> > > >>>>> to > >> > > >>>>>>>>> Kafka community and may not catch preferences (like > >> > > >>> TimeoutException > >> > > >>>>> vs > >> > > >>>>>>>>> boolean, etc.) on Kafka project. > >> > > >>>>>>>>> > >> > > >>>>>>>>> Thanks in advance! > >> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR) > >> > > >>>>>>>>> > >> > > >>>>>>>> > >> > > >>>>>>>> > >> > > >>>>>>> > >> > > >>>>>>> -- > >> > > >>>>>>> Name : Jungtaek Lim > >> > > >>>>>>> Blog : http://medium.com/@heartsavior > >> > > >>>>>>> Twitter : http://twitter.com/heartsavior > >> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior > >> > > >>>>>>> > >> > > >>>>>> > >> > > >>>>>> > >> > > >>>>> > >> > > >>>>> > >> > > >>>> > >> > > >>> > >> > > >> > >> > > >> > >> > > >> -- > >> > > >> Name : Jungtaek Lim > >> > > >> Blog : http://medium.com/@heartsavior > >> > > >> Twitter : http://twitter.com/heartsavior > >> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior > >> > > >> > >> > > > > >> > > > >> > > > >> > > >> > -- > >> > Name : Jungtaek Lim > >> > Blog : http://medium.com/@heartsavior > >> > Twitter : http://twitter.com/heartsavior > >> > LinkedIn : http://www.linkedin.com/in/heartsavior > >> > > >> > > > > > > -- > > Name : Jungtaek Lim > > Blog : http://medium.com/@heartsavior > > Twitter : http://twitter.com/heartsavior > > LinkedIn : 
http://www.linkedin.com/in/heartsavior > > > > > -- > Name : Jungtaek Lim > Blog : http://medium.com/@heartsavior > Twitter : http://twitter.com/heartsavior > LinkedIn : http://www.linkedin.com/in/heartsavior >