On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> Thanks for the feedbacks Colin and Matthias.
> 
> I agree with you regarding getting topics and partitions via AdminClient,
> just curious how much the overhead would be. Would it be lighter, or
> heavier? We may not want to list topics in regular intervals - in plan
> phase we want to know up-to-date information so that the calculation from
> Spark itself makes sense.

It would be lighter. The consumer will periodically refresh metadata for any 
topic you are subscribed to. AdminClient doesn’t have the concept of 
subscriptions, and won’t refresh topic metadata until you request it.

> 
> On the other hands I'm not seeing any information regarding offset in
> current AdminClient, which is also one of reason we leverage consumer and
> call poll(0). Colin, as you mentioned there're KIPs addressing this, could
> you refer KIPs so that we can see whether it would work for our case?
> Without support of this we cannot replace our usage of consumer/poll with
> AdminClient.

KIP-396 is the one for listing offsets in AdminClient.

> 
> ps. IMHO it seems to be helpful if there's overloaded `listTopics` which
> receives regex same as consumer subscription via pattern. We would like to
> provide same behavior what Kafka is basically providing as a source.

We don’t have a regex listTopics at the moment, though we could add this. 
Currently, the regex is done on the client side anyway (although we’d really 
like to change this in the future). So just listing everything and filtering 
locally would be the same performance and behavior as the Consumer.

best,
Colin

