Hi Colin,

Thanks for your suggestion! Which KIPs are you referring to?

BR,
G


On Mon, Aug 12, 2019 at 5:22 PM Colin McCabe <cmcc...@apache.org> wrote:

> Hi,
>
> If there’s no need to consume records in the Spark driver, then the
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that would add
> the necessary APIs for managing offsets.
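>
> For illustration, a rough sketch of that flow. Note the listOffsets call
> below is the API shape under discussion in those KIPs, so treat it as an
> assumption rather than a shipping API; the topic name and `props` are
> placeholders:
>
>     import java.util.*;
>     import org.apache.kafka.clients.admin.*;
>     import org.apache.kafka.common.TopicPartition;
>     import org.apache.kafka.common.TopicPartitionInfo;
>
>     // Discover partitions and resolve end offsets without a Consumer.
>     try (AdminClient admin = AdminClient.create(props)) {
>         TopicDescription desc = admin
>             .describeTopics(Collections.singleton("my-topic"))
>             .all().get().get("my-topic");
>         Map<TopicPartition, OffsetSpec> query = new HashMap<>();
>         for (TopicPartitionInfo p : desc.partitions()) {
>             query.put(new TopicPartition("my-topic", p.partition()),
>                       OffsetSpec.latest());
>         }
>         // Proposed API: latest offset per partition, no records fetched.
>         admin.listOffsets(query).all().get().forEach((tp, info) ->
>             System.out.println(tp + " -> " + info.offset()));
>     }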
>
> Best,
> Colin
>
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > My feeling is that I didn't explain the use case for Spark properly and
> > hence failed to convey the needs. Sorry about that.
> >
> > Spark leverages a single instance of KafkaConsumer in the driver, which is
> > the only consumer registered in the consumer group. In the planning phase
> > of each micro-batch it is used to calculate the overall topic-partitions
> > with their offset ranges for the batch, and to split and assign
> > (topicpartition, fromOffset, untilOffset) triples to each input partition.
> > After the planning is done and tasks are distributed to executors, a
> > consumer per input partition is initialized on some executor (assigned to
> > that single topic-partition) and pulls the actual records. (Consumer
> > pooling is applied, for sure.) As the planning phase only determines the
> > overall topic-partitions and offset ranges to process, Spark is never
> > interested in pulling records on the driver side.
> >
> > Spark mainly leverages `poll(0)` to get the latest assigned partitions, so
> > it can adopt changes or validate expectations. That's not the only use
> > case for `poll(0)`, though. Spark also seeks the offset per
> > topic-partition to the earliest, the latest, or a specific one (either
> > provided by the end user or the last committed offset), so that Spark can
> > obtain the actual offset or validate the provided one. According to the
> > javadoc (if I understand correctly), to resolve the offset immediately it
> > is required to call `poll` or `position`.
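> >
> > To make that concrete, here's roughly the shape of the driver-side code (a
> > simplified sketch, not the actual Spark source; assume `consumer` is an
> > already-constructed KafkaConsumer and imports are omitted):
> >
> >     consumer.subscribe(Pattern.compile("topic-.*"));
> >     consumer.poll(0);  // the deprecated hack: only update assignment metadata
> >     Set<TopicPartition> partitions = consumer.assignment();
> >     consumer.seekToEnd(partitions);
> >     for (TopicPartition tp : partitions) {
> >         // position() resolves the pending seek and returns the actual offset
> >         long untilOffset = consumer.position(tp);
> >     }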
> >
> > The way Spark interacts with Kafka in this planning phase in the driver is
> > synchronous, as the phase should finish ASAP to run the next phase.
> > Registering a ConsumerRebalanceListener and tracking the changes would
> > require some asynchronous handling, which sounds like it adds unnecessary
> > complexity. Spark may be OK with dealing with synchronous calls with a
> > timeout (that's what the methods in KafkaConsumer have been providing;
> > they're not asynchronous, at least for callers), but dealing with
> > asynchronicity is another level of concern. I can see the benefit where a
> > thread runs continuously and the consumer is busy doing something all the
> > time, relying on the listener to hear the news about reassignment.
> > Unfortunately, that's not the case here.
> >
> > Unit tests in Spark have similar needs: it looks like Kafka's own test
> > code also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in
> > many places where a blocking (+timeout) call is preferred, so I can see
> > the same need there as well.
> >
> > On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> > wrote:
> >
> > > Hi Guys,
> > >
> > > Please see the actual implementation; I'm pretty sure it explains the
> > > situation well:
> > >
> > > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > >
> > > To answer one question/assumption which popped up from all of you: Spark
> > > not only uses KafkaConsumer#subscribe, but pattern subscription +
> > > KafkaConsumer#assign as well.
> > > Please see here:
> > >
> > > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
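> > >
> > > In short, the two strategies boil down to something like this (a
> > > hand-written sketch, simplified from the linked file):
> > >
> > >     // Pattern subscription: the group coordinator assigns partitions.
> > >     consumer.subscribe(Pattern.compile("topic-.*"), listener);
> > >
> > >     // Fixed assignment: Spark picks the partitions itself.
> > >     consumer.assign(Arrays.asList(
> > >         new TopicPartition("topic-a", 0),
> > >         new TopicPartition("topic-a", 1)));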
> > >
> > > BR,
> > > G
> > >
> > >
> > > On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jungtaek,
> > > > Thanks for the KIP. I have a couple of questions here.
> > > > Is Spark not using Kafka's consumer group management across multiple
> > > > consumers?
> > > >
> > > > Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > > > ConsumerRebalanceListener listener) only to get all the topics for a
> > > > pattern-based subscription, and does Spark manually assign those
> > > > topic-partitions across consumers on workers?
> > > >
> > > > Thanks,
> > > > Satish.
> > > >
> > > > On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > I am not sure if I fully understand yet.
> > > > >
> > > > > The fact that Spark does not store offsets in Kafka but as part of
> > > > > its own checkpoint mechanism seems to be orthogonal. Maybe I am
> > > > > missing something here.
> > > > >
> > > > > As you are using subscribe(), you use the Kafka consumer group
> > > > > mechanism, which takes care of the assignment of partitions to
> > > > > clients within the group. Therefore, I am not sure what you mean by:
> > > > >
> > > > > >> which Spark needs to
> > > > > >> know to coordinate multiple consumers to pull correctly.
> > > > >
> > > > > Multiple thoughts that may help:
> > > > >
> > > > > - if Spark needs more control over the partition assignment, you can
> > > > > provide a custom `ConsumerPartitionAssignor` (via the consumer
> > > > > configuration)
> > > > >
> > > > > - you may also want to register a `ConsumerRebalanceListener` via
> > > > > `subscribe()` to get informed when the group rebalances
> > > > >
> > > > > As you pointed out, when using pattern subscription, the metadata can
> > > > > change if topics are added/deleted. However, each metadata change
> > > > > will trigger a rebalance, and thus you would get corresponding calls
> > > > > to your rebalance listener to learn about it and react accordingly.
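> > > > >
> > > > > For example, a minimal sketch of the listener approach (error
> > > > > handling and imports omitted):
> > > > >
> > > > >     consumer.subscribe(Pattern.compile("topic-.*"),
> > > > >         new ConsumerRebalanceListener() {
> > > > >             @Override
> > > > >             public void onPartitionsRevoked(Collection<TopicPartition> parts) {
> > > > >                 // called before a rebalance takes partitions away
> > > > >             }
> > > > >             @Override
> > > > >             public void onPartitionsAssigned(Collection<TopicPartition> parts) {
> > > > >                 // called with the new assignment after the rebalance
> > > > >             }
> > > > >         });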
> > > > >
> > > > > Maybe you can explain why neither of these approaches works and what
> > > > > gap the new API would close?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > > > > Let me elaborate my explanation a bit more. Here we talk about
> > > > > > Apache Spark, but this applies to anything that wants to control
> > > > > > the offsets of Kafka consumers.
> > > > > >
> > > > > > Spark manages the committed offsets and the offsets which should be
> > > > > > polled next, as well as the topics and partitions. This is required
> > > > > > because Spark itself has its own general checkpoint mechanism, and
> > > > > > Kafka is just one of its sources/sinks (though it's considered a
> > > > > > very important one).
> > > > > >
> > > > > > To pull records from Kafka, Spark tells Kafka which topics and
> > > > > > partitions it wants to subscribe to (and then does seek and poll).
> > > > > > But Spark can also provide "patterns" of topics, and the
> > > > > > subscription can change on the Kafka side (topics added/dropped,
> > > > > > partitions added), which Spark needs to
> > > > > > know to coordinate multiple consumers to pull correctly.
> > > > > >
> > > > > > It looks like assignment() doesn't update the assignment
> > > > > > information in the consumer; it just returns the known one. There's
> > > > > > only one known approach to updating it, calling `poll`, but Spark
> > > > > > is not interested in the returned records, hence the `poll(0)`
> > > > > > hack, and Kafka deprecated that API. This KIP proposes to support
> > > > > > this as an official approach.
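> > > > > >
> > > > > > In other words, what we write today vs. what we'd like to write
> > > > > > (the second call is hypothetical; the name is only illustrative and
> > > > > > is exactly what the KIP discussion should settle):
> > > > > >
> > > > > >     // today: forces a metadata update, but may also fetch records
> > > > > >     consumer.poll(0);  // deprecated
> > > > > >     Set<TopicPartition> parts = consumer.assignment();
> > > > > >
> > > > > >     // desired: update assignment metadata only, never fetch records
> > > > > >     consumer.updateAssignmentMetadata(Duration.ofSeconds(5));  // hypothetical
> > > > > >     Set<TopicPartition> latest = consumer.assignment();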
> > > > > >
> > > > > >
> > > > > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Sorry, I didn't recognize you were also asking it here. I'm in
> > > > > >> favor of describing it in this discussion thread so the discussion
> > > > > >> itself can move forward, so I'm copying my answer here:
> > > > > >>
> > > > > >> We have a use case where we don't just rely on everything the
> > > > > >> Kafka consumer provides. We want to know the current assignment of
> > > > > >> this consumer, and to get the latest assignment, we called the
> > > > > >> `poll(0)` hack.
> > > > > >>
> > > > > >> That said, we don't want to pull any records here, and if I'm not
> > > > > >> mistaken, there's no way to accomplish this. Please guide me if
> > > > > >> I'm missing something.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Jungtaek Lim (HeartSaVioR)
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Thanks for the KIP.
> > > > > >>>
> > > > > >>> Can you elaborate a little bit more on the use case for this
> > > > > >>> feature? Why would a consumer need to update its metadata
> > > > > >>> explicitly?
> > > > > >>>
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>>
> > > > > >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > > > > >>>> Hi devs,
> > > > > >>>>
> > > > > >>>> I'd like to initiate a discussion around KIP-505, which exposes
> > > > > >>>> a new public method to only update assignment metadata in the
> > > > > >>>> consumer.
> > > > > >>>>
> > > > > >>>> `poll(0)` has been misused: according to the Kafka docs it
> > > > > >>>> doesn't guarantee that it won't pull any records, and the new
> > > > > >>>> `poll(Duration)` method doesn't have the same semantics. So I
> > > > > >>>> would like to propose a new public API which only performs the
> > > > > >>>> desired behavior.
> > > > > >>>>
> > > > > >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
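> > > > > >>>>
> > > > > >>>> The rough shape I have in mind is something like this (purely
> > > > > >>>> illustrative; the name and the timeout/error semantics are
> > > > > >>>> exactly what I'd like feedback on):
> > > > > >>>>
> > > > > >>>>     // Hypothetical addition to the Consumer interface: block
> > > > > >>>>     // until assignment metadata is up to date, without fetching
> > > > > >>>>     // any records. Open question: TimeoutException or boolean?
> > > > > >>>>     void updateAssignmentMetadata(Duration timeout);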
> > > > > >>>>
> > > > > >>>> Please feel free to suggest any improvements to the proposal, as
> > > > > >>>> I'm new to the Kafka community and may not have caught the
> > > > > >>>> preferences (like TimeoutException vs boolean, etc.) of the
> > > > > >>>> Kafka project.
> > > > > >>>>
> > > > > >>>> Thanks in advance!
> > > > > >>>> Jungtaek Lim (HeartSaVioR)
> > > > > >>>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > > >> --
> > > > > >> Name : Jungtaek Lim
> > > > > >> Blog : http://medium.com/@heartsavior
> > > > > >> Twitter : http://twitter.com/heartsavior
> > > > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior
> >
>
