I've had a look at KIP-396 and so far only one binding vote has arrived.
I hope others will consider it a good solution...

G


On Tue, Aug 13, 2019 at 11:52 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> I had concerns about calling AdminClient.listTopics because on big
> clusters I've seen OOMs caused by too many TopicPartitions.
> On the other hand, this problem already exists in the current
> implementation because, as Colin said, the Consumer does the same on the
> client side. All in all this part is fine.
>
> I've checked all the actual use cases on the Spark side which have to be
> covered, and it looks doable.
>
>
> On Tue, Aug 13, 2019 at 6:01 AM Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> So overall, AdminClient covers what's necessary to retrieve up-to-date
>> topic-partitions, whereas KIP-396 will cover what's necessary to
>> retrieve offsets (EARLIEST, LATEST, by timestamp) per partition.
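>>
>> For concreteness, a minimal sketch of the AdminClient side for fetching
>> up-to-date topic-partitions (illustrative only - the bootstrap address
>> is a placeholder, error handling is elided, and this is not the actual
>> Spark code; assume a surrounding method that throws Exception):
>>
>>   import java.util.*;
>>   import org.apache.kafka.clients.admin.*;
>>   import org.apache.kafka.common.*;
>>
>>   Properties props = new Properties();
>>   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>   try (AdminClient admin = AdminClient.create(props)) {
>>       // Unlike the consumer, this fetches metadata only when called;
>>       // there is no periodic refresh behind the scenes.
>>       Set<String> topics = admin.listTopics().names().get();
>>       Map<String, TopicDescription> descs =
>>           admin.describeTopics(topics).all().get();
>>       List<TopicPartition> partitions = new ArrayList<>();
>>       for (TopicDescription d : descs.values())
>>           for (TopicPartitionInfo p : d.partitions())
>>               partitions.add(new TopicPartition(d.name(), p.partition()));
>>   }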
>>
>> Gabor, could you please add your input if I'm missing something? I'd
>> like to double-check on this.
>>
>> Assuming I'm not missing something, what would be the preferred next
>> action? Personally I'd keep this KIP as it is until KIP-396 passes its
>> vote (the vote for KIP-396 opened in January and still hasn't passed -
>> 7 months - which makes me worry a bit about whether it will pass at
>> all), but I also respect the lifecycle of KIPs in the Kafka community.
>>
>> On Tue, Aug 13, 2019 at 12:46 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>>
>> >
>> >
>> > On Tue, Aug 13, 2019 at 10:01 AM Colin McCabe <cmcc...@apache.org>
>> > wrote:
>> >
>> >> On Mon, Aug 12, 2019, at 14:54, Jungtaek Lim wrote:
>> >> > Thanks for the feedback, Colin and Matthias.
>> >> >
>> >> > I agree with you regarding getting topics and partitions via
>> >> > AdminClient; I'm just curious how much overhead there would be.
>> >> > Would it be lighter, or heavier? We may not want to list topics at
>> >> > regular intervals - in the plan phase we want up-to-date
>> >> > information so that the calculation on the Spark side makes sense.
>> >>
>> >> It would be lighter. The consumer will periodically refresh metadata
>> >> for any topic you are subscribed to. AdminClient doesn’t have the
>> >> concept of subscriptions, and won’t refresh topic metadata until you
>> >> request it.
>> >>
>> >
>> > Sounds great! Happy to hear about that.
>> >
>> >
>> >>
>> >> >
>> >> > On the other hand, I'm not seeing any information regarding
>> >> > offsets in the current AdminClient, which is also one of the
>> >> > reasons we leverage the consumer and call poll(0). Colin, as you
>> >> > mentioned there are KIPs addressing this; could you point to those
>> >> > KIPs so that we can see whether they would work for our case?
>> >> > Without support for this we cannot replace our usage of
>> >> > consumer/poll with AdminClient.
>> >>
>> >> KIP-396 is the one for listing offsets in AdminClient.
>> >>
>> >
>> > KIP-396 seems to fit Spark's needs for getting offset information,
>> > even by timestamp. Thanks!
>> > I wish there were a way to get the (EARLIEST, LATEST) range in one
>> > call, but it's not a big deal as it just requires two calls.
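>> >
>> > If KIP-396 lands roughly as proposed, the two calls could look
>> > something like this (a sketch against the KIP draft, so names may
>> > change before it's merged; `admin` and `partitions` are as in the
>> > earlier sketch):
>> >
>> >   Map<TopicPartition, OffsetSpec> earliestSpec = new HashMap<>();
>> >   Map<TopicPartition, OffsetSpec> latestSpec = new HashMap<>();
>> >   for (TopicPartition tp : partitions) {
>> >       earliestSpec.put(tp, OffsetSpec.earliest());
>> >       latestSpec.put(tp, OffsetSpec.latest());
>> >   }
>> >   // One request per OffsetSpec, hence two calls for the range.
>> >   Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliest =
>> >       admin.listOffsets(earliestSpec).all().get();
>> >   Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
>> >       admin.listOffsets(latestSpec).all().get();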
>> >
>> >> >
>> >> > ps. IMHO it would be helpful to have an overloaded `listTopics`
>> >> > which accepts a regex, the same as consumer subscription via
>> >> > pattern. We would like to provide the same behavior that Kafka
>> >> > basically provides as a source.
>> >>
>> >> We don’t have a regex listTopics at the moment, though we could add
>> >> this. Currently, the regex is done on the client side anyway
>> >> (although we’d really like to change this in the future). So just
>> >> listing everything and filtering locally would give the same
>> >> performance and behavior as the Consumer.
>> >>
>> >
>> > I see. Good to know the regex is done on the client side - I just
>> > searched the code and it applies the filter to all topics retrieved
>> > from the metadata fetch. So there would be mostly no difference here.
>> > Thanks for confirming.
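>> >
>> > For reference, the local filtering is simple enough (a sketch of what
>> > the consumer effectively does for pattern subscription; the regex is
>> > hypothetical and `admin` is as in the earlier sketch):
>> >
>> >   Pattern pattern = Pattern.compile("topic-.*");
>> >   Set<String> matched = new HashSet<>();
>> >   for (String topic : admin.listTopics().names().get())
>> >       if (pattern.matcher(topic).matches())
>> >           matched.add(topic);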
>> >
>> >
>> >>
>> >> best,
>> >> Colin
>> >>
>> >> >
>> >> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax
>> >> > <matth...@confluent.io> wrote:
>> >> >
>> >> > > Thanks for the details Jungtaek!
>> >> > >
>> >> > > I tend to agree with Colin that using the AdminClient seems to
>> >> > > be the better choice.
>> >> > >
>> >> > > You can get all topics via `listTopics()` (and you can refresh
>> >> > > this information at regular intervals) and match any pattern
>> >> > > against the list of available topics in the driver.
>> >> > >
>> >> > > As you use `assignment()` and store offsets in the Spark
>> >> > > checkpoint, it seems that using consumer group management is not
>> >> > > a good fit for the use case.
>> >> > >
>> >> > >
>> >> > > Thoughts?
>> >> > >
>> >> > >
>> >> > >
>> >> > > -Matthias
>> >> > >
>> >> > > On 8/12/19 8:22 AM, Colin McCabe wrote:
>> >> > > > Hi,
>> >> > > >
>> >> > > > If there’s no need to consume records in the Spark driver,
>> >> > > > then the Consumer is probably the wrong thing to use. Instead,
>> >> > > > Spark should use AdminClient to find out what partitions exist
>> >> > > > and where, manage their offsets, and so on. There are some KIPs
>> >> > > > under discussion now that would add the necessary APIs for
>> >> > > > managing offsets.
>> >> > > >
>> >> > > > Best,
>> >> > > > Colin
>> >> > > >
>> >> > > > On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> >> > > >> My feeling is that I didn't explain the use case for Spark
>> >> > > >> properly and hence failed to convey the needs. Sorry about
>> >> > > >> this.
>> >> > > >>
>> >> > > >> Spark leverages a single instance of KafkaConsumer in the
>> >> > > >> driver, which is the sole member registered in the consumer
>> >> > > >> group. It is used in the plan phase of each micro-batch to
>> >> > > >> calculate the overall topicpartitions with their offset ranges
>> >> > > >> for that batch, and to split and assign (topicpartition,
>> >> > > >> fromOffset, untilOffset) to each input partition. After the
>> >> > > >> planning is done and tasks are distributed to executors, a
>> >> > > >> consumer for each input partition is initialized on some
>> >> > > >> executor (being assigned to the single topicpartition) and
>> >> > > >> pulls the actual records. (Consumer pooling is applied, for
>> >> > > >> sure.) As the plan phase only determines the overall
>> >> > > >> topicpartitions and offset ranges to process, Spark is never
>> >> > > >> interested in pulling records on the driver side.
>> >> > > >>
>> >> > > >> Spark mainly leverages poll(0) to get the latest assigned
>> >> > > >> partitions and adopt the changes or validate its expectation.
>> >> > > >> That's not the only use case for poll(0): Spark also seeks the
>> >> > > >> offset per topicpartition to the earliest, the latest, or a
>> >> > > >> specific one (either provided by the end user or the last
>> >> > > >> committed offset) so that Spark can obtain the actual offset
>> >> > > >> or validate the provided one. According to the javadoc (if I
>> >> > > >> understand correctly), to get the offset immediately it seems
>> >> > > >> to be required to call `poll` or `position`.
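>> >> > > >>
>> >> > > >> For illustration, the driver-side pattern is roughly the
>> >> > > >> following (a sketch, not the actual Spark code; the regex is
>> >> > > >> hypothetical and `consumer` is a plain KafkaConsumer):
>> >> > > >>
>> >> > > >>   consumer.subscribe(Pattern.compile("topic-.*"));
>> >> > > >>   // The hack: poll(0) forces the assignment/metadata update,
>> >> > > >>   // but any records it might return are unwanted.
>> >> > > >>   consumer.poll(0);
>> >> > > >>   Set<TopicPartition> assigned = consumer.assignment();
>> >> > > >>   consumer.seekToEnd(assigned);
>> >> > > >>   for (TopicPartition tp : assigned) {
>> >> > > >>       // position() materializes the seek into a concrete offset.
>> >> > > >>       long latest = consumer.position(tp);
>> >> > > >>   }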
>> >> > > >>
>> >> > > >> The way Spark interacts with Kafka in this plan phase in the
>> >> > > >> driver is synchronous, as the phase should finish ASAP so the
>> >> > > >> next phase can run. Registering a ConsumerRebalanceListener
>> >> > > >> and tracking the changes would require some asynchronous
>> >> > > >> handling, which sounds like unnecessary complexity. Spark may
>> >> > > >> be OK with dealing with synchronous calls with a timeout
>> >> > > >> (that's what the methods in KafkaConsumer have been providing
>> >> > > >> - they're not asynchronous, at least for callers), but dealing
>> >> > > >> with asynchronicity is another level of concern. I can see the
>> >> > > >> benefit where a continuous thread runs and the consumer is
>> >> > > >> busy with something continuously, relying on the listener to
>> >> > > >> hear the news on reassignment. Unfortunately that's not the
>> >> > > >> case here.
>> >> > > >>
>> >> > > >> Unit tests in Spark have similar needs: it looks like Kafka
>> >> > > >> test code also leverages `updateAssignmentMetadataIfNeeded`
>> >> > > >> and `poll(0)` in many places, as they fit where a blocking
>> >> > > >> (+timeout) call is preferred - so I can see similar needs
>> >> > > >> there as well.
>> >> > > >>
>> >> > > >> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi
>> >> > > >> <gabor.g.somo...@gmail.com> wrote:
>> >> > > >>
>> >> > > >>> Hi Guys,
>> >> > > >>>
>> >> > > >>> Please see the actual implementation; I'm pretty sure it
>> >> > > >>> explains the situation well:
>> >> > > >>>
>> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>> >> > > >>>
>> >> > > >>> To answer one question/assumption which popped up from all
>> >> > > >>> of you: Spark not only uses KafkaConsumer#subscribe but
>> >> > > >>> pattern subscribe + KafkaConsumer#assign as well.
>> >> > > >>> Please see here:
>> >> > > >>>
>> >> > > >>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>> >> > > >>>
>> >> > > >>> BR,
>> >> > > >>> G
>> >> > > >>>
>> >> > > >>>
>> >> > > >>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana
>> >> > > >>> <satish.dugg...@gmail.com> wrote:
>> >> > > >>>
>> >> > > >>>> Hi Jungtaek,
>> >> > > >>>> Thanks for the KIP. I have a couple of questions here.
>> >> > > >>>> Isn't Spark using Kafka's consumer group management across
>> >> > > >>>> multiple consumers?
>> >> > > >>>>
>> >> > > >>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
>> >> > > >>>> ConsumerRebalanceListener listener) only to get all the
>> >> > > >>>> topics for a pattern-based subscription, with Spark manually
>> >> > > >>>> assigning those topic-partitions across consumers on
>> >> > > >>>> workers?
>> >> > > >>>>
>> >> > > >>>> Thanks,
>> >> > > >>>> Satish.
>> >> > > >>>>
>> >> > > >>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax
>> >> > > >>>> <matth...@confluent.io> wrote:
>> >> > > >>>>
>> >> > > >>>>> I am not sure if I fully understand yet.
>> >> > > >>>>>
>> >> > > >>>>> The fact that Spark does not store offsets in Kafka but as
>> >> > > >>>>> part of its own checkpoint mechanism seems to be
>> >> > > >>>>> orthogonal. Maybe I am missing something here.
>> >> > > >>>>>
>> >> > > >>>>> As you are using subscribe(), you use the Kafka consumer
>> >> > > >>>>> group mechanism, which takes care of the assignment of
>> >> > > >>>>> partitions to clients within the group. Therefore, I am not
>> >> > > >>>>> sure what you mean by:
>> >> > > >>>>>
>> >> > > >>>>>> which Spark needs to
>> >> > > >>>>>>> know to coordinate multiple consumers to pull correctly.
>> >> > > >>>>>
>> >> > > >>>>> Multiple thoughts that may help:
>> >> > > >>>>>
>> >> > > >>>>> - if Spark needs more control over the partition assignment,
>> >> > > >>>>> you can provide a custom `ConsumerPartitionAssignor` (via the
>> >> > > >>>>> consumer configuration)
>> >> > > >>>>>
>> >> > > >>>>> - you may also want to register a `ConsumerRebalanceListener`
>> >> > > >>>>> via `subscribe()` to get informed when the group rebalances
>> >> > > >>>>> (see the sketch below)
>> >> > > >>>>>
>> >> > > >>>>> As you pointed out, with pattern subscription the metadata
>> >> > > >>>>> can change if topics are added/deleted. However, each
>> >> > > >>>>> metadata change will trigger a rebalance, and thus you
>> >> > > >>>>> would get corresponding calls to your rebalance listener to
>> >> > > >>>>> learn about it and react accordingly.
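>> >> > > >>>>>
>> >> > > >>>>> For reference, a minimal sketch of the listener approach
>> >> > > >>>>> (illustrative only; the regex is hypothetical and imports
>> >> > > >>>>> are elided):
>> >> > > >>>>>
>> >> > > >>>>>   consumer.subscribe(Pattern.compile("topic-.*"),
>> >> > > >>>>>       new ConsumerRebalanceListener() {
>> >> > > >>>>>           public void onPartitionsRevoked(
>> >> > > >>>>>                   Collection<TopicPartition> parts) {
>> >> > > >>>>>               // old assignment, before the rebalance
>> >> > > >>>>>           }
>> >> > > >>>>>           public void onPartitionsAssigned(
>> >> > > >>>>>                   Collection<TopicPartition> parts) {
>> >> > > >>>>>               // new assignment, e.g. after topics are
>> >> > > >>>>>               // added or deleted
>> >> > > >>>>>           }
>> >> > > >>>>>       });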
>> >> > > >>>>>
>> >> > > >>>>> Maybe you can explain why neither of these two approaches
>> >> > > >>>>> works and what gap the new API would close?
>> >> > > >>>>>
>> >> > > >>>>>
>> >> > > >>>>> -Matthias
>> >> > > >>>>>
>> >> > > >>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>> >> > > >>>>>> Let me elaborate on my explanation a bit more. Here we
>> >> > > >>>>>> talk about Apache Spark, but this will apply to anything
>> >> > > >>>>>> that wants to control the offsets of Kafka consumers.
>> >> > > >>>>>>
>> >> > > >>>>>> Spark is managing the committed offsets and the offsets
>> >> > > >>>>>> which should be polled next - topics and partitions as
>> >> > > >>>>>> well. This is required as Spark has its own general
>> >> > > >>>>>> checkpoint mechanism and Kafka is just one of its
>> >> > > >>>>>> sources/sinks (though it's considered a very important
>> >> > > >>>>>> one).
>> >> > > >>>>>>
>> >> > > >>>>>> To pull records from Kafka, Spark tells Kafka which
>> >> > > >>>>>> topics and partitions it wants to subscribe to (and then
>> >> > > >>>>>> seeks and polls). But Spark can also provide "patterns" of
>> >> > > >>>>>> topics, and the subscription can change on the Kafka side
>> >> > > >>>>>> (topics added/dropped, partitions added), which Spark
>> >> > > >>>>>> needs to know about to coordinate multiple consumers to
>> >> > > >>>>>> pull correctly.
>> >> > > >>>>>>
>> >> > > >>>>>> It looks like assignment() doesn't update the assignment
>> >> > > >>>>>> information in the consumer; it just returns the known
>> >> > > >>>>>> one. There's only one known approach to updating it,
>> >> > > >>>>>> calling `poll`, but Spark is not interested in the
>> >> > > >>>>>> returned records, so there's a need for the hack
>> >> > > >>>>>> `poll(0)` - and Kafka deprecated that API. This KIP
>> >> > > >>>>>> proposes to support this as an official approach.
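>> >> > > >>>>>>
>> >> > > >>>>>> To make the gap concrete (a sketch; the regex is
>> >> > > >>>>>> hypothetical):
>> >> > > >>>>>>
>> >> > > >>>>>>   consumer.subscribe(Pattern.compile("topic-.*"));
>> >> > > >>>>>>   // Returns the cached assignment - empty at this point,
>> >> > > >>>>>>   // since no metadata update has happened yet:
>> >> > > >>>>>>   Set<TopicPartition> parts = consumer.assignment();
>> >> > > >>>>>>   // The deprecated workaround that triggers the update:
>> >> > > >>>>>>   consumer.poll(0);
>> >> > > >>>>>>   // Now reflects the actual assignment:
>> >> > > >>>>>>   parts = consumer.assignment();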
>> >> > > >>>>>>
>> >> > > >>>>>>
>> >> > > >>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim
>> >> > > >>>>>> <kabh...@gmail.com> wrote:
>> >> > > >>>>>>
>> >> > > >>>>>>> Sorry, I didn't notice you were also asking it here as
>> >> > > >>>>>>> well. I'm in favor of describing it in this discussion
>> >> > > >>>>>>> thread so the discussion itself can go forward, so I'm
>> >> > > >>>>>>> copying my answer here:
>> >> > > >>>>>>>
>> >> > > >>>>>>> We have some use cases where we don't just rely on
>> >> > > >>>>>>> everything the Kafka consumer provides. We want to know
>> >> > > >>>>>>> the current assignment of this consumer, and to get the
>> >> > > >>>>>>> latest assignment, we called the hack `poll(0)`.
>> >> > > >>>>>>>
>> >> > > >>>>>>> That said, we don't want to pull any records here, and
>> >> > > >>>>>>> if I'm not mistaken, there's no way to accomplish this.
>> >> > > >>>>>>> Please guide me if I'm missing something.
>> >> > > >>>>>>>
>> >> > > >>>>>>> Thanks,
>> >> > > >>>>>>> Jungtaek Lim (HeartSaVioR)
>> >> > > >>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax
>> >> > > >>>>>>> <matth...@confluent.io> wrote:
>> >> > > >>>>>>>
>> >> > > >>>>>>>> Thanks for the KIP.
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> Can you elaborate a little bit more on the use case for
>> >> > > >>>>>>>> this feature? Why would a consumer need to update its
>> >> > > >>>>>>>> metadata explicitly?
>> >> > > >>>>>>>>
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> -Matthias
>> >> > > >>>>>>>>
>> >> > > >>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>> >> > > >>>>>>>>> Hi devs,
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> I'd like to initiate discussion around KIP-505:
>> >> > > >>>>>>>>> exposing a new public method to only update assignment
>> >> > > >>>>>>>>> metadata in the consumer.
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> `poll(0)` has been misused, as according to the Kafka
>> >> > > >>>>>>>>> doc it doesn't guarantee that no records are pulled,
>> >> > > >>>>>>>>> and the new method `poll(Duration)` doesn't have the
>> >> > > >>>>>>>>> same semantics, so I would like to propose a new public
>> >> > > >>>>>>>>> API which does only the desired behavior.
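>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> To illustrate the gap (a sketch of my understanding of
>> >> > > >>>>>>>>> the semantics, not code from the KIP):
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>>   // Deprecated: blocks until metadata is updated, but
>> >> > > >>>>>>>>>   // may also fetch and return records.
>> >> > > >>>>>>>>>   consumer.poll(0);
>> >> > > >>>>>>>>>   // Not a drop-in replacement: returns immediately,
>> >> > > >>>>>>>>>   // and the metadata update may not have completed
>> >> > > >>>>>>>>>   // within the zero timeout.
>> >> > > >>>>>>>>>   consumer.poll(Duration.ZERO);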
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Please feel free to suggest any improvements to the
>> >> > > >>>>>>>>> proposal, as I'm new to the Kafka community and may not
>> >> > > >>>>>>>>> know the preferences (like TimeoutException vs boolean,
>> >> > > >>>>>>>>> etc.) of the Kafka project.
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>> Thanks in advance!
>> >> > > >>>>>>>>> Jungtaek Lim (HeartSaVioR)
>> >> > > >>>>>>>>>
>> >> > > >>>>>>>>
>> >> > > >>>>>>>>
>> >> > > >>>>>>>
>> >> > > >>>>>>> --
>> >> > > >>>>>>> Name : Jungtaek Lim
>> >> > > >>>>>>> Blog : http://medium.com/@heartsavior
>> >> > > >>>>>>> Twitter : http://twitter.com/heartsavior
>> >> > > >>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>> >> > > >>>>>>>
>> >> > > >>>>>>
>> >> > > >>>>>>
>> >> > > >>>>>
>> >> > > >>>>>
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > > >>
>> >> > > >> --
>> >> > > >> Name : Jungtaek Lim
>> >> > > >> Blog : http://medium.com/@heartsavior
>> >> > > >> Twitter : http://twitter.com/heartsavior
>> >> > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
>> >> > > >>
>> >> > > >
>> >> > >
>> >> > >
>> >> >
>> >> > --
>> >> > Name : Jungtaek Lim
>> >> > Blog : http://medium.com/@heartsavior
>> >> > Twitter : http://twitter.com/heartsavior
>> >> > LinkedIn : http://www.linkedin.com/in/heartsavior
>> >> >
>> >>
>> >
>> >
>> > --
>> > Name : Jungtaek Lim
>> > Blog : http://medium.com/@heartsavior
>> > Twitter : http://twitter.com/heartsavior
>> > LinkedIn : http://www.linkedin.com/in/heartsavior
>> >
>>
>>
>> --
>> Name : Jungtaek Lim
>> Blog : http://medium.com/@heartsavior
>> Twitter : http://twitter.com/heartsavior
>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>
>
