On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org> wrote:

> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> > Thanks for the feedback, Colin and Matthias.
> >
> > I agree with you regarding getting topics and partitions via AdminClient;
> > I'm just curious how much the overhead would be. Would it be lighter, or
> > heavier? We may not want to list topics at regular intervals - in the plan
> > phase we want up-to-date information so that the calculation on the Spark
> > side makes sense.
>
> It would be lighter. The consumer will periodically refresh metadata for
> any topic you are subscribed to. AdminClient doesn’t have the concept of
> subscriptions, and won’t refresh topic metadata until you request it.
>

Sounds great! Happy to hear that.
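
To make this concrete, here is a minimal sketch (my illustration, not
Spark's actual code) of fetching fresh metadata on demand via AdminClient,
rather than relying on the consumer's periodic refresh:

    // Java; assumes `props` holds bootstrap.servers etc. and `topics` is
    // the collection of topic names the planner cares about.
    try (AdminClient admin = AdminClient.create(props)) {
        Map<String, TopicDescription> descriptions =
            admin.describeTopics(topics).all().get();
        for (TopicDescription td : descriptions.values()) {
            System.out.println(td.name() + ": "
                + td.partitions().size() + " partitions");
        }
    }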


>
> >
> > On the other hand, I don't see any offset-related information in the
> > current AdminClient, which is also one of the reasons we leverage a
> > consumer and call poll(0). Colin, you mentioned there are KIPs addressing
> > this - could you point us to those KIPs so that we can see whether they
> > would work for our case? Without that support we cannot replace our use of
> > consumer/poll with AdminClient.
>
> KIP-396 is the one for listing offsets in AdminClient.
>

KIP-396 seems to fit Spark's needs for getting offset information, even by
timestamp. Thanks!
I wish there were a way to get the (EARLIEST, LATEST) range in one call, but
it's not a big deal as it just requires two calls.
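
Assuming the API lands roughly as the KIP proposes (so the exact names
below are speculative), the two calls might look like:

    // Java sketch: resolve earliest and latest offsets for a set of
    // partitions via AdminClient, one listOffsets call per OffsetSpec.
    Map<TopicPartition, OffsetSpec> earliestSpec = new HashMap<>();
    Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
    for (TopicPartition tp : partitions) {
        earliestSpec.put(tp, OffsetSpec.earliest());
        latestSpec.put(tp, OffsetSpec.latest());
    }
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliest =
        admin.listOffsets(earliestSpec).all().get();
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
        admin.listOffsets(latestSpec).all().get();
    // The range for tp is then [earliest.get(tp).offset(),
    // latest.get(tp).offset()).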

>
> > ps. IMHO it would be helpful to have an overloaded `listTopics` which
> > accepts a regex, matching the consumer's pattern subscription. We would
> > like to provide the same behavior that Kafka itself provides as a source.
>
> We don’t have a regex listTopics at the moment, though we could add this.
> Currently, the regex is done on the client side anyway (although we’d
> really like to change this in the future). So just listing everything and
> filtering locally would give you the same performance and behavior as the
> Consumer.
>

I see. Good to know the regex is applied on the client side - I just searched
the code, and it applies the filter across all topics retrieved from the
metadata fetch. So there should be essentially no difference here. Thanks for
confirming.
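
For reference, a minimal sketch of that list-then-filter approach (my
illustration; the pattern is a placeholder):

    // Java: list everything via AdminClient, then apply the same regex
    // locally - effectively what the consumer's pattern subscription does.
    Pattern pattern = Pattern.compile("events-.*");
    Set<String> matched = admin.listTopics().names().get().stream()
        .filter(name -> pattern.matcher(name).matches())
        .collect(Collectors.toSet());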


>
> best,
> Colin
>
> >
> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the details, Jungtaek!
> > >
> > > I tend to agree with Colin, that using the AdminClient seems to be the
> > > better choice.
> > >
> > > You can get all topics via `listTopics()` (and you can refresh this
> > > information at regular intervals) and match any pattern against the
> > > list of available topics in the driver.
> > >
> > > As you use `assignment()` and store offsets in the Spark checkpoint, it
> > > seems that using consumer group management is not a good fit for the
> > > use case.
> > >
> > >
> > > Thoughts?
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
> > > > Hi,
> > > >
> > > > If there’s no need to consume records in the Spark driver, then the
> > > Consumer is probably the wrong thing to use. Instead, Spark should use
> > > AdminClient to find out what partitions exist and where, manage their
> > > offsets, and so on. There are some KIPs under discussion now that
> would add
> > > the necessary APIs for managing offsets.
> > > >
> > > > Best,
> > > > Colin
> > > >
> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > > >> My feeling is that I didn't explain Spark's use case properly and
> > > >> hence failed to convey the needs. Sorry about that.
> > > >>
> > > >> Spark leverages a single KafkaConsumer instance in the driver, which
> > > >> is the only member registered in the consumer group. It is used in
> > > >> the plan phase of each micro-batch to calculate the overall
> > > >> topicpartitions with their offset ranges for the batch, and to split
> > > >> and assign (topicpartition, fromOffset, untilOffset) to each input
> > > >> partition. After the planning is done and tasks are distributed to
> > > >> executors, a consumer per input partition is initialized on some
> > > >> executor (assigned to that single topicpartition) and pulls the
> > > >> actual records. (Consumer pooling is applied, of course.) As the plan
> > > >> phase only determines the overall topicpartitions and offset ranges
> > > >> to process, Spark is never interested in pulling records on the
> > > >> driver side.
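
A rough sketch of the split described above, using hypothetical names
rather than Spark's actual types:

    // Java: one (topicpartition, fromOffset, untilOffset) triple per input
    // partition; `OffsetRange` and `planBatch` are illustrative names.
    final class OffsetRange {
        final TopicPartition tp;
        final long fromOffset;   // first offset to read
        final long untilOffset;  // read up to, exclusive
        OffsetRange(TopicPartition tp, long from, long until) {
            this.tp = tp; this.fromOffset = from; this.untilOffset = until;
        }
    }

    List<OffsetRange> planBatch(Map<TopicPartition, Long> from,
                                Map<TopicPartition, Long> until) {
        List<OffsetRange> ranges = new ArrayList<>();
        for (Map.Entry<TopicPartition, Long> e : until.entrySet()) {
            long start = from.getOrDefault(e.getKey(), 0L);
            ranges.add(new OffsetRange(e.getKey(), start, e.getValue()));
        }
        return ranges;
    }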
> > > >>
> > > >> Spark mainly leverages poll(0) to get the latest assigned partitions
> > > >> and adopt the changes or validate expectations. That's not the only
> > > >> use case for poll(0), though. Spark also seeks each topicpartition's
> > > >> offset to the earliest, the latest, or a specific one (either
> > > >> provided by the end user or the last committed offset) so that Spark
> > > >> can obtain the actual offset or validate a provided offset. According
> > > >> to the javadoc (if I understand correctly), to resolve the offset
> > > >> immediately it seems necessary to call `poll` or `position`.
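
A minimal sketch of that seek-then-resolve pattern (illustrative, and
assuming the partitions are already assigned):

    // Java: resolve end offsets without consuming records. seekToEnd() is
    // lazy; position() forces the actual offset lookup.
    consumer.seekToEnd(consumer.assignment());
    for (TopicPartition tp : consumer.assignment()) {
        long latest = consumer.position(tp);
        System.out.println(tp + " -> latest offset " + latest);
    }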
> > > >>
> > > >> The way Spark interacts with Kafka in the driver's plan phase is
> > > >> synchronous, as the phase should finish ASAP so the next phase can
> > > >> run. Registering a ConsumerRebalanceListener and tracking the changes
> > > >> would require some asynchronous handling, which sounds like it adds
> > > >> unnecessary complexity. Spark is OK dealing with synchronous calls
> > > >> with a timeout (that's what the KafkaConsumer methods have been
> > > >> providing - they're not asynchronous, at least for callers), but
> > > >> dealing with asynchrony is another level of concern. I can see the
> > > >> benefit where a thread runs continuously and the consumer is busy
> > > >> with something continuously, relying on the listener to hear the
> > > >> news about reassignment. Unfortunately that's not our case.
> > > >>
> > > >> Unit tests in Spark have similar needs: it looks like Kafka's own
> > > >> test code also leverages `updateAssignmentMetadataIfNeeded` and
> > > >> `poll(0)` in many places where a blocking (+timeout) call is
> > > >> preferred - so I can see similar needs there as well.
> > > >>
> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
> > > gabor.g.somo...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Guys,
> > > >>>
> > > >>> Please see the actual implementation; I'm pretty sure it explains
> > > >>> the situation well:
> > > >>>
> > > >>>
> > >
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > > >>>
> > > >>> To answer one question/assumption that popped up from all of you:
> > > >>> Spark uses not only KafkaConsumer#subscribe but also pattern
> > > >>> subscribe + KafkaConsumer#assign.
> > > >>> Please see here:
> > > >>>
> > > >>>
> > >
> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> > > >>>
> > > >>> BR,
> > > >>> G
> > > >>>
> > > >>>
> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> > > satish.dugg...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Hi Jungtaek,
> > > >>>> Thanks for the KIP. I have a couple of questions here.
> > > >>>> Isn't Spark using Kafka's consumer group management across
> > > >>>> multiple consumers?
> > > >>>>
> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > > >>>> ConsumerRebalanceListener listener) only to get all the topics for
> > > >>>> a pattern-based subscription, with Spark manually assigning those
> > > >>>> topic-partitions across consumers on workers?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Satish.
> > > >>>>
> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
> > > matth...@confluent.io>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> I am not sure if I fully understand yet.
> > > >>>>>
> > > >>>>> The fact that Spark stores offsets not in Kafka but as part of
> > > >>>>> its own checkpoint mechanism seems to be orthogonal. Maybe I am
> > > >>>>> missing something here.
> > > >>>>>
> > > >>>>> As you are using subscribe(), you use the Kafka consumer group
> > > >>>>> mechanism, which takes care of assigning partitions to clients
> > > >>>>> within the group. Therefore, I am not sure what you mean by:
> > > >>>>>
> > > >>>>>> which Spark needs to
> > > >>>>>>> know to coordinate multiple consumers to pull correctly.
> > > >>>>>
> > > >>>>> Multiple thoughts that may help:
> > > >>>>>
> > > >>>>> - if Spark needs more control over the partition assignment, you
> > > >>>>> can provide a custom `ConsumerPartitionAssignor` (via the
> > > >>>>> consumer configuration)
> > > >>>>>
> > > >>>>> - you may also want to register a `ConsumerRebalanceListener` via
> > > >>>>> `subscribe()` to get informed when the group rebalances
> > > >>>>>
> > > >>>>> As you pointed out, with pattern subscription the metadata can
> > > >>>>> change if topics are added/deleted. However, each metadata change
> > > >>>>> will trigger a rebalance, and thus you would get corresponding
> > > >>>>> calls to your rebalance listener to learn about it and react
> > > >>>>> accordingly.
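
A minimal sketch of the listener registration (illustrative; the pattern
and handler bodies are placeholders):

    consumer.subscribe(Pattern.compile("events-.*"),
        new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                // snapshot per-partition state before it moves elsewhere
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                // learn the new assignment once the rebalance completes
            }
        });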
> > > >>>>>
> > > >>>>> Maybe you can explain why neither approach works and what gap
> > > >>>>> the new API would close?
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > >>>>>> Let me elaborate on my explanation a bit more. Here we talk
> > > >>>>>> about Apache Spark, but this applies to anything that wants to
> > > >>>>>> control the offsets of Kafka consumers.
> > > >>>>>>
> > > >>>>>> Spark manages the committed offsets and the offsets which should
> > > >>>>>> be polled next, as well as the topics and partitions. This is
> > > >>>>>> required because Spark has its own general checkpoint mechanism,
> > > >>>>>> and Kafka is just one of its sources/sinks (though considered a
> > > >>>>>> very important one).
> > > >>>>>>
> > > >>>>>> To pull records from Kafka, Spark tells Kafka which topics and
> > > >>>>>> partitions it wants to subscribe to (and then seeks and polls).
> > > >>>>>> But Spark can also provide "patterns" of topics, and the
> > > >>>>>> subscription can change on the Kafka side (topics added/dropped,
> > > >>>>>> partitions added), which Spark needs to know about in order to
> > > >>>>>> coordinate multiple consumers to pull correctly.
> > > >>>>>>
> > > >>>>>> It looks like assignment() doesn't update the assignment
> > > >>>>>> information in the consumer; it just returns the known one. The
> > > >>>>>> only known approach for updating it is calling `poll`, but Spark
> > > >>>>>> is not interested in the returned records, hence the `poll(0)`
> > > >>>>>> hack - and Kafka has deprecated that API. This KIP proposes to
> > > >>>>>> support this as an official approach.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com
> >
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>>> Sorry, I didn't realize you were also asking here. I'm in favor
> > > >>>>>>> of describing it in this discussion thread so the discussion
> > > >>>>>>> itself can move forward. So I'm copying my answer here:
> > > >>>>>>>
> > > >>>>>>> We have a use case where we don't rely solely on what the Kafka
> > > >>>>>>> consumer provides. We want to know the current assignment of
> > > >>>>>>> this consumer, and to get the latest assignment, we have called
> > > >>>>>>> the `poll(0)` hack.
> > > >>>>>>>
> > > >>>>>>> That said, we don't want to pull any records here, and if I'm
> > > >>>>>>> not mistaken, there's no other way to accomplish this. Please
> > > >>>>>>> guide me if I'm missing something.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> > > >>>> matth...@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Thanks for the KIP.
> > > >>>>>>>>
> > > >>>>>>>> Can you elaborate a little bit more on the use case for this
> > > >>>>>>>> feature? Why would a consumer need to update its metadata
> > > >>>>>>>> explicitly?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > > >>>>>>>>> Hi devs,
> > > >>>>>>>>>
> > > >>>>>>>>> I'd like to initiate a discussion around KIP-505, which
> > > >>>>>>>>> exposes a new public method to only update assignment
> > > >>>>>>>>> metadata in the consumer.
> > > >>>>>>>>>
> > > >>>>>>>>> `poll(0)` has been misused: according to the Kafka docs it
> > > >>>>>>>>> doesn't guarantee that no records are pulled, and the new
> > > >>>>>>>>> `poll(Duration)` method doesn't have the same semantics. So I
> > > >>>>>>>>> would like to propose a new public API which does only the
> > > >>>>>>>>> desired behavior.
> > > >>>>>>>>>
> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
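
As a usage sketch, the proposed method might be called like this (the name
and signature are my reading of the KIP, not a settled API):

    // Hypothetical: update assignment metadata without fetching records.
    consumer.subscribe(Pattern.compile("events-.*"));
    consumer.updateAssignmentMetadataIfNeeded(Duration.ofSeconds(30));
    Set<TopicPartition> assignment = consumer.assignment(); // now up to date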
> > > >>>>>>>>>
> > > >>>>>>>>> Please feel free to suggest any improvements to the proposal,
> > > >>>>>>>>> as I'm new to the Kafka community and may not know the
> > > >>>>>>>>> project's preferences (like TimeoutException vs boolean,
> > > >>>>>>>>> etc.).
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks in advance!
> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> Name : Jungtaek Lim
> > > >>>>>>> Blog : http://medium.com/@heartsavior
> > > >>>>>>> Twitter : http://twitter.com/heartsavior
> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >> --
> > > >> Name : Jungtaek Lim
> > > >> Blog : http://medium.com/@heartsavior
> > > >> Twitter : http://twitter.com/heartsavior
> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > > >>
> > > >
> > >
> > >
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
