Hi Colin,

Thanks for your suggestion! Which KIPs are you referring to?
BR,
G

On Mon, Aug 12, 2019 at 5:22 PM Colin McCabe <cmcc...@apache.org> wrote:

> Hi,
>
> If there's no need to consume records in the Spark driver, then the
> Consumer is probably the wrong thing to use. Instead, Spark should use
> AdminClient to find out what partitions exist and where, manage their
> offsets, and so on. There are some KIPs under discussion now that
> would add the necessary APIs for managing offsets.
>
> Best,
> Colin
>
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
> > My feeling is that I didn't explain the use case for Spark properly
> > and hence failed to explain the need. Sorry about this.
> >
> > Spark leverages a single KafkaConsumer instance in the driver, which
> > is the sole member registered in the consumer group. In the planning
> > phase of each micro-batch, this consumer is used to calculate the
> > overall set of topic-partitions with their offset ranges for the
> > batch, which is then split into (topicpartition, fromOffset,
> > untilOffset) triples assigned to input partitions. After planning is
> > done and tasks are distributed to executors, a consumer per input
> > partition is initialized on some executor (assigned to that single
> > topic-partition) and pulls the actual records. (Consumer pooling is
> > applied, of course.) As the planning phase only determines the
> > overall topic-partitions and offset ranges to process, Spark is
> > never interested in pulling records on the driver side.
> >
> > Spark mainly leverages poll(0) to get the latest assigned partitions
> > and adopt the changes or validate its expectations. That's not the
> > only use case for poll(0). Spark also seeks each topic-partition to
> > the earliest, the latest, or a specific offset (either provided by
> > the end user or the last committed offset) so that Spark can obtain
> > the actual offset or validate the provided one. According to the
> > javadoc (if I understand correctly), getting the offset immediately
> > requires calling `poll` or `position`. (This flow is sketched right
> > after this message.)
> >
> > The way Spark interacts with Kafka in this planning phase in the
> > driver is synchronous, as the phase should finish ASAP so the next
> > phase can run. Registering a ConsumerRebalanceListener and tracking
> > the changes would require some asynchronous handling, which sounds
> > like unnecessary complexity. Spark is OK with synchronous calls with
> > a timeout (that's what the methods in KafkaConsumer provide -
> > they're not asynchronous, at least for callers), but dealing with
> > asynchrony is another level of concern. I can see the benefit when a
> > thread runs continuously and the consumer is busy with something all
> > the time, relying on the listener to hear the news about
> > reassignment. Unfortunately, that's not the case here.
> >
> > Unit tests in Spark have similar needs: it looks like Kafka's own
> > test code also leverages `updateAssignmentMetadataIfNeeded` and
> > `poll(0)` in many places, as they fit spots where a blocking
> > (+timeout) call is preferred - so I can see similar needs there as
> > well.
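For concreteness, the partition-discovery half of Colin's suggestion could look roughly like the sketch below. The broker address and topic name are made up, and as of this thread the AdminClient APIs for managing offsets that Colin mentions were still under discussion, so only discovery is shown:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class PartitionDiscoverySketch {
        public static void main(String[] args)
                throws InterruptedException, ExecutionException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker

            try (AdminClient admin = AdminClient.create(props)) {
                // Discover partitions and their leaders without joining a
                // consumer group at all.
                Map<String, TopicDescription> topics = admin
                    .describeTopics(Collections.singletonList("events"))
                    .all().get();
                topics.forEach((name, desc) ->
                    desc.partitions().forEach(p ->
                        System.out.printf("%s-%d leader=%s%n",
                            name, p.partition(), p.leader())));
            }
        }
    }

And the driver-side flow Jungtaek describes might look roughly like this against the plain Java consumer API; a sketch only (pattern, group id, and broker address are made up, and this is not Spark's actual KafkaOffsetReader code):

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class DriverPlanPhaseSketch {
        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker
            props.put("group.id", "plan-phase-sketch");       // hypothetical
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("events-.*")); // hypothetical

                // The hack under discussion: poll with a zero timeout purely
                // to force the group join and a metadata update, hoping no
                // records come back.
                consumer.poll(0); // deprecated; may still return records

                // assignment() now reflects the latest known assignment.
                Set<TopicPartition> assigned = consumer.assignment();

                // Resolve per-partition offset ranges for the micro-batch.
                Map<TopicPartition, Long> untilOffsets = consumer.endOffsets(assigned);
                consumer.seekToBeginning(assigned);
                for (TopicPartition tp : assigned) {
                    long fromOffset = consumer.position(tp); // blocks until known
                    System.out.printf("%s: [%d, %d)%n",
                        tp, fromOffset, untilOffsets.get(tp));
                }
            }
        }
    }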
> > On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi
> > <gabor.g.somo...@gmail.com> wrote:
> >
> > > Hi Guys,
> > >
> > > Please see the actual implementation; I'm pretty sure it explains
> > > the situation well:
> > > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
> > >
> > > To answer one question/assumption which popped up from all of you:
> > > Spark uses not only KafkaConsumer#subscribe but pattern subscribe
> > > and KafkaConsumer#assign as well. Please see here:
> > > https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
> > >
> > > BR,
> > > G
> > >
> > > On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana
> > > <satish.dugg...@gmail.com> wrote:
> > >
> > > > Hi Jungtaek,
> > > > Thanks for the KIP. I have a couple of questions here.
> > > >
> > > > Is Spark not using Kafka's consumer group management across
> > > > multiple consumers?
> > > >
> > > > Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> > > > ConsumerRebalanceListener listener) only to get all the topics
> > > > for a pattern-based subscription, and does Spark then manually
> > > > assign those topic-partitions across consumers on workers?
> > > >
> > > > Thanks,
> > > > Satish.
> > > >
> > > > On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax
> > > > <matth...@confluent.io> wrote:
> > > >
> > > > > I am not sure if I fully understand yet.
> > > > >
> > > > > The fact that Spark does not store offsets in Kafka but as
> > > > > part of its own checkpoint mechanism seems to be orthogonal.
> > > > > Maybe I am missing something here.
> > > > >
> > > > > As you are using subscribe(), you use the Kafka consumer group
> > > > > mechanism, which takes care of the assignment of partitions to
> > > > > clients within the group. Therefore, I am not sure what you
> > > > > mean by:
> > > > >
> > > > > > which Spark needs to
> > > > > >> know to coordinate multiple consumers to pull correctly.
> > > > >
> > > > > Multiple thoughts that may help:
> > > > >
> > > > > - if Spark needs more control over the partition assignment,
> > > > > you can provide a custom `ConsumerPartitionAssignor` (via the
> > > > > consumer configuration)
> > > > >
> > > > > - you may also want to register a `ConsumerRebalanceListener`
> > > > > via `subscribe()` to get informed when the group rebalances
> > > > > (sketched right after this message)
> > > > >
> > > > > As you pointed out, with pattern subscription the metadata can
> > > > > change if topics are added/deleted. However, each metadata
> > > > > change will trigger a rebalance, and thus you would get
> > > > > corresponding calls to your rebalance listener to learn about
> > > > > it and react accordingly.
> > > > >
> > > > > Maybe you can explain why neither of the two approaches works
> > > > > and what gap the new API would close?
> > > > >
> > > > > -Matthias
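A minimal sketch of the listener-based approach Matthias suggests (broker address, group id, and topic pattern are made up; the point is only where the callbacks fire):

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceListenerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker
            props.put("group.id", "rebalance-listener-sketch"); // hypothetical
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("events-.*"),
                    new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            System.out.println("Revoked: " + partitions);
                        }

                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            System.out.println("Assigned: " + partitions);
                        }
                    });

                // The catch Jungtaek raises earlier in the thread: the
                // callbacks only fire from inside poll(), so a caller must
                // keep polling (and potentially receive records) just to
                // observe assignment changes.
                while (true) {
                    consumer.poll(Duration.ofMillis(500));
                }
            }
        }
    }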
> > > > >
> > > > > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > > > > Let me elaborate on my explanation a bit more. We're talking
> > > > > > about Apache Spark here, but this applies to anything that
> > > > > > wants to control the offsets of Kafka consumers.
> > > > > >
> > > > > > Spark manages the committed offsets and the offsets which
> > > > > > should be polled next, as well as the topics and partitions.
> > > > > > This is required as Spark itself has its own general
> > > > > > checkpoint mechanism, and Kafka is just one of its
> > > > > > sources/sinks (though it's considered a very important one).
> > > > > >
> > > > > > To pull records from Kafka, Spark tells Kafka which topics
> > > > > > and partitions it wants to subscribe to (and then seeks and
> > > > > > polls), but Spark can also provide "patterns" of topics, and
> > > > > > the subscription can change on the Kafka side (topics
> > > > > > added/dropped, partitions added) - which Spark needs to know
> > > > > > about to coordinate multiple consumers to pull correctly.
> > > > > >
> > > > > > It looks like assignment() doesn't update the assignment
> > > > > > information in the consumer; it just returns the known one.
> > > > > > There's only one known approach to updating it: calling
> > > > > > `poll`. But Spark is not interested in the returned records,
> > > > > > hence the need for the `poll(0)` hack - and Kafka deprecated
> > > > > > that API. This KIP proposes to support this as an official
> > > > > > approach.
> > > > > >
> > > > > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim
> > > > > > <kabh...@gmail.com> wrote:
> > > > > >
> > > > > >> Sorry, I didn't recognize you were also asking it here. I'm
> > > > > >> in favor of describing it in this discussion thread so the
> > > > > >> discussion itself can move forward. So, copying my answer
> > > > > >> here:
> > > > > >>
> > > > > >> We have a use case where we don't just rely on everything
> > > > > >> the Kafka consumer provides. We want to know the current
> > > > > >> assignment of this consumer, and to get the latest
> > > > > >> assignment, we called the hack `poll(0)`.
> > > > > >>
> > > > > >> That said, we don't want to pull any records here, and if
> > > > > >> I'm not missing something, there's no other way to
> > > > > >> accomplish this. Please guide me if I'm missing something.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Jungtaek Lim (HeartSaVioR)
> > > > > >>
> > > > > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax
> > > > > >> <matth...@confluent.io> wrote:
> > > > > >>
> > > > > >>> Thanks for the KIP.
> > > > > >>>
> > > > > >>> Can you elaborate a little bit more on the use case for
> > > > > >>> this feature? Why would a consumer need to update its
> > > > > >>> metadata explicitly?
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>>
> > > > > >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > > > > >>>> Hi devs,
> > > > > >>>>
> > > > > >>>> I'd like to initiate discussion around KIP-505, exposing
> > > > > >>>> a new public method that only updates assignment metadata
> > > > > >>>> in the consumer.
> > > > > >>>>
> > > > > >>>> `poll(0)` has been misused: according to the Kafka docs,
> > > > > >>>> it doesn't guarantee that it pulls no records, and the
> > > > > >>>> new method `poll(Duration)` doesn't have the same
> > > > > >>>> semantics (see the sketch after this message). So I would
> > > > > >>>> like to propose a new public API which does only the
> > > > > >>>> desired behavior.
> > > > > >>>>
> > > > > >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > > > > >>>>
> > > > > >>>> Please feel free to suggest any improvements on the
> > > > > >>>> proposal, as I'm new to the Kafka community and may not
> > > > > >>>> catch preferences (like TimeoutException vs boolean,
> > > > > >>>> etc.) on the Kafka project.
> > > > > >>>>
> > > > > >>>> Thanks in advance!
> > > > > >>>> Jungtaek Lim (HeartSaVioR)
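To make the semantic gap behind KIP-505 concrete, here is a rough sketch (hypothetical topic pattern; consumer construction omitted). The behavioral difference described in the comments follows the deprecation rationale for poll(long); treat the details as an assumption rather than a spec:

    import java.util.Set;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PollSemanticsSketch {
        @SuppressWarnings("deprecation")
        static Set<TopicPartition> latestAssignment(KafkaConsumer<byte[], byte[]> consumer) {
            consumer.subscribe(Pattern.compile("events-.*")); // hypothetical

            // Deprecated overload: a timeout of 0 means "block as long as
            // needed for the group join and metadata update, just don't wait
            // for records" - the behavior callers like Spark rely on, at the
            // risk of records actually being fetched and returned.
            consumer.poll(0);

            // The Duration overload is NOT a drop-in replacement: its timeout
            // bounds the entire call, including the group join, so
            // consumer.poll(Duration.ZERO) may return before anything has
            // been assigned at all.

            return consumer.assignment(); // up to date after the blocking poll(0)
        }
    }

This is the hole the KIP proposes to close with a dedicated, officially supported metadata-update call.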
> > > > > >>
> > > > > >> --
> > > > > >> Name : Jungtaek Lim
> > > > > >> Blog : http://medium.com/@heartsavior
> > > > > >> Twitter : http://twitter.com/heartsavior
> > > > > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior