That is a good point -- we should get KIP-396 voted on. I will review it today.

best,
Colin


On Tue, Aug 13, 2019, at 05:58, Gabor Somogyi wrote:
> I've had a look at KIP-396, and so far only one binding vote has arrived.
> I hope others will consider it a good solution...
> 
> G
> 
> 
> On Tue, Aug 13, 2019 at 11:52 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
> 
> > I've had concerns about calling AdminClient.listTopics because on big
> > clusters I've seen OOMs due to too many TopicPartitions. On the other
> > hand, this problem already exists in the current implementation because,
> > as Colin said, the Consumer does the same thing on the client side. All
> > in all this part is fine.
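> >
> > To make the concern concrete, a minimal sketch of the kind of listing
> > in question (the broker address is just a placeholder):
> >
> > import java.util.Map;
> > import java.util.Properties;
> > import java.util.Set;
> > import org.apache.kafka.clients.admin.AdminClient;
> > import org.apache.kafka.clients.admin.AdminClientConfig;
> > import org.apache.kafka.clients.admin.TopicDescription;
> >
> > Properties props = new Properties();
> > props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > try (AdminClient admin = AdminClient.create(props)) {
> >     // One round trip to list topic names, another to expand them into
> >     // per-partition metadata.
> >     Set<String> names = admin.listTopics().names().get();
> >     Map<String, TopicDescription> topics =
> >         admin.describeTopics(names).all().get();
> >     // topics.get(t).partitions() holds one object per partition, which
> >     // is where the memory pressure shows up on very large clusters.
> > }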
> >
> > I've checked all the current use cases on the Spark side which have to
> > be covered, and it looks doable.
> >
> >
> > On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <kabh...@gmail.com> wrote:
> >
> >> So overall, AdminClient covers what is necessary to retrieve up-to-date
> >> topic-partitions, whereas KIP-396 will cover what is necessary to
> >> retrieve offsets (EARLIEST, LATEST, timestamp) per partition.
> >>
> >> Gabor, could you please chime in if I'm missing something? I'd like to
> >> double-check this.
> >>
> >> Assuming I'm not missing something, what would be the preferred next
> >> action? Personally I'd keep this as it is until KIP-396 passes the vote
> >> (the vote for KIP-396 opened in January and still hasn't passed - 7
> >> months - which makes me wonder a bit whether it will pass or not), but
> >> I also respect the lifecycle of KIPs in the Kafka community.
> >>
> >> On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com> wrote:
> >>
> >> >
> >> >
> >> > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org>
> >> wrote:
> >> >
> >> >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
> >> >> > Thanks for the feedback, Colin and Matthias.
> >> >> >
> >> >> > I agree with you regarding getting topics and partitions via
> >> >> > AdminClient; I'm just curious how much the overhead would be. Would
> >> >> > it be lighter, or heavier? We may not want to list topics at
> >> >> > regular intervals - in the plan phase we want up-to-date
> >> >> > information so that the calculation from Spark itself makes sense.
> >> >>
> >> >> It would be lighter. The consumer will periodically refresh metadata
> >> >> for any topic you are subscribed to. AdminClient doesn't have the
> >> >> concept of subscriptions, and won't refresh topic metadata until you
> >> >> request it.
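> >> >>
> >> >> For reference, the consumer's background refresh interval is the
> >> >> standard metadata.max.age.ms setting, roughly:
> >> >>
> >> >> Properties props = new Properties();
> >> >> // Consumer: metadata for subscribed topics is refreshed in the
> >> >> // background at most every metadata.max.age.ms (default 5 minutes).
> >> >> props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "300000");
> >> >> // AdminClient: no subscriptions, so topic metadata is fetched only
> >> >> // when a call such as listTopics() explicitly asks for it.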
> >> >>
> >> >
> >> > Sounds great! Happy to hear that.
> >> >
> >> >
> >> >>
> >> >> >
> >> >> > On the other hand, I'm not seeing any information regarding
> >> >> > offsets in the current AdminClient, which is also one of the
> >> >> > reasons we leverage the consumer and call poll(0). Colin, as you
> >> >> > mentioned there are KIPs addressing this - could you point to those
> >> >> > KIPs so that we can see whether they would work for our case?
> >> >> > Without support for this we cannot replace our usage of
> >> >> > consumer/poll with AdminClient.
> >> >>
> >> >> KIP-396 is the one for listing offsets in AdminClient.
> >> >>
> >> >
> >> > KIP-396 seems to fit Spark's needs for getting offset information,
> >> > even by timestamp. Thanks!
> >> > I wish there were a way to get the (EARLIEST, LATEST) range in one
> >> > call, but it's not a big deal as it just requires two calls.
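> >> >
> >> > Roughly what the two calls would look like under the API shape
> >> > currently proposed in KIP-396 (still under vote, so the exact names
> >> > may change; the AdminClient instance and partition are assumed):
> >> >
> >> > TopicPartition tp = new TopicPartition("my-topic", 0);
> >> >
> >> > long earliest = admin
> >> >     .listOffsets(Collections.singletonMap(tp, OffsetSpec.earliest()))
> >> >     .partitionResult(tp).get().offset();
> >> > long latest = admin
> >> >     .listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
> >> >     .partitionResult(tp).get().offset();
> >> > // OffsetSpec.forTimestamp(ts) would cover the timestamp lookup the
> >> > // same way.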
> >> >
> >> >> >
> >> >> > ps. IMHO it would be helpful to have an overloaded `listTopics`
> >> >> > which accepts a regex, the same as consumer subscription via
> >> >> > pattern. We would like to provide the same behavior that Kafka
> >> >> > itself basically provides as a source.
> >> >>
> >> >> We don't have a regex listTopics at the moment, though we could add
> >> >> this. Currently, the regex is applied on the client side anyway
> >> >> (although we'd really like to change this in the future). So just
> >> >> listing everything and filtering locally would give the same
> >> >> performance and behavior as the Consumer.
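> >> >>
> >> >> I.e., something along these lines on the client side (a sketch; the
> >> >> pattern is just an example, and `admin` is an AdminClient instance):
> >> >>
> >> >> Pattern pattern = Pattern.compile("events-.*");
> >> >> Set<String> matched = admin.listTopics().names().get().stream()
> >> >>     .filter(name -> pattern.matcher(name).matches())
> >> >>     .collect(Collectors.toSet());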
> >> >>
> >> >
> >> > I see. Good to know the regex is done on the client side - I've just
> >> > searched the code, and it applies the filter to all topics retrieved
> >> > from the metadata fetch. Then there would be essentially no
> >> > difference here. Thanks for confirming.
> >> >
> >> >
> >> >>
> >> >> best,
> >> >> Colin
> >> >>
> >> >> >
> >> >> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <
> >> matth...@confluent.io>
> >> >> > wrote:
> >> >> >
> >> >> > > Thanks for the details, Jungtaek!
> >> >> > >
> >> >> > > I tend to agree with Colin that using the AdminClient seems to
> >> >> > > be the better choice.
> >> >> > >
> >> >> > > You can get all topics via `listTopics()` (and you can refresh
> >> >> > > this information at regular intervals) and match any pattern
> >> >> > > against the list of available topics in the driver.
> >> >> > >
> >> >> > > As you use `assignment()` and store offsets in the Spark
> >> >> > > checkpoint, it seems that using consumer group management is not
> >> >> > > a good fit for the use case.
> >> >> > >
> >> >> > >
> >> >> > > Thoughts?
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > > -Matthias
> >> >> > >
> >> >> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
> >> >> > > > Hi,
> >> >> > > >
> >> >> > > > If there's no need to consume records in the Spark driver,
> >> >> > > > then the Consumer is probably the wrong thing to use. Instead,
> >> >> > > > Spark should use AdminClient to find out what partitions exist
> >> >> > > > and where, manage their offsets, and so on. There are some KIPs
> >> >> > > > under discussion now that would add the necessary APIs for
> >> >> > > > managing offsets.
> >> >> > > >
> >> >> > > > Best,
> >> >> > > > Colin
> >> >> > > >
> >> >> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> >> >> > > >> My feeling is that I didn't explain the use case for Spark
> >> >> > > >> properly and hence failed to explain the needs. Sorry about
> >> >> > > >> this.
> >> >> > > >>
> >> >> > > >> Spark leverages a single instance of KafkaConsumer in the
> >> >> > > >> driver, which is the only member registered in the consumer
> >> >> > > >> group. This is used in the plan phase for each micro-batch to
> >> >> > > >> calculate the overall topic-partitions with their offset
> >> >> > > >> ranges for this batch, and to split and assign
> >> >> > > >> (topicpartition, fromOffset, untilOffset) to each input
> >> >> > > >> partition. After the planning is done and tasks are
> >> >> > > >> distributed to executors, a consumer for each input partition
> >> >> > > >> will be initialized on some executor (being assigned to the
> >> >> > > >> single topic-partition) and pull the actual records. (Consumer
> >> >> > > >> pooling is applied, of course.) As the plan phase is there to
> >> >> > > >> determine the overall topic-partitions and offset ranges to
> >> >> > > >> process, Spark is never interested in pulling the records on
> >> >> > > >> the driver side.
> >> >> > > >>
> >> >> > > >> Spark mainly leverages poll(0) to get the latest assigned
> >> >> > > >> partitions and adopt the changes or validate its expectations.
> >> >> > > >> That's not the only use case for poll(0), though. Spark also
> >> >> > > >> seeks the offset per topic-partition to the earliest or the
> >> >> > > >> latest, or to a specific one (either provided by the end user
> >> >> > > >> or the last committed offset), so that Spark can obtain the
> >> >> > > >> actual offset or validate the provided offset. According to
> >> >> > > >> the javadoc (if I understand correctly), to get the offset
> >> >> > > >> immediately it seems to be required to call `poll` or
> >> >> > > >> `position`.
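> >> >> > > >>
> >> >> > > >> For clarity, that seek-then-resolve pattern looks roughly like
> >> >> > > >> this (a sketch, assuming an already-assigned consumer and its
> >> >> > > >> partition set):
> >> >> > > >>
> >> >> > > >> consumer.seekToBeginning(partitions); // lazy, evaluates later
> >> >> > > >> for (TopicPartition tp : partitions) {
> >> >> > > >>     long earliest = consumer.position(tp); // forces lookup
> >> >> > > >> }
> >> >> > > >> consumer.seekToEnd(partitions);
> >> >> > > >> for (TopicPartition tp : partitions) {
> >> >> > > >>     long latest = consumer.position(tp);
> >> >> > > >> }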
> >> >> > > >>
> >> >> > > >> The way Spark interacts with Kafka in this plan phase in the
> >> >> > > >> driver is synchronous, as the phase should finish ASAP to run
> >> >> > > >> the next phase. Registering a ConsumerRebalanceListener and
> >> >> > > >> tracking the changes would require some asynchronous handling,
> >> >> > > >> which sounds like it would add unnecessary complexity. Spark
> >> >> > > >> may be OK dealing with synchronous calls with a timeout
> >> >> > > >> (that's what the methods in KafkaConsumer have been providing
> >> >> > > >> - they're not asynchronous, at least for callers), but dealing
> >> >> > > >> with asynchrony is another matter. I can see the benefit where
> >> >> > > >> a thread runs continuously and the consumer is busy with
> >> >> > > >> something all the time, relying on the listener to hear the
> >> >> > > >> news about reassignment. Unfortunately that's not our case.
> >> >> > > >>
> >> >> > > >> Unit tests in Spark have similar needs: it looks like Kafka
> >> >> > > >> test code also leverages `updateAssignmentMetadataIfNeeded`
> >> >> > > >> and `poll(0)` in many places where a blocking (+timeout) call
> >> >> > > >> is preferred - so I can see similar needs there as well.
> >> >> > > >>
> >> >> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <
> >> >> > > gabor.g.somo...@gmail.com>
> >> >> > > >> wrote:
> >> >> > > >>
> >> >> > > >>> Hi Guys,
> >> >> > > >>>
> >> >> > > >>> Please see the current implementation; I'm pretty sure it
> >> >> > > >>> explains the situation well:
> >> >> > > >>>
> >> >> > > >>>
> >> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> >> >> > > >>>
> >> >> > > >>> To answer one question/assumption which popped up from all
> >> >> > > >>> of you: Spark not only uses KafkaConsumer#subscribe but
> >> >> > > >>> pattern subscribe + KafkaConsumer#assign as well.
> >> >> > > >>> Please see here:
> >> >> > > >>>
> >> >> > > >>>
> >> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> >> >> > > >>>
> >> >> > > >>> BR,
> >> >> > > >>> G
> >> >> > > >>>
> >> >> > > >>>
> >> >> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> >> >> > > satish.dugg...@gmail.com>
> >> >> > > >>> wrote:
> >> >> > > >>>
> >> >> > > >>>> Hi Jungtaek,
> >> >> > > >>>> Thanks for the KIP. I have a couple of questions here.
> >> >> > > >>>> Isn't Spark using Kafka's consumer group management across
> >> >> > > >>>> multiple consumers?
> >> >> > > >>>>
> >> >> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> >> >> > > >>>> ConsumerRebalanceListener listener) only to get all the
> >> >> > > >>>> topics for a pattern-based subscription, and does Spark
> >> >> > > >>>> manually assign those topic-partitions across consumers on
> >> >> > > >>>> workers?
> >> >> > > >>>>
> >> >> > > >>>> Thanks,
> >> >> > > >>>> Satish.
> >> >> > > >>>>
> >> >> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <
> >> >> > > matth...@confluent.io>
> >> >> > > >>>> wrote:
> >> >> > > >>>>
> >> >> > > >>>>> I am not sure if I fully understand yet.
> >> >> > > >>>>>
> >> >> > > >>>>> The fact that Spark does not store offsets in Kafka but as
> >> >> > > >>>>> part of its own checkpoint mechanism seems to be
> >> >> > > >>>>> orthogonal. Maybe I am missing something here.
> >> >> > > >>>>>
> >> >> > > >>>>> As you are using subscribe(), you use the Kafka consumer
> >> >> > > >>>>> group mechanism, which takes care of the assignment of
> >> >> > > >>>>> partitions to clients within the group. Therefore, I am
> >> >> > > >>>>> not sure what you mean by:
> >> >> > > >>>>>
> >> >> > > >>>>>> which Spark needs to
> >> >> > > >>>>>>> know to coordinate multiple consumers to pull correctly.
> >> >> > > >>>>>
> >> >> > > >>>>> Multiple thoughts that may help:
> >> >> > > >>>>>
> >> >> > > >>>>> - if Spark needs more control over the partition
> >> >> > > >>>>> assignment, you can provide a custom
> >> >> > > >>>>> `ConsumerPartitionAssignor` (via the consumer
> >> >> > > >>>>> configuration)
> >> >> > > >>>>>
> >> >> > > >>>>> - you may also want to register a
> >> >> > > >>>>> `ConsumerRebalanceListener` via `subscribe()` to get
> >> >> > > >>>>> informed when the group rebalances
> >> >> > > >>>>>
> >> >> > > >>>>> As you pointed out, with pattern subscription the metadata
> >> >> > > >>>>> can change if topics are added/deleted. However, each
> >> >> > > >>>>> metadata change will trigger a rebalance, and thus you
> >> >> > > >>>>> would get corresponding calls to your rebalance listener to
> >> >> > > >>>>> learn about it and react accordingly.
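> >> >> > > >>>>>
> >> >> > > >>>>> Something like this (just a sketch; the pattern and the
> >> >> > > >>>>> reactions are placeholders):
> >> >> > > >>>>>
> >> >> > > >>>>> consumer.subscribe(Pattern.compile("events-.*"),
> >> >> > > >>>>>     new ConsumerRebalanceListener() {
> >> >> > > >>>>>         public void onPartitionsRevoked(
> >> >> > > >>>>>                 Collection<TopicPartition> revoked) {
> >> >> > > >>>>>             // flush/commit state for partitions we lose
> >> >> > > >>>>>         }
> >> >> > > >>>>>         public void onPartitionsAssigned(
> >> >> > > >>>>>                 Collection<TopicPartition> assigned) {
> >> >> > > >>>>>             // react to the new assignment, e.g. seek to
> >> >> > > >>>>>             // checkpointed offsets
> >> >> > > >>>>>         }
> >> >> > > >>>>>     });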
> >> >> > > >>>>>
> >> >> > > >>>>> Maybe you can explain why neither of these approaches
> >> >> > > >>>>> works and what gap the new API would close?
> >> >> > > >>>>>
> >> >> > > >>>>>
> >> >> > > >>>>> -Matthias
> >> >> > > >>>>>
> >> >> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> >> >> > > >>>>>> Let me elaborate on my explanation a bit more. Here we
> >> >> > > >>>>>> talk about Apache Spark, but this applies to everything
> >> >> > > >>>>>> that wants to control the offsets of Kafka consumers.
> >> >> > > >>>>>>
> >> >> > > >>>>>> Spark manages the committed offsets and the offsets which
> >> >> > > >>>>>> should be polled next - topics and partitions as well.
> >> >> > > >>>>>> This is required as Spark itself has its own general
> >> >> > > >>>>>> checkpoint mechanism, and Kafka is just one of its
> >> >> > > >>>>>> sources/sinks (though it's considered very important).
> >> >> > > >>>>>>
> >> >> > > >>>>>> To pull records from Kafka, Spark tells Kafka which topics
> >> >> > > >>>>>> and partitions it wants to subscribe to (and then does
> >> >> > > >>>>>> seek and poll). But Spark can also provide "patterns" of
> >> >> > > >>>>>> topics, and the subscription can change on the Kafka side
> >> >> > > >>>>>> (topics added/dropped, partitions added), which Spark
> >> >> > > >>>>>> needs to know about to coordinate multiple consumers to
> >> >> > > >>>>>> pull correctly.
> >> >> > > >>>>>>
> >> >> > > >>>>>> It looks like assignment() doesn't update the assignment
> >> >> > > >>>>>> information in the consumer; it just returns the known
> >> >> > > >>>>>> one. There's only one known approach for doing this,
> >> >> > > >>>>>> calling `poll`, but Spark is not interested in the
> >> >> > > >>>>>> returned records, so there's a need for the hack `poll(0)`
> >> >> > > >>>>>> - and Kafka has deprecated that API. This KIP proposes to
> >> >> > > >>>>>> support this as an official approach.
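> >> >> > > >>>>>>
> >> >> > > >>>>>> In code, today's hack looks roughly like this (a sketch):
> >> >> > > >>>>>>
> >> >> > > >>>>>> consumer.subscribe(pattern, listener);
> >> >> > > >>>>>> // Deprecated zero-timeout poll: forces a metadata
> >> >> > > >>>>>> // refresh/rebalance, but the javadoc does not guarantee
> >> >> > > >>>>>> // that no records are returned.
> >> >> > > >>>>>> consumer.poll(0);
> >> >> > > >>>>>> Set<TopicPartition> assigned = consumer.assignment();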
> >> >> > > >>>>>>
> >> >> > > >>>>>>
> >> >> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <
> >> >> kabh...@gmail.com>
> >> >> > > >>>> wrote:
> >> >> > > >>>>>>
> >> >> > > >>>>>>> Sorry, I didn't recognize you were also asking it here
> >> >> > > >>>>>>> as well. I'm in favor of describing it in this discussion
> >> >> > > >>>>>>> thread so the discussion itself can move forward. So I'm
> >> >> > > >>>>>>> copying my answer here:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> We have some use cases where we don't just rely on
> >> >> > > >>>>>>> everything the Kafka consumer provides. We want to know
> >> >> > > >>>>>>> the current assignment of this consumer, and to get the
> >> >> > > >>>>>>> latest assignment, we called the hack `poll(0)`.
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> That said, we don't want to pull any records here, and
> >> >> > > >>>>>>> if I'm not mistaken, there's no way to accomplish this.
> >> >> > > >>>>>>> Please guide me if I'm missing something.
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Thanks,
> >> >> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
> >> >> > > >>>>>>>
> >> >> > > >>>>>>>
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
> >> >> > > >>>> matth...@confluent.io>
> >> >> > > >>>>>>> wrote:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>>> Thanks for the KIP.
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> Can you elaborate a little bit more on the use case for
> >> >> > > >>>>>>>> this feature? Why would a consumer need to update its
> >> >> > > >>>>>>>> metadata explicitly?
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> -Matthias
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >> >> > > >>>>>>>>> Hi devs,
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> I'd like to initiate discussion around KIP-505,
> >> >> > > >>>>>>>>> exposing a new public method to only update assignment
> >> >> > > >>>>>>>>> metadata in the consumer.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> `poll(0)` has been misused: according to the Kafka
> >> >> > > >>>>>>>>> docs it doesn't guarantee that it won't pull any
> >> >> > > >>>>>>>>> records, and the new method `poll(Duration)` doesn't
> >> >> > > >>>>>>>>> have the same semantics, so I would like to propose a
> >> >> > > >>>>>>>>> new public API which performs only the desired
> >> >> > > >>>>>>>>> behavior.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Please feel free to suggest any improvements to the
> >> >> > > >>>>>>>>> proposal, as I'm new to the Kafka community and may
> >> >> > > >>>>>>>>> not be aware of preferences (like TimeoutException vs
> >> >> > > >>>>>>>>> boolean, etc.) in the Kafka project.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Thanks in advance!
> >> >> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> --
> >> >> > > >>>>>>> Name : Jungtaek Lim
> >> >> > > >>>>>>> Blog : http://medium.com/@heartsavior
> >> >> > > >>>>>>> Twitter : http://twitter.com/heartsavior
> >> >> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >> > > >>>>>>>
> >> >> > > >>>>>>
> >> >> > > >>>>>>
> >> >> > > >>>>>
> >> >> > > >>>>>
> >> >> > > >>>>
> >> >> > > >>>
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> --
> >> >> > > >> Name : Jungtaek Lim
> >> >> > > >> Blog : http://medium.com/@heartsavior
> >> >> > > >> Twitter : http://twitter.com/heartsavior
> >> >> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >> > > >>
> >> >> > > >
> >> >> > >
> >> >> > >
> >> >> >
> >> >> > --
> >> >> > Name : Jungtaek Lim
> >> >> > Blog : http://medium.com/@heartsavior
> >> >> > Twitter : http://twitter.com/heartsavior
> >> >> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >> >
> >> >>
> >> >
> >> >
> >> > --
> >> > Name : Jungtaek Lim
> >> > Blog : http://medium.com/@heartsavior
> >> > Twitter : http://twitter.com/heartsavior
> >> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >> >
> >>
> >>
> >> --
> >> Name : Jungtaek Lim
> >> Blog : http://medium.com/@heartsavior
> >> Twitter : http://twitter.com/heartsavior
> >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>
> >
>
