Thanks for investing your time!

The first 2 bulletpoint are clear.
If there is a chance that a TM can go to an inconsistent state then I agree
with the 3rd bulletpoint.
Just before we agree on that I would like to learn something new and
understand how is it possible that a TM
gets corrupted? (In Spark I've never seen such thing and no mechanism to
fix this but Flink is definitely not Spark)

Here is my understanding:
* DTM pushes new obtained DTs to TMs and if any exception occurs then a
retry after "security.kerberos.tokens.retry-wait"
happens. This means DTM retries until it's not possible to send new DTs to
all registered TMs.
* New TM registration must fail if "updateDelegationToken" fails
* "updateDelegationToken" fails consistently like a DB (at least I plan to
implement it that way).
If DTs are arriving on the TM side then a single
"UserGroupInformation.getCurrentUser.addCredentials"
will be called which I've never seen it failed.
* I hope all other code parts are not touching existing DTs within the JVM

I would like to emphasize I'm not against to add it just want to see what
kind of problems are we facing.
It would ease to catch bugs earlier and help in the maintenance.

All in all I would buy the idea to add the 3rd bullet if we foresee the
need.

G


On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org> wrote:

> Hi Gabor,
>
> This is definitely headed in a right direction +1.
>
> I think we still need to have a safeguard in case some of the TMs gets into
> the inconsistent state though, which will also eliminate the need for
> implementing a custom retry mechanism (when _updateDelegationToken_ call
> fails for some reason).
>
> We already have this safeguard in place for slot pool (in case there are
> some slots in inconsistent state - eg. we haven't freed them for some
> reason) and for the partition tracker, which could be simply enhanced. This
> is done via periodic heartbeat from TaskManagers to the ResourceManager
> that contains report about state of these two components (from TM
> perspective) so the RM can reconcile their state if necessary.
>
> I don't think adding an additional field to _TaskExecutorHeartbeatPayload_
> should be a concern as we only heartbeat every ~ 10s by default and the new
> field would be small compared to rest of the existing payload. Also
> heartbeat doesn't need to contain the whole DT, but just some identifier
> which signals whether it uses the right one, that could be significantly
> smaller.
>
> This is still a PUSH based approach as the RM would again call the newly
> introduced _updateDelegationToken_ when it encounters inconsistency (eg.
> due to a temporary network partition / a race condition we didn't test for
> / some other scenario we didn't think about). In practice these
> inconsistencies are super hard to avoid and reason about (and unfortunately
> yes, we see them happen from time to time), so reusing the existing
> mechanism that is designed for this exact problem simplify things.
>
> To sum this up we'd have three code paths for calling
> _updateDelegationToken_:
> 1) When the TM registers, we push the token (if DTM already has it) to it
> 2) When DTM obtains a new token it broadcasts it to all currently connected
> TMs
> 3) When a TM gets out of sync, DTM would reconcile it's state
>
> WDYT?
>
> Best,
> D.
>
>
> On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org> wrote:
>
> > Thanks the update, I'll go over it tomorrow.
> >
> > On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com
> >
> > wrote:
> >
> >> Hi All,
> >>
> >> Since it has turned out that DTM can't be added as member of JobMaster
> >> <
> >>
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> >> >
> >> I've
> >> came up with a better proposal.
> >> David, thanks for pinpointing this out, you've caught a bug in the early
> >> phase!
> >>
> >> Namely ResourceManager
> >> <
> >>
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124
> >> >
> >> is
> >> a single instance class where DTM can be added as member variable.
> >> It has a list of all already registered TMs and new TM registration is
> >> also
> >> happening here.
> >> The following can be added from logic perspective to be more specific:
> >> * Create new DTM instance in ResourceManager
> >> <
> >>
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124
> >> >
> >> and
> >> start it (re-occurring thread to obtain new tokens)
> >> * Add a new function named "updateDelegationTokens" to
> TaskExecutorGateway
> >> <
> >>
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54
> >> >
> >> * Call "updateDelegationTokens" on all registered TMs to propagate new
> DTs
> >> * In case of new TM registration call "updateDelegationTokens" before
> >> registration succeeds to setup new TM properly
> >>
> >> This way:
> >> * only a single DTM would live within a cluster which is the expected
> >> behavior
> >> * DTM is going to be added to a central place where all deployment
> target
> >> can make use of it
> >> * DTs are going to be pushed to TMs which would generate less network
> >> traffic than pull based approach
> >> (please see my previous mail where I've described both approaches)
> >> * HA scenario is going to be consistent because such
> >> <
> >>
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069
> >> >
> >> a solution can be added to "updateDelegationTokens"
> >>
> >> @David or all others plz share whether you agree on this or you have
> >> better
> >> idea/suggestion.
> >>
> >> BR,
> >> G
> >>
> >>
> >> On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi <
> gabor.g.somo...@gmail.com
> >> >
> >> wrote:
> >>
> >> > First of all thanks for investing your time and helping me out. As I
> see
> >> > you have pretty solid knowledge in the RPC area.
> >> > I would like to rely on your knowledge since I'm learning this part.
> >> >
> >> > > - Do we need to introduce a new RPC method or can we for example
> >> > piggyback
> >> > on heartbeats?
> >> >
> >> > I'm fine with either solution but one thing is important conceptually.
> >> > There are fundamentally 2 ways how tokens can be updated:
> >> > - Push way: When there are new DTs then JM JVM pushes DTs to TM JVMs.
> >> This
> >> > is the preferred one since tiny amount of control logic needed.
> >> > - Pull way: Each time a TM would like to poll JM whether there are new
> >> > tokens and each TM wants to decide alone whether DTs needs to be
> >> updated or
> >> > not.
> >> > As you've mentioned here some ID needs to be generated, it would
> >> generated
> >> > quite some additional network traffic which can be definitely avoided.
> >> > As a final thought in Spark we've had this way of DT propagation logic
> >> and
> >> > we've had major issues with it.
> >> >
> >> > So all in all DTM needs to obtain new tokens and there must a way to
> >> send
> >> > this data to all TMs from JM.
> >> >
> >> > > - What delivery semantics are we looking for? (what if we're only
> >> able to
> >> > update subset of TMs / what happens if we exhaust retries / should we
> >> even
> >> > have the retry mechanism whatsoever) - I have a feeling that somehow
> >> > leveraging the existing heartbeat mechanism could help to answer these
> >> > questions
> >> >
> >> > Let's go through these questions one by one.
> >> > > What delivery semantics are we looking for?
> >> >
> >> > DTM must receive an exception when at least one TM was not able to get
> >> DTs.
> >> >
> >> > > what if we're only able to update subset of TMs?
> >> >
> >> > Such case DTM will reschedule token obtain after
> >> > "security.kerberos.tokens.retry-wait" time.
> >> >
> >> > > what happens if we exhaust retries?
> >> >
> >> > There is no number of retries. In default configuration tokens needs
> to
> >> be
> >> > re-obtained after one day.
> >> > DTM tries to obtain new tokens after 1day * 0.75
> >> > (security.kerberos.tokens.renewal-ratio) = 18 hours.
> >> > When fails it retries after "security.kerberos.tokens.retry-wait"
> which
> >> is
> >> > 1 hour by default.
> >> > If it never succeeds then authentication error is going to happen on
> the
> >> > TM side and the workload is
> >> > going to stop.
> >> >
> >> > > should we even have the retry mechanism whatsoever?
> >> >
> >> > Yes, because there are always temporary cluster issues.
> >> >
> >> > > What does it mean for the running application (how does this look
> like
> >> > from
> >> > the user perspective)? As far as I remember the logs are only
> collected
> >> > ("aggregated") after the container is stopped, is that correct?
> >> >
> >> > With default config it works like that but it can be forced to
> aggregate
> >> > at specific intervals.
> >> > A useful feature is forcing YARN to aggregate logs while the job is
> >> still
> >> > running.
> >> > For long-running jobs such as streaming jobs, this is invaluable. To
> do
> >> > this,
> >> > yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds must
> >> be
> >> > set to a non-negative value.
> >> > When this is set, a timer will be set for the given duration, and
> >> whenever
> >> > that timer goes off,
> >> > log aggregation will run on new files.
> >> >
> >> > > I think
> >> > this topic should get its own section in the FLIP (having some cross
> >> > reference to YARN ticket would be really useful, but I'm not sure if
> >> there
> >> > are any).
> >> >
> >> > I think this is important knowledge but this FLIP is not touching the
> >> > already existing behavior.
> >> > DTs are set on the AM container which is renewed by YARN until it's
> not
> >> > possible anymore.
> >> > Any kind of new code is not going to change this limitation. BTW,
> there
> >> is
> >> > no jira for this.
> >> > If you think it worth to write this down then I think the good place
> is
> >> > the official security doc
> >> > area as caveat.
> >> >
> >> > > If we split the FLIP into two parts / sections that I've suggested,
> I
> >> > don't
> >> > really think that you need to explicitly test for each deployment
> >> scenario
> >> > / cluster framework, because the DTM part is completely independent of
> >> the
> >> > deployment target. Basically this is what I'm aiming for with "making
> it
> >> > work with the standalone" (as simple as starting a new java process)
> >> Flink
> >> > first (which is also how most people deploy streaming application on
> k8s
> >> > and the direction we're pushing forward with the auto-scaling /
> reactive
> >> > mode initiatives).
> >> >
> >> > I see your point and agree the main direction. k8s is the megatrend
> >> which
> >> > most of the peoples
> >> > will use sooner or later. Not 100% sure what kind of split you suggest
> >> but
> >> > in my view
> >> > the main target is to add this feature and I'm open to any logical
> work
> >> > ordering.
> >> > Please share the specific details and we work it out...
> >> >
> >> > G
> >> >
> >> >
> >> > On Mon, Jan 24, 2022 at 3:04 PM David Morávek <d...@apache.org>
> wrote:
> >> >
> >> >> >
> >> >> > Could you point to a code where you think it could be added
> exactly?
> >> A
> >> >> > helping hand is welcome here 🙂
> >> >> >
> >> >>
> >> >> I think you can take a look at _ResourceManagerPartitionTracker_ [1]
> >> which
> >> >> seems to have somewhat similar properties to the DTM.
> >> >>
> >> >> One topic that needs to be addressed there is how the RPC with the
> >> >> _TaskExecutorGateway_ should look like.
> >> >> - Do we need to introduce a new RPC method or can we for example
> >> piggyback
> >> >> on heartbeats?
> >> >> - What delivery semantics are we looking for? (what if we're only
> able
> >> to
> >> >> update subset of TMs / what happens if we exhaust retries / should we
> >> even
> >> >> have the retry mechanism whatsoever) - I have a feeling that somehow
> >> >> leveraging the existing heartbeat mechanism could help to answer
> these
> >> >> questions
> >> >>
> >> >> In short, after DT reaches it's max lifetime then log aggregation
> stops
> >> >> >
> >> >>
> >> >> What does it mean for the running application (how does this look
> like
> >> >> from
> >> >> the user perspective)? As far as I remember the logs are only
> collected
> >> >> ("aggregated") after the container is stopped, is that correct? I
> think
> >> >> this topic should get its own section in the FLIP (having some cross
> >> >> reference to YARN ticket would be really useful, but I'm not sure if
> >> there
> >> >> are any).
> >> >>
> >> >> All deployment modes (per-job, per-app, ...) are planned to be tested
> >> and
> >> >> > expect to work with the initial implementation however not all
> >> >> deployment
> >> >> > targets (k8s, local, ...
> >> >> >
> >> >>
> >> >> If we split the FLIP into two parts / sections that I've suggested, I
> >> >> don't
> >> >> really think that you need to explicitly test for each deployment
> >> scenario
> >> >> / cluster framework, because the DTM part is completely independent
> of
> >> the
> >> >> deployment target. Basically this is what I'm aiming for with "making
> >> it
> >> >> work with the standalone" (as simple as starting a new java process)
> >> Flink
> >> >> first (which is also how most people deploy streaming application on
> >> k8s
> >> >> and the direction we're pushing forward with the auto-scaling /
> >> reactive
> >> >> mode initiatives).
> >> >>
> >> >> The whole integration with YARN (let's forget about log aggregation
> >> for a
> >> >> moment) / k8s-native only boils down to how do we make the keytab
> file
> >> >> local to the JobManager so the DTM can read it, so it's basically
> >> built on
> >> >> top of that. The only special thing that needs to be tested there is
> >> the
> >> >> "keytab distribution" code path.
> >> >>
> >> >> [1]
> >> >>
> >> >>
> >>
> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
> >> >>
> >> >> Best,
> >> >> D.
> >> >>
> >> >> On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <
> >> gabor.g.somo...@gmail.com
> >> >> >
> >> >> wrote:
> >> >>
> >> >> > > There is a separate JobMaster for each job
> >> >> > within a Flink cluster and each JobMaster only has a partial view
> of
> >> the
> >> >> > task managers
> >> >> >
> >> >> > Good point! I've had a deeper look and you're right. We definitely
> >> need
> >> >> to
> >> >> > find another place.
> >> >> >
> >> >> > > Related per-cluster or per-job keytab:
> >> >> >
> >> >> > In the current code per-cluster keytab is implemented and I'm
> >> intended
> >> >> to
> >> >> > keep it like this within this FLIP. The reason is simple: tokens on
> >> TM
> >> >> side
> >> >> > can be stored within the UserGroupInformation (UGI) structure which
> >> is
> >> >> > global. I'm not telling it's impossible to change that but I think
> >> that
> >> >> > this is such a complexity which the initial implementation is not
> >> >> required
> >> >> > to contain. Additionally we've not seen such need from user side.
> If
> >> the
> >> >> > need may rise later on then another FLIP with this topic can be
> >> created
> >> >> and
> >> >> > discussed. Proper multi-UGI handling within a single JVM is a topic
> >> >> where
> >> >> > several round of deep-dive with the Hadoop/YARN guys are required.
> >> >> >
> >> >> > > single DTM instance embedded with
> >> >> > the ResourceManager (the Flink component)
> >> >> >
> >> >> > Could you point to a code where you think it could be added
> exactly?
> >> A
> >> >> > helping hand is welcome here🙂
> >> >> >
> >> >> > > Then the single (initial) implementation should work with all the
> >> >> > deployments modes out of the box (which is not what the FLIP
> >> suggests).
> >> >> Is
> >> >> > that correct?
> >> >> >
> >> >> > All deployment modes (per-job, per-app, ...) are planned to be
> tested
> >> >> and
> >> >> > expect to work with the initial implementation however not all
> >> >> deployment
> >> >> > targets (k8s, local, ...) are not intended to be tested. Per
> >> deployment
> >> >> > target new jira needs to be created where I expect small number of
> >> codes
> >> >> > needs to be added and relatively expensive testing effort is
> >> required.
> >> >> >
> >> >> > > I've taken a look into the prototype and in the
> >> >> "YarnClusterDescriptor"
> >> >> > you're injecting a delegation token into the AM [1] (that's
> obtained
> >> >> using
> >> >> > the provided keytab). If I understand this correctly from previous
> >> >> > discussion / FLIP, this is to support log aggregation and DT has a
> >> >> limited
> >> >> > validity. How is this DT going to be renewed?
> >> >> >
> >> >> > You're clever and touched a limitation which Spark has too. In
> short,
> >> >> after
> >> >> > DT reaches it's max lifetime then log aggregation stops. I've had
> >> >> several
> >> >> > deep-dive rounds with the YARN guys at Spark years because wanted
> to
> >> >> fill
> >> >> > this gap. They can't provide us any way to re-inject the newly
> >> obtained
> >> >> DT
> >> >> > so at the end I gave up this.
> >> >> >
> >> >> > BR,
> >> >> > G
> >> >> >
> >> >> >
> >> >> > On Mon, 24 Jan 2022, 11:00 David Morávek, <d...@apache.org> wrote:
> >> >> >
> >> >> > > Hi Gabor,
> >> >> > >
> >> >> > > There is actually a huge difference between JobManager (process)
> >> and
> >> >> > > JobMaster (job coordinator). The naming is unfortunately bit
> >> >> misleading
> >> >> > > here from historical reasons. There is a separate JobMaster for
> >> each
> >> >> job
> >> >> > > within a Flink cluster and each JobMaster only has a partial view
> >> of
> >> >> the
> >> >> > > task managers (depends on where the slots for a particular job
> are
> >> >> > > allocated). This means that you'll end up with N
> >> >> > "DelegationTokenManagers"
> >> >> > > competing with each other (N = number of running jobs in the
> >> cluster).
> >> >> > >
> >> >> > > This makes me think we're mixing two abstraction levels here:
> >> >> > >
> >> >> > > a) Per-cluster delegation tokens
> >> >> > > - Simpler approach, it would involve a single DTM instance
> embedded
> >> >> with
> >> >> > > the ResourceManager (the Flink component)
> >> >> > > b) Per-job delegation tokens
> >> >> > > - More complex approach, but could be more flexible from the user
> >> >> side of
> >> >> > > things.
> >> >> > > - Multiple DTM instances, that are bound with the JobMaster
> >> lifecycle.
> >> >> > > Delegation tokens are attached with a particular slots that are
> >> >> executing
> >> >> > > the job tasks instead of the whole task manager (TM could be
> >> executing
> >> >> > > multiple jobs with different tokens).
> >> >> > > - The question is which keytab should be used for the clustering
> >> >> > framework,
> >> >> > > to support log aggregation on YARN (an extra keytab, keytab that
> >> comes
> >> >> > with
> >> >> > > the first job?)
> >> >> > >
> >> >> > > I think these are the things that need to be clarified in the
> FLIP
> >> >> before
> >> >> > > proceeding.
> >> >> > >
> >> >> > > A follow-up question for getting a better understanding where
> this
> >> >> should
> >> >> > > be headed: Are there any use cases where user may want to use
> >> >> different
> >> >> > > keytabs with each job, or are we fine with using a cluster-wide
> >> >> keytab?
> >> >> > If
> >> >> > > we go with per-cluster keytabs, is it OK that all jobs submitted
> >> into
> >> >> > this
> >> >> > > cluster can access it (even the future ones)? Should this be a
> >> >> security
> >> >> > > concern?
> >> >> > >
> >> >> > > Presume you though I would implement a new class with JobManager
> >> name.
> >> >> > The
> >> >> > > > plan is not that.
> >> >> > > >
> >> >> > >
> >> >> > > I've never suggested such thing.
> >> >> > >
> >> >> > >
> >> >> > > > No. That said earlier DT handling is planned to be done
> >> completely
> >> >> in
> >> >> > > > Flink. DTM has a renewal thread which re-obtains tokens in the
> >> >> proper
> >> >> > > time
> >> >> > > > when needed.
> >> >> > > >
> >> >> > >
> >> >> > > Then the single (initial) implementation should work with all the
> >> >> > > deployments modes out of the box (which is not what the FLIP
> >> >> suggests).
> >> >> > Is
> >> >> > > that correct?
> >> >> > >
> >> >> > > If the cluster framework, also requires delegation token for
> their
> >> >> inner
> >> >> > > working (this is IMO only applies to YARN), it might need an
> extra
> >> >> step
> >> >> > > (injecting the token into application master container).
> >> >> > >
> >> >> > > Separating the individual layers (actual Flink cluster -
> basically
> >> >> making
> >> >> > > this work with a standalone deployment  / "cluster framework" -
> >> >> support
> >> >> > for
> >> >> > > YARN log aggregation) in the FLIP would be useful.
> >> >> > >
> >> >> > > Reading the linked Spark readme could be useful.
> >> >> > > >
> >> >> > >
> >> >> > > I've read that, but please be patient with the questions,
> Kerberos
> >> is
> >> >> not
> >> >> > > an easy topic to get into and I've had a very little contact with
> >> it
> >> >> in
> >> >> > the
> >> >> > > past.
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> >> >> > > >
> >> >> > >
> >> >> > > I've taken a look into the prototype and in the
> >> >> "YarnClusterDescriptor"
> >> >> > > you're injecting a delegation token into the AM [1] (that's
> >> obtained
> >> >> > using
> >> >> > > the provided keytab). If I understand this correctly from
> previous
> >> >> > > discussion / FLIP, this is to support log aggregation and DT has
> a
> >> >> > limited
> >> >> > > validity. How is this DT going to be renewed?
> >> >> > >
> >> >> > > [1]
> >> >> > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261
> >> >> > >
> >> >> > > Best,
> >> >> > > D.
> >> >> > >
> >> >> > > On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <
> >> >> gabor.g.somo...@gmail.com
> >> >> > >
> >> >> > > wrote:
> >> >> > >
> >> >> > > > Here is the exact class, I'm from mobile so not had a look at
> the
> >> >> exact
> >> >> > > > class name:
> >> >> > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> >> >> > > > That keeps track of TMs where the tokens can be sent to.
> >> >> > > >
> >> >> > > > > My feeling would be that we shouldn't really introduce a new
> >> >> > component
> >> >> > > > with
> >> >> > > > a custom lifecycle, but rather we should try to incorporate
> this
> >> >> into
> >> >> > > > existing ones.
> >> >> > > >
> >> >> > > > Can you be more specific? Presume you though I would implement
> a
> >> new
> >> >> > > class
> >> >> > > > with JobManager name. The plan is not that.
> >> >> > > >
> >> >> > > > > If I understand this correctly, this means that we then push
> >> the
> >> >> > token
> >> >> > > > renewal logic to YARN.
> >> >> > > >
> >> >> > > > No. That said earlier DT handling is planned to be done
> >> completely
> >> >> in
> >> >> > > > Flink. DTM has a renewal thread which re-obtains tokens in the
> >> >> proper
> >> >> > > time
> >> >> > > > when needed. YARN log aggregation is a totally different
> feature,
> >> >> where
> >> >> > > > YARN does the renewal. Log aggregation was an example why the
> >> code
> >> >> > can't
> >> >> > > be
> >> >> > > > 100% reusable for all resource managers. Reading the linked
> Spark
> >> >> > readme
> >> >> > > > could be useful.
> >> >> > > >
> >> >> > > > G
> >> >> > > >
> >> >> > > > On Fri, 21 Jan 2022, 21:05 David Morávek, <d...@apache.org>
> >> wrote:
> >> >> > > >
> >> >> > > > > >
> >> >> > > > > > JobManager is the Flink class.
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > There is no such class in Flink. The closest thing to the
> >> >> JobManager
> >> >> > > is a
> >> >> > > > > ClusterEntrypoint. The cluster entrypoint spawns new RM
> Runner
> >> &
> >> >> > > > Dispatcher
> >> >> > > > > Runner that start participating in the leader election. Once
> >> they
> >> >> > gain
> >> >> > > > > leadership they spawn the actual underlying instances of
> these
> >> two
> >> >> > > "main
> >> >> > > > > components".
> >> >> > > > >
> >> >> > > > > My feeling would be that we shouldn't really introduce a new
> >> >> > component
> >> >> > > > with
> >> >> > > > > a custom lifecycle, but rather we should try to incorporate
> >> this
> >> >> into
> >> >> > > > > existing ones.
> >> >> > > > >
> >> >> > > > > My biggest concerns would be:
> >> >> > > > >
> >> >> > > > > - How would the lifecycle of the new component look like with
> >> >> regards
> >> >> > > to
> >> >> > > > HA
> >> >> > > > > setups. If we really try to decide to introduce a completely
> >> new
> >> >> > > > component,
> >> >> > > > > how should this work in case of multiple JobManager
> instances?
> >> >> > > > > - Which components does it talk to / how? For example how
> does
> >> the
> >> >> > > > > broadcast of new token to task managers (TaskManagerGateway)
> >> look
> >> >> > like?
> >> >> > > > Do
> >> >> > > > > we simply introduce a new RPC on the ResourceManagerGateway
> >> that
> >> >> > > > broadcasts
> >> >> > > > > it or does the new component need to do some kind of
> >> bookkeeping
> >> >> of
> >> >> > > task
> >> >> > > > > managers that it needs to notify?
> >> >> > > > >
> >> >> > > > > YARN based HDFS log aggregation would not work by dropping
> that
> >> >> code.
> >> >> > > > Just
> >> >> > > > > > to be crystal clear, the actual implementation contains
> this
> >> fir
> >> >> > > > exactly
> >> >> > > > > > this reason.
> >> >> > > > > >
> >> >> > > > >
> >> >> > > > > This is the missing part +1. If I understand this correctly,
> >> this
> >> >> > means
> >> >> > > > > that we then push the token renewal logic to YARN. How do you
> >> >> plan to
> >> >> > > > > implement the renewal logic on k8s?
> >> >> > > > >
> >> >> > > > > D.
> >> >> > > > >
> >> >> > > > > On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <
> >> >> > > gabor.g.somo...@gmail.com
> >> >> > > > >
> >> >> > > > > wrote:
> >> >> > > > >
> >> >> > > > > > > I think we might both mean something different by the RM.
> >> >> > > > > >
> >> >> > > > > > You feel it well, I've not specified these terms well in
> the
> >> >> > > > explanation.
> >> >> > > > > > RM I meant resource management framework. JobManager is the
> >> >> Flink
> >> >> > > > class.
> >> >> > > > > > This means that inside JM instance there will be a DTM
> >> >> instance, so
> >> >> > > > they
> >> >> > > > > > would have the same lifecycle. Hope I've answered the
> >> question.
> >> >> > > > > >
> >> >> > > > > > > If we have tokens available on the client side, why do we
> >> >> need to
> >> >> > > set
> >> >> > > > > > them
> >> >> > > > > > into the AM (yarn specific concept) launch context?
> >> >> > > > > >
> >> >> > > > > > YARN based HDFS log aggregation would not work by dropping
> >> that
> >> >> > code.
> >> >> > > > > Just
> >> >> > > > > > to be crystal clear, the actual implementation contains
> this
> >> fir
> >> >> > > > exactly
> >> >> > > > > > this reason.
> >> >> > > > > >
> >> >> > > > > > G
> >> >> > > > > >
> >> >> > > > > > On Fri, 21 Jan 2022, 20:12 David Morávek, <d...@apache.org
> >
> >> >> wrote:
> >> >> > > > > >
> >> >> > > > > > > Hi Gabor,
> >> >> > > > > > >
> >> >> > > > > > > 1. One thing is important, token management is planned to
> >> be
> >> >> done
> >> >> > > > > > > > generically within Flink and not scattered in RM
> specific
> >> >> code.
> >> >> > > > > > > JobManager
> >> >> > > > > > > > has a DelegationTokenManager which obtains tokens
> >> >> time-to-time
> >> >> > > (if
> >> >> > > > > > > > configured properly). JM knows which TaskManagers are
> in
> >> >> place
> >> >> > so
> >> >> > > > it
> >> >> > > > > > can
> >> >> > > > > > > > distribute it to all TMs. That's it basically.
> >> >> > > > > > >
> >> >> > > > > > >
> >> >> > > > > > > I think we might both mean something different by the RM.
> >> >> > > JobManager
> >> >> > > > is
> >> >> > > > > > > basically just a process encapsulating multiple
> components,
> >> >> one
> >> >> > of
> >> >> > > > > which
> >> >> > > > > > is
> >> >> > > > > > > a ResourceManager, which is the component that manages
> task
> >> >> > manager
> >> >> > > > > > > registrations [1]. There is more or less a single
> >> >> implementation
> >> >> > of
> >> >> > > > the
> >> >> > > > > > RM
> >> >> > > > > > > with plugable drivers for the active integrations (yarn,
> >> k8s).
> >> >> > > > > > >
> >> >> > > > > > > It would be great if you could share more details of how
> >> >> exactly
> >> >> > > the
> >> >> > > > > DTM
> >> >> > > > > > is
> >> >> > > > > > > going to fit in the current JM architecture.
> >> >> > > > > > >
> >> >> > > > > > > 2. 99.9% of the code is generic but each RM handles
> tokens
> >> >> > > > > differently. A
> >> >> > > > > > > > good example is YARN obtains tokens on client side and
> >> then
> >> >> > sets
> >> >> > > > them
> >> >> > > > > > on
> >> >> > > > > > > > the newly created AM container launch context. This is
> >> >> purely
> >> >> > > YARN
> >> >> > > > > > > specific
> >> >> > > > > > > > and cant't be spared. With my actual plans standalone
> >> can be
> >> >> > > > changed
> >> >> > > > > to
> >> >> > > > > > > use
> >> >> > > > > > > > the framework. By using it I mean no RM specific DTM or
> >> >> > > whatsoever
> >> >> > > > is
> >> >> > > > > > > > needed.
> >> >> > > > > > > >
> >> >> > > > > > >
> >> >> > > > > > > If we have tokens available on the client side, why do we
> >> >> need to
> >> >> > > set
> >> >> > > > > > them
> >> >> > > > > > > into the AM (yarn specific concept) launch context? Why
> >> can't
> >> >> we
> >> >> > > > simply
> >> >> > > > > > > send them to the JM, eg. as a parameter of the job
> >> submission
> >> >> /
> >> >> > via
> >> >> > > > > > > separate RPC call? There might be something I'm missing
> >> due to
> >> >> > > > limited
> >> >> > > > > > > knowledge, but handling the token on the "cluster
> >> framework"
> >> >> > level
> >> >> > > > > > doesn't
> >> >> > > > > > > seem necessary.
> >> >> > > > > > >
> >> >> > > > > > > [1]
> >> >> > > > > > >
> >> >> > > > > > >
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager
> >> >> > > > > > >
> >> >> > > > > > > Best,
> >> >> > > > > > > D.
> >> >> > > > > > >
> >> >> > > > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <
> >> >> > > > > gabor.g.somo...@gmail.com
> >> >> > > > > > >
> >> >> > > > > > > wrote:
> >> >> > > > > > >
> >> >> > > > > > > > Oh and one more thing. I'm planning to add this feature
> >> in
> >> >> > small
> >> >> > > > > chunk
> >> >> > > > > > of
> >> >> > > > > > > > PRs because security is super hairy area. That way
> >> reviewers
> >> >> > can
> >> >> > > be
> >> >> > > > > > more
> >> >> > > > > > > > easily obtains the concept.
> >> >> > > > > > > >
> >> >> > > > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, <
> >> d...@apache.org>
> >> >> > > wrote:
> >> >> > > > > > > >
> >> >> > > > > > > > > Hi Gabor,
> >> >> > > > > > > > >
> >> >> > > > > > > > > thanks for drafting the FLIP, I think having a solid
> >> >> Kerberos
> >> >> > > > > support
> >> >> > > > > > > is
> >> >> > > > > > > > > crucial for many enterprise deployments.
> >> >> > > > > > > > >
> >> >> > > > > > > > > I have multiple questions regarding the
> implementation
> >> >> (note
> >> >> > > > that I
> >> >> > > > > > > have
> >> >> > > > > > > > > very limited knowledge of Kerberos):
> >> >> > > > > > > > >
> >> >> > > > > > > > > 1) If I understand it correctly, we'll only obtain
> >> tokens
> >> >> in
> >> >> > > the
> >> >> > > > > job
> >> >> > > > > > > > > manager and then we'll distribute them via RPC (needs
> >> to
> >> >> be
> >> >> > > > > secured).
> >> >> > > > > > > > >
> >> >> > > > > > > > > Can you please outline how the communication will
> look
> >> >> like?
> >> >> > Is
> >> >> > > > the
> >> >> > > > > > > > > DelegationTokenManager going to be a part of the
> >> >> > > ResourceManager?
> >> >> > > > > Can
> >> >> > > > > > > you
> >> >> > > > > > > > > outline it's lifecycle / how it's going to be
> >> integrated
> >> >> > there?
> >> >> > > > > > > > >
> >> >> > > > > > > > > 2) Do we really need a YARN / k8s specific
> >> >> implementations?
> >> >> > Is
> >> >> > > it
> >> >> > > > > > > > possible
> >> >> > > > > > > > > to obtain / renew a token in a generic way? Maybe to
> >> >> rephrase
> >> >> > > > that,
> >> >> > > > > > is
> >> >> > > > > > > it
> >> >> > > > > > > > > possible to implement DelegationTokenManager for the
> >> >> > standalone
> >> >> > > > > > Flink?
> >> >> > > > > > > If
> >> >> > > > > > > > > we're able to solve this point, it could be possible
> to
> >> >> > target
> >> >> > > > all
> >> >> > > > > > > > > deployment scenarios with a single implementation.
> >> >> > > > > > > > >
> >> >> > > > > > > > > Best,
> >> >> > > > > > > > > D.
> >> >> > > > > > > > >
> >> >> > > > > > > > > On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <
> >> >> > > > > > zuston.sha...@gmail.com>
> >> >> > > > > > > > > wrote:
> >> >> > > > > > > > >
> >> >> > > > > > > > > > Hi G
> >> >> > > > > > > > > >
> >> >> > > > > > > > > > Thanks for your explain in detail. I have gotten
> your
> >> >> > > thoughts,
> >> >> > > > > and
> >> >> > > > > > > any
> >> >> > > > > > > > > > way this proposal
> >> >> > > > > > > > > > is a great improvement.
> >> >> > > > > > > > > >
> >> >> > > > > > > > > > Looking forward to your implementation and i will
> >> keep
> >> >> > focus
> >> >> > > on
> >> >> > > > > it.
> >> >> > > > > > > > > > Thanks again.
> >> >> > > > > > > > > >
> >> >> > > > > > > > > > Best
> >> >> > > > > > > > > > JunFan.
> >> >> > > > > > > > > > On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <
> >> >> > > > > > > > gabor.g.somo...@gmail.com
> >> >> > > > > > > > > >,
> >> >> > > > > > > > > > wrote:
> >> >> > > > > > > > > > > Just to confirm keeping
> >> >> > > > > > "security.kerberos.fetch.delegation-token"
> >> >> > > > > > > is
> >> >> > > > > > > > > > added
> >> >> > > > > > > > > > > to the doc.
> >> >> > > > > > > > > > >
> >> >> > > > > > > > > > > BR,
> >> >> > > > > > > > > > > G
> >> >> > > > > > > > > > >
> >> >> > > > > > > > > > >
> >> >> > > > > > > > > > > On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <
> >> >> > > > > > > > > gabor.g.somo...@gmail.com
> >> >> > > > > > > > > > >
> >> >> > > > > > > > > > > wrote:
> >> >> > > > > > > > > > >
> >> >> > > > > > > > > > > > Hi JunFan,
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > > By the way, maybe this should be added in the
> >> >> > migration
> >> >> > > > > plan
> >> >> > > > > > or
> >> >> > > > > > > > > > > > intergation section in the FLIP-211.
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > Going to add this soon.
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > > Besides, I have a question that the KDC will
> >> >> collapse
> >> >> > > > when
> >> >> > > > > > the
> >> >> > > > > > > > > > cluster
> >> >> > > > > > > > > > > > reached 200 nodes you described
> >> >> > > > > > > > > > > > in the google doc. Do you have any attachment
> or
> >> >> > > reference
> >> >> > > > to
> >> >> > > > > > > prove
> >> >> > > > > > > > > it?
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > "KDC *may* collapse under some circumstances"
> is
> >> the
> >> >> > > proper
> >> >> > > > > > > > wording.
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > We have several customers who are executing
> >> >> workloads
> >> >> > on
> >> >> > > > > > > > Spark/Flink.
> >> >> > > > > > > > > > Most
> >> >> > > > > > > > > > > > of the time I'm facing their
> >> >> > > > > > > > > > > > daily issues which is heavily environment and
> >> >> use-case
> >> >> > > > > > dependent.
> >> >> > > > > > > > > I've
> >> >> > > > > > > > > > > > seen various cases:
> >> >> > > > > > > > > > > > * where the mentioned ~1k nodes were working
> fine
> >> >> > > > > > > > > > > > * where KDC thought the number of requests are
> >> >> coming
> >> >> > > from
> >> >> > > > > DDOS
> >> >> > > > > > > > > attack
> >> >> > > > > > > > > > so
> >> >> > > > > > > > > > > > discontinued authentication
> >> >> > > > > > > > > > > > * where KDC was simply not responding because
> of
> >> the
> >> >> > load
> >> >> > > > > > > > > > > > * where KDC was intermittently had some outage
> >> (this
> >> >> > was
> >> >> > > > the
> >> >> > > > > > most
> >> >> > > > > > > > > nasty
> >> >> > > > > > > > > > > > thing)
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > Since you're managing relatively big cluster
> then
> >> >> you
> >> >> > > know
> >> >> > > > > that
> >> >> > > > > > > KDC
> >> >> > > > > > > > > is
> >> >> > > > > > > > > > not
> >> >> > > > > > > > > > > > only used by Spark/Flink workloads
> >> >> > > > > > > > > > > > but the whole company IT infrastructure is
> >> bombing
> >> >> it
> >> >> > so
> >> >> > > it
> >> >> > > > > > > really
> >> >> > > > > > > > > > depends
> >> >> > > > > > > > > > > > on other factors too whether KDC is reaching
> >> >> > > > > > > > > > > > it's limit or not. Not sure what kind of
> evidence
> >> >> are
> >> >> > you
> >> >> > > > > > looking
> >> >> > > > > > > > for
> >> >> > > > > > > > > > but
> >> >> > > > > > > > > > > > I'm not authorized to share any information
> about
> >> >> > > > > > > > > > > > our clients data.
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > One thing is for sure. The more external system
> >> >> types
> >> >> > are
> >> >> > > > > used
> >> >> > > > > > in
> >> >> > > > > > > > > > > > workloads (for ex. HDFS, HBase, Hive, Kafka)
> >> which
> >> >> > > > > > > > > > > > are authenticating through KDC the more
> >> possibility
> >> >> to
> >> >> > > > reach
> >> >> > > > > > this
> >> >> > > > > > > > > > > > threshold when the cluster is big enough.
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > All in all this feature is here to help all
> users
> >> >> never
> >> >> > > > reach
> >> >> > > > > > > this
> >> >> > > > > > > > > > > > limitation.
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > BR,
> >> >> > > > > > > > > > > > G
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <
> >> >> > > > zuston.sha...@gmail.com
> >> >> > > > > >
> >> >> > > > > > > > wrote:
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > > > > > Hi G
> >> >> > > > > > > > > > > > >
> >> >> > > > > > > > > > > > > Thanks for your quick reply. I think
> reserving
> >> the
> >> >> > > config
> >> >> > > > > of
> >> >> > > > > > > > > > > > > *security.kerberos.fetch.delegation-token*
> >> >> > > > > > > > > > > > > and simplifying disable the token fetching
> is a
> >> >> good
> >> >> > > > > idea.By
> >> >> > > > > > > the
> >> >> > > > > > > > > way,
> >> >> > > > > > > > > > > > > maybe this should be added
> >> >> > > > > > > > > > > > > in the migration plan or intergation section
> in
> >> >> the
> >> >> > > > > FLIP-211.
> >> >> > > > > > > > > > > > >
> >> >> > > > > > > > > > > > > Besides, I have a question that the KDC will
> >> >> collapse
> >> >> > > > when
> >> >> > > > > > the
> >> >> > > > > > > > > > cluster
> >> >> > > > > > > > > > > > > reached 200 nodes you described
> >> >> > > > > > > > > > > > > in the google doc. Do you have any attachment
> >> or
> >> >> > > > reference
> >> >> > > > > to
> >> >> > > > > > > > prove
> >> >> > > > > > > > > > it?
> >> >> > > > > > > > > > > > > Because in our internal per-cluster,
> >> >> > > > > > > > > > > > > the nodes reaches > 1000 and KDC looks good.
> >> Do i
> >> >> > > missed
> >> >> > > > or
> >> >> > > > > > > > > > misunderstood
> >> >> > > > > > > > > > > > > something? Please correct me.
> >> >> > > > > > > > > > > > >
> >> >> > > > > > > > > > > > > Best
> >> >> > > > > > > > > > > > > JunFan.
> >> >> > > > > > > > > > > > > On Jan 13, 2022, 5:26 PM +0800,
> >> >> dev@flink.apache.org
> >> >> > ,
> >> >> > > > > wrote:
> >> >> > > > > > > > > > > > > >
> >> >> > > > > > > > > > > > > >
> >> >> > > > > > > > > > > > >
> >> >> > > > > > > > > >
> >> >> > > > > > > > >
> >> >> > > > > > > >
> >> >> > > > > > >
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
> >> >> > > > > > > > > > > > >
> >> >> > > > > > > > > > > >
> >> >> > > > > > > > > >
> >> >> > > > > > > > >
> >> >> > > > > > > >
> >> >> > > > > > >
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >> >
> >>
> >
>

Reply via email to