On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org> wrote:
> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> > Thanks for the feedback, Colin and Matthias.
> >
> > I agree with you regarding getting topics and partitions via AdminClient; I'm just curious how much the overhead would be. Would it be lighter, or heavier? We may not want to list topics at regular intervals - in the plan phase we want up-to-date information so that the calculation from Spark itself makes sense.
>
> It would be lighter. The consumer will periodically refresh metadata for any topic you are subscribed to. AdminClient doesn't have the concept of subscriptions, and won't refresh topic metadata until you request it.

Sounds great! Happy to hear that.

> > On the other hand, I'm not seeing any information regarding offsets in the current AdminClient, which is also one of the reasons we leverage a consumer and call poll(0). Colin, as you mentioned there are KIPs addressing this, could you point us to the KIPs so that we can see whether they would work for our case? Without support for this we cannot replace our usage of consumer/poll with AdminClient.
>
> KIP-396 is the one for listing offsets in AdminClient.

KIP-396 seems to fit Spark's need to get offset information, even by timestamp. Thanks! I wish there were a way to get the (EARLIEST, LATEST) range in one call, but it's not a big deal as it just requires two calls.

> > ps. IMHO it would be helpful to have an overloaded `listTopics` which receives a regex, the same as consumer subscription via pattern. We would like to provide the same behavior Kafka itself is basically providing as a source.
>
> We don't have a regex listTopics at the moment, though we could add this. Currently, the regex is done on the client side anyway (although we'd really like to change this in the future). So just listing everything and filtering locally would give the same performance and behavior as the Consumer.

I see. Good to know the regex is done on the client side - I've just searched the code and it applies the filter to all topics retrieved from the metadata fetch. Then there would be mostly no difference here. Thanks for confirming.

> best,
> Colin
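To make that concrete on the Spark side, here is a rough, untested sketch of what the AdminClient-based flow could look like. The listTopics()/describeTopics() part uses the existing API and does the regex filtering locally, as you described; the offsets part is hypothetical and assumes KIP-396 ends up adding something along the lines of listOffsets(Map<TopicPartition, OffsetSpec>):

    import java.util.*;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.TopicPartition;

    // List every topic and apply the subscription regex locally, same as the consumer does today.
    static List<TopicPartition> matchingPartitions(AdminClient admin, Pattern pattern) throws Exception {
      Set<String> matched = admin.listTopics().names().get().stream()
          .filter(topic -> pattern.matcher(topic).matches())
          .collect(Collectors.toSet());

      // Expand the matched topics into their partitions.
      List<TopicPartition> partitions = new ArrayList<>();
      admin.describeTopics(matched).all().get().forEach((topic, description) ->
          description.partitions().forEach(p -> partitions.add(new TopicPartition(topic, p.partition()))));
      return partitions;
    }

    // Hypothetical, if KIP-396 lands: two calls to cover the (EARLIEST, LATEST) range.
    // Map<TopicPartition, OffsetSpec> earliest = partitions.stream()
    //     .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest()));
    // Map<TopicPartition, OffsetSpec> latest = partitions.stream()
    //     .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    // ... = admin.listOffsets(earliest).all().get();
    // ... = admin.listOffsets(latest).all().get();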
> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > Thanks for the details Jungtaek!
> > >
> > > I tend to agree with Colin that using the AdminClient seems to be the better choice.
> > >
> > > You can get all topics via `listTopics()` (and you can refresh this information at regular intervals) and match any pattern against the list of available topics in the driver.
> > >
> > > As you use `assignment()` and store offsets in the Spark checkpoint, it seems that using consumer group management is not a good fit for the use case.
> > >
> > > Thoughts?
> > >
> > > -Matthias
> > >
> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
> > > > Hi,
> > > >
> > > > If there's no need to consume records in the Spark driver, then the Consumer is probably the wrong thing to use. Instead, Spark should use AdminClient to find out what partitions exist and where, manage their offsets, and so on. There are some KIPs under discussion now that would add the necessary APIs for managing offsets.
> > > >
> > > > Best,
> > > > Colin
> > > >
> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > > >> My feeling is that I didn't explain the use case for Spark properly and hence failed to explain the needs. Sorry about this.
> > > >>
> > > >> Spark leverages a single instance of KafkaConsumer in the driver, which is the only member registered in the consumer group. It is used in the plan phase of each micro-batch to calculate the overall topicpartitions with their offset ranges for the batch, and to split and assign (topicpartition, fromOffset, untilOffset) to each input partition. After planning is done and tasks are distributed to executors, a consumer per input partition is initialized on some executor (assigned to that single topicpartition) and pulls the actual records. (Consumer pooling is applied, for sure.) As the plan phase only determines the overall topicpartitions and offset ranges to process, Spark is never interested in pulling records on the driver side.
> > > >>
> > > >> Spark mainly leverages poll(0) to get the latest assigned partitions and either adopt the changes or validate its expectation. That's not the only use case for poll(0). Spark also seeks the offset of each topicpartition to the earliest, the latest, or a specific one (either provided by the end user or the last committed offset) so that Spark can obtain the actual offset or validate the provided one. According to the javadoc (if I understand correctly), to get the offset immediately it seems to be required to call `poll` or `position`.
> > > >>
> > > >> The way Spark interacts with Kafka in this plan phase in the driver is synchronous, as the phase should finish ASAP to run the next phase. Registering a ConsumerRebalanceListener and tracking the changes would require some asynchronous handling, which sounds like it adds unnecessary complexity. Spark is OK with synchronous calls with a timeout (that's what the methods in KafkaConsumer have been providing - they're not asynchronous, at least for callers), but dealing with asynchronicity is another level of concern. I can see the benefit where a thread runs continuously and the consumer is constantly busy with something, relying on the listener to hear the news about reassignment. Unfortunately that's not the case here.
> > > >>
> > > >> Unit tests in Spark have similar needs: it looks like Kafka test code also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many places where a blocking (+timeout) call is preferred - so I can see the similar needs there as well.
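(To illustrate the plan step described above: a minimal, simplified sketch - not Spark's actual code, which is linked in Gabor's mail below - assuming an already-configured consumer. It also shows why the poll(0) call exists at all:)

    import java.util.*;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // The poll(0) call is only there to force an assignment-metadata update; any records it
    // might return are unwanted, which is exactly the gap this KIP is about.
    static void planBatch(KafkaConsumer<byte[], byte[]> consumer, Pattern pattern) {
      consumer.subscribe(pattern);
      consumer.poll(0);                                  // deprecated hack to refresh assignment
      Set<TopicPartition> assigned = consumer.assignment();

      Map<TopicPartition, Long> fromOffsets = new HashMap<>();
      Map<TopicPartition, Long> untilOffsets = new HashMap<>();

      consumer.seekToBeginning(assigned);                // or seek to checkpointed offsets instead
      for (TopicPartition tp : assigned) fromOffsets.put(tp, consumer.position(tp));

      consumer.seekToEnd(assigned);
      for (TopicPartition tp : assigned) untilOffsets.put(tp, consumer.position(tp));

      // The (topicpartition, fromOffset, untilOffset) ranges are then handed to executors,
      // which pull the actual records with their own consumers.
    }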
> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> > > >>
> > > >>> Hi Guys,
> > > >>>
> > > >>> Please see the actual implementation; I'm pretty sure it explains the situation well:
> > > >>>
> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > > >>>
> > > >>> To answer one question/assumption which popped up from all of you: Spark not only uses KafkaConsumer#subscribe but pattern subscribe + KafkaConsumer#assign as well. Please see here:
> > > >>>
> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> > > >>>
> > > >>> BR,
> > > >>> G
> > > >>>
> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Jungtaek,
> > > >>>> Thanks for the KIP. I have a couple of questions here.
> > > >>>> Isn't Spark using Kafka's consumer group management across multiple consumers?
> > > >>>>
> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern, ConsumerRebalanceListener listener) only to get all the topics for a pattern-based subscription, and does Spark manually assign those topic-partitions across consumers on workers?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Satish.
> > > >>>>
> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>
> > > >>>>> I am not sure if I fully understand yet.
> > > >>>>>
> > > >>>>> The fact that Spark does not store offsets in Kafka but as part of its own checkpoint mechanism seems to be orthogonal. Maybe I am missing something here.
> > > >>>>>
> > > >>>>> As you are using subscribe(), you use the Kafka consumer group mechanism, which takes care of the assignment of partitions to clients within the group. Therefore, I am not sure what you mean by:
> > > >>>>>
> > > >>>>>> which Spark needs to know to coordinate multiple consumers to pull correctly.
> > > >>>>>
> > > >>>>> Multiple thoughts that may help:
> > > >>>>>
> > > >>>>> - if Spark needs more control over the partition assignment, you can provide a custom `ConsumerPartitionAssignor` (via the consumer configuration)
> > > >>>>>
> > > >>>>> - you may also want to register a `ConsumerRebalanceListener` via `subscribe()` to get informed when the group rebalances
> > > >>>>>
> > > >>>>> As you pointed out, with pattern subscription the metadata can change if topics are added/deleted. However, each metadata change will trigger a rebalance, and thus you would get corresponding calls to your rebalance listener to learn about it and react accordingly.
> > > >>>>>
> > > >>>>> Maybe you can explain why neither of these approaches works and what gap the new API would close?
> > > >>>>>
> > > >>>>> -Matthias
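(For reference, the listener registration Matthias describes is sketched below - again just an illustration, assuming a consumer with a configured group id. The callbacks are only invoked from within poll(), which is the asynchronous-handling concern mentioned in my reply above:)

    import java.util.Collection;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: learn about assignment changes through the rebalance callbacks
    // instead of calling poll(0) just to discover them.
    static void subscribeWithListener(KafkaConsumer<byte[], byte[]> consumer, Pattern pattern) {
      consumer.subscribe(pattern, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          // Persist or validate offsets for partitions that are being taken away.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          // Record the new assignment; note this runs inside poll() on the polling thread.
        }
      });
    }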
> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > >>>>>> Let me elaborate my explanation a bit more. Here we talk about Apache Spark, but this will apply to anything which wants to control the offsets of Kafka consumers.
> > > >>>>>>
> > > >>>>>> Spark is managing the committed offsets and the offsets which should be polled now, as well as the topics and partitions. This is required as Spark itself has its own general checkpoint mechanism, and Kafka is just one of its sources/sinks (though it's considered a very important one).
> > > >>>>>>
> > > >>>>>> To pull records from Kafka, Spark tells Kafka which topics and partitions it wants to subscribe to (and then does seek and poll). But Spark can also provide "patterns" of topics, and the subscription can be changed on the Kafka side (topics added/dropped, partitions added), which Spark needs to know to coordinate multiple consumers to pull correctly.
> > > >>>>>>
> > > >>>>>> It looks like assignment() doesn't update the assignment information in the consumer; it just returns the known one. There's only one known approach to updating it, calling `poll`, but Spark is not interested in the returned records, so there's the need for the `poll(0)` hack, and Kafka has deprecated that API. This KIP proposes to support this as an official approach.
> > > >>>>>>
> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Sorry, I didn't recognize you were also asking it here as well. I'm in favor of describing it in this discussion thread so the discussion itself can go forward. So I'm copying my answer here:
> > > >>>>>>>
> > > >>>>>>> We have some use cases where we don't just rely on everything the Kafka consumer provides. We want to know the current assignment on this consumer, and to get the latest assignment, we called the hack `poll(0)`.
> > > >>>>>>>
> > > >>>>>>> That said, we don't want to pull any records here, and if I'm not missing something, there's no way to accomplish this. Please guide me if I'm missing something.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
> > > >>>>>>>
> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>>> Thanks for the KIP.
> > > >>>>>>>>
> > > >>>>>>>> Can you elaborate a little bit more on the use case for this feature? Why would a consumer need to update its metadata explicitly?
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > > >>>>>>>>> Hi devs,
> > > >>>>>>>>>
> > > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing a new public method to only update assignment metadata in the consumer.
> > > >>>>>>>>>
> > > >>>>>>>>> `poll(0)` has been misused, as according to the Kafka docs it doesn't guarantee that it won't pull any records, and the new method `poll(Duration)` doesn't have the same semantics, so I would like to propose a new public API which only does the desired behavior.
> > > >>>>>>>>>
> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > > >>>>>>>>>
> > > >>>>>>>>> Please feel free to suggest any improvements on the proposal, as I'm new to the Kafka community and may not catch preferences (like TimeoutException vs boolean, etc.) on the Kafka project.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks in advance!
> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
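To sum up in code form the gap the KIP is about (assuming the same consumer and pattern as in the sketches above; the second call is hypothetical - the exact name and signature are what is being proposed on the KIP page, along the lines of the internal `updateAssignmentMetadataIfNeeded` mentioned earlier):

    // Today: the only way to refresh assignment metadata is a poll. poll(0) gives no guarantee
    // that it returns zero records, and poll(Duration.ZERO) won't wait for the metadata update.
    consumer.subscribe(pattern);
    ConsumerRecords<byte[], byte[]> unwanted = consumer.poll(0);   // deprecated, may still fetch records

    // Proposed (hypothetical signature, see the KIP page): update metadata only, never fetch records.
    // consumer.updateAssignmentMetadataIfNeeded(Duration.ofSeconds(30));
    // Set<TopicPartition> assignment = consumer.assignment();     // now reflects up-to-date metadata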
--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior