For now, Spark needs to know the exact offsets for EARLIEST and LATEST
per partition so that it can handle users' queries on EARLIEST/LATEST
and write exact offsets to the checkpoint. I guess Spark would also
want to validate a known offset, but that could be covered by knowing
the range of available offsets. (Not 100% sure, but I imagine so.)
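
For reference, a minimal sketch (not the actual Spark code; the topic,
partitions, and broker address are made up) of resolving EARLIEST/LATEST
per partition with the consumer APIs available today:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class EarliestLatestSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          List<TopicPartition> parts = Arrays.asList(
              new TopicPartition("topic-a", 0), new TopicPartition("topic-a", 1));
          // EARLIEST and LATEST per partition, fetched without seeking or
          // polling any records
          Map<TopicPartition, Long> earliest = consumer.beginningOffsets(parts);
          Map<TopicPartition, Long> latest = consumer.endOffsets(parts);
          earliest.forEach((tp, off) -> System.out.println(
              tp + " earliest=" + off + " latest=" + latest.get(tp)));
        }
      }
    }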

I have proposed a PR on the Spark side which addresses offsets for
timestamps, so those would need to be retrievable from AdminClient as
well if we want to move away from the consumer. For now it also
leverages the consumer.
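
For the timestamp case, a minimal sketch of what the consumer offers
today via `offsetsForTimes` (continuing the snippet above; the topic and
the timestamp are illustrative):

    import java.util.HashMap;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

    Map<TopicPartition, Long> query = new HashMap<>();
    query.put(new TopicPartition("topic-a", 0), 1565600000000L); // epoch millis
    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
    OffsetAndTimestamp oat = result.get(new TopicPartition("topic-a", 0));
    // null when no record exists at or after the given timestamp
    long offset = (oat != null) ? oat.offset() : -1L;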

On Tue, Aug 13, 2019 at 7:51 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Note that `KafkaConsumer` refreshes its metadata every 5 minutes by
> default anyway (parameter `metadata.max.age.ms`). And of course, you
> can refresh the metadata you get via AdminClient each time you trigger
> planning. I cannot quantify the overhead of a single request, though.
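>
> For illustration, a one-line sketch of tuning that interval on the
> consumer (the value here is an arbitrary example, not a recommendation):
>
>     props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000"); // 60s instead of 5 min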
>
> Also, what offset information are you interested in? Because you don't
> commit any offsets to Kafka, but store them in Spark's checkpoint, it's
> unclear what information you are looking for.
>
> -Matthias
>
>
> On 8/12/19 2:53 PM, Jungtaek Lim wrote:
> > Thanks for the feedbacks Colin and Matthias.
> >
> > I agree with you regarding getting topics and partitions via AdminClient;
> > I'm just curious how much the overhead would be. Would it be lighter, or
> > heavier? We may not want to list topics at regular intervals - in the plan
> > phase we want up-to-date information so that the calculation on the Spark
> > side makes sense.
> >
> > On the other hand, I'm not seeing any information regarding offsets in
> > the current AdminClient, which is also one of the reasons we leverage the
> > consumer and call poll(0). Colin, as you mentioned there are KIPs
> > addressing this, could you point to those KIPs so that we can see whether
> > they would work for our case? Without support for this we cannot replace
> > our usage of consumer/poll with AdminClient.
> >
> > P.S. IMHO it would be helpful to have an overloaded `listTopics` which
> > accepts a regex, the same as the consumer's pattern subscription. We would
> > like to provide the same behavior that Kafka itself provides as a source.
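> >
> > Until such an overload exists, a minimal sketch of doing the filtering
> > client-side with AdminClient (pattern and broker address are illustrative):
> >
> >     import java.util.Properties;
> >     import java.util.Set;
> >     import java.util.regex.Pattern;
> >     import java.util.stream.Collectors;
> >     import org.apache.kafka.clients.admin.AdminClient;
> >     import org.apache.kafka.clients.admin.AdminClientConfig;
> >
> >     public class PatternListTopics {
> >       public static void main(String[] args) throws Exception {
> >         Properties props = new Properties();
> >         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >         try (AdminClient admin = AdminClient.create(props)) {
> >           // same regex the consumer subscription would use
> >           Pattern pattern = Pattern.compile("topic-.*");
> >           Set<String> matched = admin.listTopics().names().get().stream()
> >               .filter(name -> pattern.matcher(name).matches())
> >               .collect(Collectors.toSet());
> >           System.out.println(matched);
> >         }
> >       }
> >     }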
> >
> >
> > On Tue, Aug 13, 2019 at 1:03 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks for the details Jungtaek!
> >>
> >> I tend to agree with Colin, that using the AdminClient seems to be the
> >> better choice.
> >>
> >> You can get all topics via `listTopics()` (and you can refresh this
> >> information at regular intervals) and match any pattern against the list
> >> of available topics in the driver.
> >>
> >> As you use `assignment()` and store offsets in the Spark checkpoint, it
> >> seems that using consumer group management is not a good fit for the use
> >> case.
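> >>
> >> For illustration, a minimal sketch of the manual-assignment alternative
> >> (topic, partition, and offset are made up; `consumer` as usual):
> >>
> >>     TopicPartition tp = new TopicPartition("topic-a", 0);
> >>     consumer.assign(Arrays.asList(tp)); // no group coordination, no rebalances
> >>     consumer.seek(tp, 42L);             // resume from the checkpointed offset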
> >>
> >>
> >> Thoughts?
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 8/12/19 8:22 AM, Colin McCabe wrote:
> >>> Hi,
> >>>
> >>> If there's no need to consume records in the Spark driver, then the
> >>> Consumer is probably the wrong thing to use. Instead, Spark should use
> >>> AdminClient to find out what partitions exist and where, manage their
> >>> offsets, and so on. There are some KIPs under discussion now that would
> >>> add the necessary APIs for managing offsets.
> >>>
> >>> Best,
> >>> Colin
> >>>
> >>> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> >>>> My feeling is that I didn't explain the use case for Spark properly
> >>>> and hence failed to explain the needs. Sorry about this.
> >>>>
> >>>> Spark leverages a single instance of KafkaConsumer in the driver,
> >>>> which is the sole consumer registered in the consumer group. It is
> >>>> used in the plan phase of each micro-batch to calculate the overall
> >>>> topicpartitions with their offset ranges for the batch, and to split
> >>>> and assign (topicpartition, fromOffset, untilOffset) to each input
> >>>> partition. After the planning is done and tasks are distributed to
> >>>> executors, a consumer per input partition is initialized on some
> >>>> executor (assigned to that single topicpartition) and pulls the actual
> >>>> records. (Consumer pooling is applied, for sure.) As the plan phase
> >>>> only determines the overall topicpartitions and offset ranges to
> >>>> process, Spark is never interested in pulling records on the driver
> >>>> side.
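> >>>>
> >>>> (For illustration only - not Spark's actual code - a helper-method
> >>>> sketch of how one partition's offset range could be split across input
> >>>> partitions:)
> >>>>
> >>>>     // Split [from, until) into at most `slices` contiguous ranges.
> >>>>     static long[][] splitRange(long from, long until, int slices) {
> >>>>       long total = until - from;
> >>>>       int n = (int) Math.min(slices, Math.max(1, total));
> >>>>       long[][] ranges = new long[n][2];
> >>>>       long start = from;
> >>>>       for (int i = 0; i < n; i++) {
> >>>>         long size = total / n + (i < total % n ? 1 : 0); // spread remainder
> >>>>         ranges[i][0] = start;
> >>>>         ranges[i][1] = start + size;
> >>>>         start += size;
> >>>>       }
> >>>>       return ranges;
> >>>>     }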
> >>>>
> >>>> Spark mainly leverages poll(0) to get the latest assigned partitions
> >>>> and to adopt the changes or validate its expectations. That's not the
> >>>> only use case for poll(0), though. Spark also seeks the offset per
> >>>> topicpartition to the earliest, the latest, or a specific one (either
> >>>> provided by the end user or the last committed offset) so that Spark
> >>>> can obtain the actual offset or validate the provided one. According
> >>>> to the javadoc (if I understand correctly), getting the offset
> >>>> immediately requires calling `poll` or `position`.
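> >>>>
> >>>> (A minimal sketch of that seek-then-position dance, assuming the
> >>>> partitions in `parts` were manually assign()ed beforehand:)
> >>>>
> >>>>     consumer.seekToEnd(parts); // or seekToBeginning(parts) for EARLIEST
> >>>>     for (TopicPartition tp : parts) {
> >>>>       long latest = consumer.position(tp); // blocks until the offset is fetched
> >>>>     }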
> >>>>
> >>>> The way Spark interacts with Kafka in this plan phase in the driver is
> >>>> synchronous, as the phase should finish ASAP to run the next phase.
> >>>> Registering a ConsumerRebalanceListener and tracking the changes would
> >>>> require some asynchronous handling, which sounds like unnecessary
> >>>> complexity. Spark may be OK with synchronous calls with a timeout
> >>>> (that's what the methods in KafkaConsumer have been providing - they
> >>>> are not asynchronous, at least for callers), but dealing with
> >>>> asynchronous behavior is another level of concern. I can see the
> >>>> benefit where a continuous thread runs and the consumer is busy
> >>>> continuously, relying on the listener to hear the news about
> >>>> reassignment. Unfortunately that's not our case.
> >>>>
> >>>> Unit tests in Spark have similar needs: Kafka's own test code also
> >>>> appears to leverage `updateAssignmentMetadataIfNeeded` and `poll(0)` in
> >>>> many places where a blocking (+timeout) call is preferred - so I can
> >>>> see similar needs there as well.
> >>>>
> >>>> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Guys,
> >>>>>
> >>>>> Please see the actual implementation; pretty sure it explains the
> >>>>> situation well:
> >>>>>
> >>>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> >>>>>
> >>>>> To answer one question/assumption which popped up from all of you:
> >>>>> Spark uses not only KafkaConsumer#subscribe but pattern subscribe +
> >>>>> KafkaConsumer#assign as well. Please see here:
> >>>>>
> >>>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> >>>>>
> >>>>> BR,
> >>>>> G
> >>>>>
> >>>>>
> >>>>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <
> >> satish.dugg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Jungtaek,
> >>>>>> Thanks for the KIP. I have a couple of questions here.
> >>>>>> Isn't Spark using Kafka's consumer group management across multiple
> >>>>>> consumers?
> >>>>>>
> >>>>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> >>>>>> ConsumerRebalanceListener listener) only to get all the topics for a
> >>>>>> pattern-based subscription, and does Spark manually assign those
> >>>>>> topic-partitions across consumers on workers?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Satish.
> >>>>>>
> >>>>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> I am not sure if I fully understand yet.
> >>>>>>>
> >>>>>>> The fact that Spark does not store offsets in Kafka but as part of
> >>>>>>> its own checkpoint mechanism seems to be orthogonal. Maybe I am
> >>>>>>> missing something here.
> >>>>>>>
> >>>>>>> As you are using subscribe(), you use the Kafka consumer group
> >>>>>>> mechanism, which takes care of the assignment of partitions to
> >>>>>>> clients within the group. Therefore, I am not sure what you mean by:
> >>>>>>>
> >>>>>>>> which Spark needs to know to coordinate multiple consumers to pull
> >>>>>>>> correctly.
> >>>>>>>
> >>>>>>> Multiple thoughts that may help:
> >>>>>>>
> >>>>>>> - if Spark needs more control over the partition assignment, you can
> >>>>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
> >>>>>>> configuration)
> >>>>>>>
> >>>>>>> - you may also want to register a `ConsumerRebalanceListener` via
> >>>>>>> `subscribe()` to get informed when the group rebalances
> >>>>>>>
> >>>>>>> As you pointed out, with pattern subscription the metadata can change
> >>>>>>> if topics are added/deleted. However, each metadata change will
> >>>>>>> trigger a rebalance, and thus you would get corresponding calls to
> >>>>>>> your rebalance listener to learn about it and react accordingly.
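> >>>>>>>
> >>>>>>> A minimal sketch of that wiring (the pattern is illustrative):
> >>>>>>>
> >>>>>>>     // needs java.util.Collection, java.util.regex.Pattern, and
> >>>>>>>     // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
> >>>>>>>     consumer.subscribe(Pattern.compile("topic-.*"),
> >>>>>>>         new ConsumerRebalanceListener() {
> >>>>>>>           public void onPartitionsRevoked(Collection<TopicPartition> parts) {
> >>>>>>>             // snapshot per-partition progress before partitions move away
> >>>>>>>           }
> >>>>>>>           public void onPartitionsAssigned(Collection<TopicPartition> parts) {
> >>>>>>>             // react to topics added/removed under the pattern
> >>>>>>>           }
> >>>>>>>         });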
> >>>>>>>
> >>>>>>> Maybe you can explain why neither of these approaches works and what
> >>>>>>> gap the new API would close?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> >>>>>>>> Let me elaborate my explanation a bit more. Here we talk about
> >>>>>>>> Apache Spark, but this applies to anything which wants to control
> >>>>>>>> the offsets of Kafka consumers.
> >>>>>>>>
> >>>>>>>> Spark manages the committed offsets and the offsets which should be
> >>>>>>>> polled next, as well as topics and partitions. This is required
> >>>>>>>> because Spark has its own general checkpoint mechanism, and Kafka is
> >>>>>>>> just one of its sources/sinks (though a very important one).
> >>>>>>>>
> >>>>>>>> To pull records from Kafka, Spark tells Kafka which topics and
> >>>>>>>> partitions it wants to subscribe to (and then seeks and polls). But
> >>>>>>>> Spark can also provide "patterns" of topics, and the subscription
> >>>>>>>> can change on the Kafka side (topics added/dropped, partitions
> >>>>>>>> added), which Spark needs to know about in order to coordinate
> >>>>>>>> multiple consumers to pull correctly.
> >>>>>>>>
> >>>>>>>> It looks like assignment() doesn't update the assignment
> >>>>>>>> information in the consumer; it just returns the known one. There's
> >>>>>>>> only one known approach to updating it: calling `poll`. But Spark is
> >>>>>>>> not interested in the returned records, hence the need for the hack
> >>>>>>>> `poll(0)` - and Kafka deprecated that API. This KIP proposes to
> >>>>>>>> support this behavior as an official approach.
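> >>>>>>>>
> >>>>>>>> (The hack in question, spelled out; the pattern is illustrative:)
> >>>>>>>>
> >>>>>>>>     consumer.subscribe(Pattern.compile("topic-.*"));
> >>>>>>>>     consumer.poll(0); // deprecated; used only to force a metadata update
> >>>>>>>>     Set<TopicPartition> assigned = consumer.assignment(); // now up to date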
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Sorry, I didn't recognize you were also asking it here as well.
> >>>>>>>>> I'm in favor of describing it in this discussion thread so the
> >>>>>>>>> discussion itself can go forward, so I'm copying my answer here:
> >>>>>>>>>
> >>>>>>>>> We have some use cases where we don't rely on just what the Kafka
> >>>>>>>>> consumer provides. We want to know the current assignment of this
> >>>>>>>>> consumer, and to get the latest assignment we call the hack
> >>>>>>>>> `poll(0)`.
> >>>>>>>>>
> >>>>>>>>> That said, we don't want to pull any records here, and if I'm not
> >>>>>>>>> mistaken there's no other way to accomplish this. Please guide me
> >>>>>>>>> if I'm missing something.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Jungtaek Lim (HeartSaVioR)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>
> >>>>>>>>>> Can you elaborate a little bit more on the use case for this
> >>>>>>>>>> feature? Why would a consumer need to update its metadata
> >>>>>>>>>> explicitly?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >>>>>>>>>>> Hi devs,
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to initiate discussion around KIP-505, exposing a new
> >>>>>>>>>>> public method to only update assignment metadata in the consumer.
> >>>>>>>>>>>
> >>>>>>>>>>> `poll(0)` has been misused because, according to the Kafka docs,
> >>>>>>>>>>> it doesn't guarantee that no records are pulled, and the new
> >>>>>>>>>>> method `poll(Duration)` doesn't have the same semantics. So I
> >>>>>>>>>>> would like to propose a new public API which does only the
> >>>>>>>>>>> desired behavior.
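> >>>>>>>>>>>
> >>>>>>>>>>> (To make the semantic difference concrete; Duration is
> >>>>>>>>>>> java.time.Duration:)
> >>>>>>>>>>>
> >>>>>>>>>>>     consumer.poll(0L);            // deprecated: blocks until metadata is
> >>>>>>>>>>>                                   // updated, but may still return records
> >>>>>>>>>>>     consumer.poll(Duration.ZERO); // returns immediately; may give up before
> >>>>>>>>>>>                                   // the metadata update completes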
> >>>>>>>>>>>
> >>>>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> >>>>>>>>>>>
> >>>>>>>>>>> Please feel free to suggest any improvements to the proposal, as
> >>>>>>>>>>> I'm new to the Kafka community and may not be aware of its
> >>>>>>>>>>> preferences (like TimeoutException vs. boolean, etc.).
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks in advance!
> >>>>>>>>>>> Jungtaek Lim (HeartSaVioR)
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Name : Jungtaek Lim
> >>>>>>>>> Blog : http://medium.com/@heartsavior
> >>>>>>>>> Twitter : http://twitter.com/heartsavior
> >>>>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Name : Jungtaek Lim
> >>>> Blog : http://medium.com/@heartsavior
> >>>> Twitter : http://twitter.com/heartsavior
> >>>> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>>>
> >>>
> >>
> >>
> >
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