> 
> On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
> > Thanks for the details Jungtaek!
> >
> > I tend to agree with Colin, that using the AdminClient seems to be the
> > better choice.
> >
> > You can get all topics via `listTopics()` (and you can refresh this
> > information on regular intervals) and match any pattern against the list
> > of available topics in the driver.
> >
> > As you use `assignment()` and store offsets in the Spark checkpoint, it
> > seems that using consumer group management is not a good fit for the use
> > case.
> >
> >
> > Thoughts?
> >
> >
> >
> > -Matthias
> >
> > On 8/12/19 8:22 AM, Colin McCabe wrote:
> > > Hi,
> > >
> > > If there’s no need to consume records in the Spark driver, then the
> > Consumer is probably the wrong thing to use. Instead, Spark should use
> > AdminClient to find out what partitions exist and where, manage their
> > offsets, and so on. There are some KIPs under discussion now that would add
> > the necessary APIs for managing offsets.
> > >
> > > Best,
> > > Colin
> > >
> > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > >> My feeling is that I didn't explain the use case for Spark properly and
> > >> hence fail to explain the needs. Sorry about this.
> > >>
> > >> Spark leverages the single instance of KafkaConsumer in the driver
> > which is
> > >> registered solely on the consumer group. This is used in the plan phase
> > for
> > >> each micro-batch to calculate the overall topicpartitions with its
> > offset
> > >> ranges for this batch, and split and assign (topicpartition, fromOffset,
> > >> untilOffset) to each input partition. After the planning is done and
> > tasks
> > >> are being distributed to executors, consumer per each input partition
> > will
> > >> be initialized from some executor (being assigned to the single
> > >> topicpartition), and pull the actual records. (Pooling consumers is
> > applied
> > >> for sure.) As plan phase is to determine the overall topicpartitions and
> > >> offset ranges to process, Spark is never interested on pulling the
> > records
> > >> in driver side.
> > >>
> > >> Spark mainly leverages poll(0) to get the latest assigned partitions and
> > >> adopt the changes or validate the expectation. That's not only use case
> > for
> > >> poll(0). Spark is also seeking the offset per topicpartition to the
> > >> earliest or the latest, or specific one (either provided by end user or
> > the
> > >> last committed offset) so that Spark can have actual offset or validate
> > the
> > >> provided offset. According to the javadoc (if I understand correctly),
> > to
> > >> get the offset immediately it seems to be required to call `poll` or
> > >> `position`.
> > >>
> > >> The way Spark interacts with Kafka in this plan phase in driver is
> > >> synchronous, as the phase should finish ASAP to run the next phase.
> > >> Registering ConsumerRebalanceListener and tracking the change will
> > require
> > >> some asynchronous handling which sounds to add unnecessary complexity.
> > >> Spark may be OK with deal with synchronous with timeout (that's what
> > >> methods in KafkaConsumer have been providing - they're not
> > asynchronous, at
> > >> least for callers) but dealing with asynchronous is another level of
> > >> interest. I can see the benefit where continuous thread runs and the
> > >> consumer is busy with something continuously, relying on listener to
> > hear
> > >> the news on reassignment. Unfortunately that's not the case.
> > >>
> > >> Unit tests in Spark have similar needs: looks like Kafka test code also
> > >> leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
> > places
> > >> as it's appropriate to the place which blocking (+timeout) call is
> > >> preferred - so I can see the similar needs from here as well.
> > >>
> > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
> > gabor.g.somo...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Guys,
> > >>>
> > >>> Please see the actual implementation, pretty sure it explains the
> > situation
> > >>> well:
> > >>>
> > >>>
> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > >>>
> > >>> To answer one question/assumption which popped up from all of you
> > Spark not
> > >>> only uses KafkaConsumer#subscribe but pattern subscribe +
> > >>> KafkaConsumer#assign as well.
> > >>> Please see here:
> > >>>
> > >>>
> > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> > >>>
> > >>> BR,
> > >>> G
> > >>>
> > >>>
> > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> > satish.dugg...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Jungtaek,
> > >>>> Thanks for the KIP. I have a couple of questions here.
> > >>>> Is not Spark using Kafka's consumer group management across multiple
> > >>>> consumers?
> > >>>>
> > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > >>>> ConsumerRebalanceListener listener) only to get all the topics for a
> > >>>> pattern based subscription and Spark manually assigns those
> > >>>> topic-partitions across consumers on workers?
> > >>>>
> > >>>> Thanks,
> > >>>> Satish.
> > >>>>
> > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
> > matth...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> If am not sure if I fully understand yet.
> > >>>>>
> > >>>>> The fact, that Spark does not stores offsets in Kafka but as part of
> > >>> its
> > >>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> > >>>>> something here.
> > >>>>>
> > >>>>> As you are using subscribe(), you use Kafka consumer group mechanism,
> > >>>>> that takes care of the assignment of partitions to clients within the
> > >>>>> group. Therefore, I am not sure what you mean by:
> > >>>>>
> > >>>>>> which Spark needs to
> > >>>>>>> know to coordinate multiple consumers to pull correctly.
> > >>>>>
> > >>>>> Multiple thoughts that may help:
> > >>>>>
> > >>>>> - if Spark needs more control about the partition assignment, you can
> > >>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
> > >>>>> configuration)
> > >>>>>
> > >>>>> - you may also want to register `ConsumerRebalanceListener` via
> > >>>>> `subscribe()` to get informed when the group rebalances
> > >>>>>
> > >>>>> As you pointed out, using pattern subscription metadata can change if
> > >>>>> topic are added/deleted. However, each metadata change will
> > triggering
> > >>> a
> > >>>>> rebalance and thus you would get corresponding calls to you rebalance
> > >>>>> listener to learn about it and react accordingly.
> > >>>>>
> > >>>>> Maybe you can explain why neither of both approaches works and what
> > gap
> > >>>>> the new API would close?
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > >>>>>> Let me elaborate my explanation a bit more. Here we say about Apache
> > >>>>> Spark,
> > >>>>>> but this will apply for everything which want to control offset of
> > >>>> Kafka
> > >>>>>> consumers.
> > >>>>>>
> > >>>>>> Spark is managing the committed offsets and the offsets which should
> > >>> be
> > >>>>>> polled now. Topics and partitions as well. This is required as Spark
> > >>>>> itself
> > >>>>>> has its own general checkpoint mechanism and Kafka is just a one of
> > >>>>>> source/sink (though it's considered as very important).
> > >>>>>>
> > >>>>>> To pull records from Kafka, Spark provides to Kafka which topics and
> > >>>>>> partitions it wants to subscribe(, and do seek and poll), but as
> > >>> Spark
> > >>>>> can
> > >>>>>> also provide "patterns" of topics, as well as subscription can be
> > >>>> changed
> > >>>>>> in Kafka side (topic added/dropped, partitions added) which Spark
> > >>> needs
> > >>>>> to
> > >>>>>> know to coordinate multiple consumers to pull correctly.
> > >>>>>>
> > >>>>>> Looks like assignment() doesn't update the assignment information in
> > >>>>>> consumer. It just returns known one. There's only one known approach
> > >>>>> doing
> > >>>>>> this, calling `poll`, but Spark is not interested on returned
> > >>> records,
> > >>>> so
> > >>>>>> there's a need for a hack `poll(0)`, and Kafka deprecated the API.
> > >>> This
> > >>>>> KIP
> > >>>>>> proposes to support this as official approach.
> > >>>>>>
> > >>>>>>
> > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Sorry I didn't recognize you're also asking it here as well. I'm in
> > >>>>> favor
> > >>>>>>> of describing it in this discussion thread so the discussion itself
> > >>>> can
> > >>>>> go
> > >>>>>>> forward. So copying my answer here:
> > >>>>>>>
> > >>>>>>> We have some use case which we don't just rely on everything what
> > >>>> Kafka
> > >>>>>>> consumer provides. We want to know current assignment on this
> > >>>> consumer,
> > >>>>> and
> > >>>>>>> to get the latest assignment, we called the hack `poll(0)`.
> > >>>>>>>
> > >>>>>>> That said, we don't want to pull any records here, and if I'm not
> > >>>>> missing
> > >>>>>>> here, there's no way to accomplish this. Please guide me if I'm
> > >>>> missing
> > >>>>>>> something.
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Jungtaek Lim (HeartSaVioR)
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> > >>>> matth...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Thanks for the KIP.
> > >>>>>>>>
> > >>>>>>>> Can you elaborate a little bit more on the use case for this
> > >>> feature?
> > >>>>>>>> Why would a consumer need to update it's metadata explicitly?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > >>>>>>>>> Hi devs,
> > >>>>>>>>>
> > >>>>>>>>> I'd like to initiate discussion around KIP-505, exposing new
> > >>> public
> > >>>>>>>> method
> > >>>>>>>>> to only update assignment metadata in consumer.
> > >>>>>>>>>
> > >>>>>>>>> `poll(0)` has been misused as according to Kafka doc it doesn't
> > >>>>>>>> guarantee
> > >>>>>>>>> that it doesn't pull any records, and new method `poll(Duration)`
> > >>>>>>>> doesn't
> > >>>>>>>>> have same semantic, so would like to propose new public API which
> > >>>> only
> > >>>>>>>> does
> > >>>>>>>>> the desired behavior.
> > >>>>>>>>>
> > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > >>>>>>>>>
> > >>>>>>>>> Please feel free to suggest any improvements on proposal, as I'm
> > >>> new
> > >>>>> to
> > >>>>>>>>> Kafka community and may not catch preferences (like
> > >>> TimeoutException
> > >>>>> vs
> > >>>>>>>>> boolean, etc.) on Kafka project.
> > >>>>>>>>>
> > >>>>>>>>> Thanks in advance!
> > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>> Name : Jungtaek Lim
> > >>>>>>> Blog : http://medium.com/@heartsavior
> > >>>>>>> Twitter : http://twitter.com/heartsavior
> > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >> --
> > >> Name : Jungtaek Lim
> > >> Blog : http://medium.com/@heartsavior
> > >> Twitter : http://twitter.com/heartsavior
> > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > >>
> > >
> >
> >
> 
> -- 
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
> 

Reply via email to