Hey Gabor,

let me know your cwiki username, and I can give you write permissions.


On Fri, Jan 28, 2022 at 4:05 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Thanks for making the design better! Nothing further to discuss from my
> side.
>
> Started to reflect the agreement in the FLIP doc.
> Since I don't have access to the wiki I need to ask Marci to do that which
> may take some time.
>
> G
>
>
> On Fri, Jan 28, 2022 at 3:52 PM David Morávek <d...@apache.org> wrote:
>
> > Hi,
> >
> > AFAIU an under registration TM is not added to the registered TMs map
> until
> > > RegistrationResponse ..
> > >
> >
> > I think you're right, with a careful design around threading (delegating
> > update broadcasts to the main thread) + synchronous initial update (that
> > would be nice to avoid) this should be doable.
> >
> > Not sure what you mean "we can't register the TM without providing it
> with
> > > token" but in unsecure configuration registration must happen w/o
> tokens.
> > >
> >
> > Exactly as you describe it, this was meant only for the "kerberized /
> > secured" cluster case, in other cases we wouldn't enforce a non-null
> token
> > in the response
> >
> > I think this is a good idea in general.
> > >
> >
> > +1
> >
> > If you don't have any more thoughts on the RPC / lifecycle part, can you
> > please reflect it into the FLIP?
> >
> > D.
> >
> > On Fri, Jan 28, 2022 at 3:16 PM Gabor Somogyi <gabor.g.somo...@gmail.com
> >
> > wrote:
> >
> > > > - Make sure DTs issued by single DTMs are monotonically increasing
> (can
> > > be
> > > sorted on TM side)
> > >
> > > AFAIU an under registration TM is not added to the registered TMs map
> > until
> > > RegistrationResponse
> > > is processed which would contain the initial tokens. If that's true
> then
> > > how is it possible to have race with
> > > DTM update which is working on the registered TMs list?
> > > To be more specific "taskExecutors" is the registered map of TMs to
> which
> > > DTM can send updated tokens
> > > but this doesn't contain the under registration TM while
> > > RegistrationResponse is not processed, right?
> > >
> > > Of course if DTM can update while RegistrationResponse is processed
> then
> > > somehow sorting would be
> > > required and in that case I would agree.
> > >
> > > - Scope DT updates by the RM ID and ensure that TM only accepts update
> > from
> > > the current leader
> > >
> > > I initially planned it the way you describe, so agreed.
> > >
> > > - Return initial token with the RegistrationResponse, which should make
> > the
> > > RPC contract a bit clearer (ensure that we can't register the TM without
> > > providing it with token)
> > >
> > > I think this is a good idea in general. Not sure what you mean "we
> can't
> > > register the TM without
> > > providing it with token" but in unsecure configuration registration
> must
> > > happen w/o tokens.
> > > All in all the newly added tokens field must be somehow optional.
> > >
> > > G
> > >
> > >
> > > On Fri, Jan 28, 2022 at 2:22 PM David Morávek <d...@apache.org> wrote:
> > >
> > > > We had a long discussion with Chesnay about the possible edge cases
> and
> > > it
> > > > basically boils down to the following two scenarios:
> > > >
> > > > 1) There is a possible race condition between TM registration (the
> > first
> > > DT
> > > > update) and token refresh if they happen simultaneously. Then the
> > > > registration might beat the refreshed token. This could be easily
> > > addressed
> > > > if DTs could be sorted (eg. by the expiration time) on the TM side.
> In
> > > > other words, if there are multiple updates at the same time we need
> to
> > > make
> > > > sure that we have a deterministic way of choosing the latest one.
> > > >
> > > > One idea by Chesnay that popped up during this discussion was whether
> > we
> > > > could simply return the initial token with the RegistrationResponse
> to
> > > > avoid making an extra call during the TM registration.
> > > >
> > > > 2) When the RM leadership changes (eg. because zookeeper session
> times
> > > out)
> > > > there might be a race condition where the old RM is shutting down and
> > > > updates the tokens, so that it might again beat the registration token
> of
> > > the
> > > > new RM. This could be avoided if we scope the token by
> > > _ResourceManagerId_
> > > > and only accept updates for the current leader (basically we'd have
> an
> > > > extra parameter to the _updateDelegationToken_ method).
> > > >
> > > > -
> > > >
> > > > DTM is way simpler than, for example, slot management, which could
> > receive
> > > > updates from the JobMaster that RM might not know about.
> > > >
> > > > So if you want to go down the path you're describing it should be
> doable
> > > and
> > > > we'd propose the following to cover all cases:
> > > >
> > > > - Make sure DTs issued by single DTMs are monotonically increasing
> (can
> > > be
> > > > sorted on TM side)
> > > > - Scope DT updates by the RM ID and ensure that TM only accepts
> update
> > > from
> > > > the current leader
> > > > - Return initial token with the RegistrationResponse, which should
> make
> > > the
> > > > RPC contract a bit clearer (ensure that we can't register the TM
> without
> > > > providing it with token)
> > > >
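
The three points above could be sketched on the TM side roughly as follows. This is only an illustration under stated assumptions: `TokenUpdateGuard`, the `UUID` standing in for _ResourceManagerId_, and the per-DTM sequence number are hypothetical names, not existing Flink API. The tokens may be null so that an unsecured cluster can still register.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical TM-side guard for delegation token updates; not Flink code. */
final class TokenUpdateGuard {
    private UUID currentLeaderId;            // stand-in for ResourceManagerId
    private long lastAcceptedSequence = -1L; // assumed to increase monotonically per DTM
    private byte[] currentTokens;            // may stay null on an unsecured cluster

    /** Registration response from a leader; it carries the (optional) initial tokens. */
    synchronized void onRegistration(UUID leaderId, long sequence, byte[] initialTokens) {
        Objects.requireNonNull(leaderId, "leaderId");
        boolean sameLeader = leaderId.equals(currentLeaderId);
        if (sameLeader && sequence <= lastAcceptedSequence) {
            return; // a newer update from this leader has already been applied
        }
        currentLeaderId = leaderId;
        lastAcceptedSequence = sequence;
        currentTokens = initialTokens;
    }

    /** Regular update; returns true only if the tokens were applied. */
    synchronized boolean onUpdate(UUID senderId, long sequence, byte[] tokens) {
        if (currentLeaderId == null || !currentLeaderId.equals(senderId)) {
            return false; // not registered yet, or the sender is not the current leader
        }
        if (sequence <= lastAcceptedSequence) {
            return false; // stale or duplicate update
        }
        lastAcceptedSequence = sequence;
        currentTokens = tokens;
        return true;
    }

    synchronized long lastAcceptedSequence() {
        return lastAcceptedSequence;
    }
}
```

With this shape, an update from an old RM that is shutting down is dropped after the TM has registered with the new leader, and two concurrent updates from the same leader resolve deterministically to the one with the higher sequence number.
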
> > > > Any thoughts?
> > > >
> > > >
> > > > On Fri, Jan 28, 2022 at 10:53 AM Gabor Somogyi <
> > > gabor.g.somo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for investing your time!
> > > > >
> > > > > The first 2 bullet points are clear.
> > > > > If there is a chance that a TM can go to an inconsistent state
> then I
> > > > agree
> > > > > with the 3rd bullet point.
> > > > > Just before we agree on that I would like to learn something new
> and
> > > > > understand how it is possible that a TM
> > > > > gets corrupted? (In Spark I've never seen such a thing and no
> mechanism
> > > to
> > > > > fix this but Flink is definitely not Spark)
> > > > >
> > > > > Here is my understanding:
> > > > > * DTM pushes new obtained DTs to TMs and if any exception occurs
> > then a
> > > > > retry after "security.kerberos.tokens.retry-wait"
> > > > > happens. This means DTM keeps retrying as long as it cannot send new
> > DTs
> > > > to
> > > > > all registered TMs.
> > > > > * New TM registration must fail if "updateDelegationToken" fails
> > > > > * "updateDelegationToken" fails consistently like a DB (at least I
> > plan
> > > > to
> > > > > implement it that way).
> > > > > If DTs are arriving on the TM side then a single
> > > > > "UserGroupInformation.getCurrentUser.addCredentials"
> > > > > will be called, which I've never seen fail.
> > > > > * I hope all other code parts are not touching existing DTs within
> > the
> > > > JVM
> > > > >
> > > > > I would like to emphasize I'm not against adding it, I just want to
> see
> > > what
> > > > > kind of problems we are facing.
> > > > > It would make it easier to catch bugs earlier and help with maintenance.
> > > > >
> > > > > All in all I would buy the idea to add the 3rd bullet if we foresee
> > the
> > > > > need.
> > > > >
> > > > > G
> > > > >
> > > > >
> > > > > On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org>
> > > wrote:
> > > > >
> > > > > > Hi Gabor,
> > > > > >
> > > > > > > This is definitely headed in the right direction +1.
> > > > > >
> > > > > > I think we still need to have a safeguard in case some of the TMs
> > > get
> > > > > into
> > > > > > the inconsistent state though, which will also eliminate the need
> > for
> > > > > > implementing a custom retry mechanism (when
> _updateDelegationToken_
> > > > call
> > > > > > fails for some reason).
> > > > > >
> > > > > > We already have this safeguard in place for slot pool (in case
> > there
> > > > are
> > > > > > some slots in inconsistent state - eg. we haven't freed them for
> > some
> > > > > > reason) and for the partition tracker, which could be simply
> > > enhanced.
> > > > > This
> > > > > > is done via periodic heartbeat from TaskManagers to the
> > > ResourceManager
> > > > > > that contains report about state of these two components (from TM
> > > > > > perspective) so the RM can reconcile their state if necessary.
> > > > > >
> > > > > > I don't think adding an additional field to
> > > > > _TaskExecutorHeartbeatPayload_
> > > > > > should be a concern as we only heartbeat every ~ 10s by default
> and
> > > the
> > > > > new
> > > > > > field would be small compared to rest of the existing payload.
> Also
> > > > > > heartbeat doesn't need to contain the whole DT, but just some
> > > > identifier
> > > > > > which signals whether it uses the right one, that could be
> > > > significantly
> > > > > > smaller.
> > > > > >
> > > > > > This is still a PUSH based approach as the RM would again call
> the
> > > > newly
> > > > > > introduced _updateDelegationToken_ when it encounters
> inconsistency
> > > > (eg.
> > > > > > due to a temporary network partition / a race condition we didn't
> > > test
> > > > > for
> > > > > > / some other scenario we didn't think about). In practice these
> > > > > > inconsistencies are super hard to avoid and reason about (and
> > > > > unfortunately
> > > > > > yes, we see them happen from time to time), so reusing the
> existing
> > > > > > > mechanism that is designed for this exact problem simplifies
> things.
> > > > > >
> > > > > > To sum this up we'd have three code paths for calling
> > > > > > _updateDelegationToken_:
> > > > > > 1) When the TM registers, we push the token (if DTM already has
> it)
> > > to
> > > > it
> > > > > > 2) When DTM obtains a new token it broadcasts it to all currently
> > > > > connected
> > > > > > TMs
> > > > > > > 3) When a TM gets out of sync, DTM would reconcile its state
> > > > > >
> > > > > > WDYT?
> > > > > >
> > > > > > Best,
> > > > > > D.
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org>
> > > wrote:
> > > > > >
> > > > > > > > Thanks for the update, I'll go over it tomorrow.
> > > > > > >
> > > > > > > On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi <
> > > > > gabor.g.somo...@gmail.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi All,
> > > > > > >>
> > > > > > >> Since it has turned out that DTM can't be added as member of
> > > > JobMaster
> > > > > > >> <
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> > > > > > >> >
> > > > > > >> I've
> > > > > > > >> come up with a better proposal.
> > > > > > > >> David, thanks for pointing this out, you've caught a bug in
> > the
> > > > > early
> > > > > > >> phase!
> > > > > > >>
> > > > > > >> Namely ResourceManager
> > > > > > >> <
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124
> > > > > > >> >
> > > > > > >> is
> > > > > > >> a single instance class where DTM can be added as member
> > variable.
> > > > > > >> It has a list of all already registered TMs and new TM
> > > registration
> > > > is
> > > > > > >> also
> > > > > > >> happening here.
> > > > > > >> The following can be added from logic perspective to be more
> > > > specific:
> > > > > > >> * Create new DTM instance in ResourceManager
> > > > > > >> <
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124
> > > > > > >> >
> > > > > > >> and
> > > > > > >> start it (re-occurring thread to obtain new tokens)
> > > > > > >> * Add a new function named "updateDelegationTokens" to
> > > > > > TaskExecutorGateway
> > > > > > >> <
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54
> > > > > > >> >
> > > > > > >> * Call "updateDelegationTokens" on all registered TMs to
> > propagate
> > > > new
> > > > > > DTs
> > > > > > >> * In case of new TM registration call "updateDelegationTokens"
> > > > before
> > > > > > >> registration succeeds to setup new TM properly
> > > > > > >>
> > > > > > >> This way:
> > > > > > >> * only a single DTM would live within a cluster which is the
> > > > expected
> > > > > > >> behavior
> > > > > > >> * DTM is going to be added to a central place where all
> > deployment
> > > > > > > targets
> > > > > > >> can make use of it
> > > > > > >> * DTs are going to be pushed to TMs which would generate less
> > > > network
> > > > > > >> traffic than pull based approach
> > > > > > >> (please see my previous mail where I've described both
> > approaches)
> > > > > > >> * HA scenario is going to be consistent because such
> > > > > > >> <
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069
> > > > > > >> >
> > > > > > >> a solution can be added to "updateDelegationTokens"
> > > > > > >>
> > > > > > >> @David or all others plz share whether you agree on this or
> you
> > > have
> > > > > > >> better
> > > > > > >> idea/suggestion.
> > > > > > >>
> > > > > > >> BR,
> > > > > > >> G
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi <
> > > > > > gabor.g.somo...@gmail.com
> > > > > > >> >
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > First of all thanks for investing your time and helping me
> > out.
> > > > As I
> > > > > > see
> > > > > > >> > you have pretty solid knowledge in the RPC area.
> > > > > > >> > I would like to rely on your knowledge since I'm learning
> this
> > > > part.
> > > > > > >> >
> > > > > > >> > > - Do we need to introduce a new RPC method or can we for
> > > example
> > > > > > >> > piggyback
> > > > > > >> > on heartbeats?
> > > > > > >> >
> > > > > > >> > I'm fine with either solution but one thing is important
> > > > > conceptually.
> > > > > > >> > There are fundamentally 2 ways how tokens can be updated:
> > > > > > >> > - Push way: When there are new DTs then JM JVM pushes DTs to
> > TM
> > > > > JVMs.
> > > > > > >> This
> > > > > > > >> > is the preferred one since only a tiny amount of control logic is
> > needed.
> > > > > > >> > - Pull way: Each time a TM would like to poll JM whether
> there
> > > are
> > > > > new
> > > > > > > >> > tokens and each TM wants to decide alone whether DTs need
> to
> > be
> > > > > > >> updated or
> > > > > > >> > not.
> > > > > > >> > As you've mentioned here some ID needs to be generated, it
> > would
> > > > > > > >> generate
> > > > > > >> > quite some additional network traffic which can be
> definitely
> > > > > avoided.
> > > > > > >> > As a final thought in Spark we've had this way of DT
> > propagation
> > > > > logic
> > > > > > >> and
> > > > > > >> > we've had major issues with it.
> > > > > > >> >
> > > > > > >> > So all in all DTM needs to obtain new tokens and there must
> be a
> > > way
> > > > to
> > > > > > >> send
> > > > > > >> > this data to all TMs from JM.
> > > > > > >> >
> > > > > > >> > > - What delivery semantics are we looking for? (what if
> we're
> > > > only
> > > > > > >> able to
> > > > > > >> > update subset of TMs / what happens if we exhaust retries /
> > > should
> > > > > we
> > > > > > >> even
> > > > > > >> > have the retry mechanism whatsoever) - I have a feeling that
> > > > somehow
> > > > > > >> > leveraging the existing heartbeat mechanism could help to
> > answer
> > > > > these
> > > > > > >> > questions
> > > > > > >> >
> > > > > > >> > Let's go through these questions one by one.
> > > > > > >> > > What delivery semantics are we looking for?
> > > > > > >> >
> > > > > > >> > DTM must receive an exception when at least one TM was not
> > able
> > > to
> > > > > get
> > > > > > >> DTs.
> > > > > > >> >
> > > > > > >> > > what if we're only able to update subset of TMs?
> > > > > > >> >
> > > > > > > >> > In such a case DTM will reschedule obtaining tokens after
> > > > > > >> > "security.kerberos.tokens.retry-wait" time.
> > > > > > >> >
> > > > > > >> > > what happens if we exhaust retries?
> > > > > > >> >
> > > > > > >> > There is no number of retries. In default configuration
> tokens
> > > > need
> > > > > > to
> > > > > > >> be
> > > > > > >> > re-obtained after one day.
> > > > > > >> > DTM tries to obtain new tokens after 1day * 0.75
> > > > > > >> > (security.kerberos.tokens.renewal-ratio) = 18 hours.
> > > > > > > >> > When it fails, it retries after
> > > "security.kerberos.tokens.retry-wait"
> > > > > > which
> > > > > > >> is
> > > > > > >> > 1 hour by default.
> > > > > > >> > If it never succeeds then authentication error is going to
> > > happen
> > > > on
> > > > > > the
> > > > > > >> > TM side and the workload is
> > > > > > >> > going to stop.
> > > > > > >> >
> > > > > > >> > > should we even have the retry mechanism whatsoever?
> > > > > > >> >
> > > > > > >> > Yes, because there are always temporary cluster issues.
> > > > > > >> >
> > > > > > >> > > What does it mean for the running application (how does
> this
> > > > look
> > > > > > like
> > > > > > >> > from
> > > > > > >> > the user perspective)? As far as I remember the logs are
> only
> > > > > > collected
> > > > > > >> > ("aggregated") after the container is stopped, is that
> > correct?
> > > > > > >> >
> > > > > > >> > With default config it works like that but it can be forced
> to
> > > > > > aggregate
> > > > > > >> > at specific intervals.
> > > > > > >> > A useful feature is forcing YARN to aggregate logs while the
> > job
> > > > is
> > > > > > >> still
> > > > > > >> > running.
> > > > > > >> > For long-running jobs such as streaming jobs, this is
> > > invaluable.
> > > > To
> > > > > > do
> > > > > > >> > this,
> > > > > > >> >
> > > yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds
> > > > > must
> > > > > > >> be
> > > > > > >> > set to a non-negative value.
> > > > > > >> > When this is set, a timer will be set for the given
> duration,
> > > and
> > > > > > >> whenever
> > > > > > >> > that timer goes off,
> > > > > > >> > log aggregation will run on new files.
> > > > > > >> >
> > > > > > >> > > I think
> > > > > > >> > this topic should get its own section in the FLIP (having
> some
> > > > cross
> > > > > > >> > reference to YARN ticket would be really useful, but I'm not
> > > sure
> > > > if
> > > > > > >> there
> > > > > > >> > are any).
> > > > > > >> >
> > > > > > >> > I think this is important knowledge but this FLIP is not
> > > touching
> > > > > the
> > > > > > >> > already existing behavior.
> > > > > > >> > DTs are set on the AM container which is renewed by YARN
> until
> > > > it's
> > > > > > not
> > > > > > >> > possible anymore.
> > > > > > >> > Any kind of new code is not going to change this limitation.
> > > BTW,
> > > > > > there
> > > > > > >> is
> > > > > > >> > no jira for this.
> > > > > > > >> > If you think it's worth writing this down, then I think the
> good
> > > > place
> > > > > > is
> > > > > > >> > the official security doc
> > > > > > >> > area as caveat.
> > > > > > >> >
> > > > > > >> > > If we split the FLIP into two parts / sections that I've
> > > > > suggested,
> > > > > > I
> > > > > > >> > don't
> > > > > > >> > really think that you need to explicitly test for each
> > > deployment
> > > > > > >> scenario
> > > > > > >> > / cluster framework, because the DTM part is completely
> > > > independent
> > > > > of
> > > > > > >> the
> > > > > > >> > deployment target. Basically this is what I'm aiming for
> with
> > > > > "making
> > > > > > it
> > > > > > >> > work with the standalone" (as simple as starting a new java
> > > > process)
> > > > > > >> Flink
> > > > > > >> > first (which is also how most people deploy streaming
> > > application
> > > > on
> > > > > > k8s
> > > > > > >> > and the direction we're pushing forward with the
> auto-scaling
> > /
> > > > > > reactive
> > > > > > >> > mode initiatives).
> > > > > > >> >
> > > > > > > >> > I see your point and agree with the main direction. k8s is the
> > > > megatrend
> > > > > > >> which
> > > > > > > >> > most people
> > > > > > >> > will use sooner or later. Not 100% sure what kind of split
> you
> > > > > suggest
> > > > > > >> but
> > > > > > >> > in my view
> > > > > > >> > the main target is to add this feature and I'm open to any
> > > logical
> > > > > > work
> > > > > > >> > ordering.
> > > > > > >> > Please share the specific details and we work it out...
> > > > > > >> >
> > > > > > >> > G
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Mon, Jan 24, 2022 at 3:04 PM David Morávek <
> > d...@apache.org>
> > > > > > wrote:
> > > > > > >> >
> > > > > > >> >> >
> > > > > > >> >> > Could you point to a code where you think it could be
> added
> > > > > > exactly?
> > > > > > >> A
> > > > > > >> >> > helping hand is welcome here 🙂
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >> >> I think you can take a look at
> > > _ResourceManagerPartitionTracker_
> > > > > [1]
> > > > > > >> which
> > > > > > >> >> seems to have somewhat similar properties to the DTM.
> > > > > > >> >>
> > > > > > >> >> One topic that needs to be addressed there is how the RPC
> > with
> > > > the
> > > > > > >> >> _TaskExecutorGateway_ should look like.
> > > > > > >> >> - Do we need to introduce a new RPC method or can we for
> > > example
> > > > > > >> piggyback
> > > > > > >> >> on heartbeats?
> > > > > > >> >> - What delivery semantics are we looking for? (what if
> we're
> > > only
> > > > > > able
> > > > > > >> to
> > > > > > >> >> update subset of TMs / what happens if we exhaust retries /
> > > > should
> > > > > we
> > > > > > >> even
> > > > > > >> >> have the retry mechanism whatsoever) - I have a feeling
> that
> > > > > somehow
> > > > > > >> >> leveraging the existing heartbeat mechanism could help to
> > > answer
> > > > > > these
> > > > > > >> >> questions
> > > > > > >> >>
> > > > > > > >> >> In short, after DT reaches its max lifetime then log
> > > aggregation
> > > > > > stops
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >> >> What does it mean for the running application (how does
> this
> > > look
> > > > > > like
> > > > > > >> >> from
> > > > > > >> >> the user perspective)? As far as I remember the logs are
> only
> > > > > > collected
> > > > > > >> >> ("aggregated") after the container is stopped, is that
> > > correct? I
> > > > > > think
> > > > > > >> >> this topic should get its own section in the FLIP (having
> > some
> > > > > cross
> > > > > > >> >> reference to YARN ticket would be really useful, but I'm
> not
> > > sure
> > > > > if
> > > > > > >> there
> > > > > > >> >> are any).
> > > > > > >> >>
> > > > > > >> >> All deployment modes (per-job, per-app, ...) are planned to
> > be
> > > > > tested
> > > > > > >> and
> > > > > > >> >> > expect to work with the initial implementation however
> not
> > > all
> > > > > > >> >> deployment
> > > > > > >> >> > targets (k8s, local, ...
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >> >> If we split the FLIP into two parts / sections that I've
> > > > > suggested, I
> > > > > > >> >> don't
> > > > > > >> >> really think that you need to explicitly test for each
> > > deployment
> > > > > > >> scenario
> > > > > > >> >> / cluster framework, because the DTM part is completely
> > > > independent
> > > > > > of
> > > > > > >> the
> > > > > > >> >> deployment target. Basically this is what I'm aiming for
> with
> > > > > "making
> > > > > > >> it
> > > > > > >> >> work with the standalone" (as simple as starting a new java
> > > > > process)
> > > > > > >> Flink
> > > > > > >> >> first (which is also how most people deploy streaming
> > > application
> > > > > on
> > > > > > >> k8s
> > > > > > >> >> and the direction we're pushing forward with the
> > auto-scaling /
> > > > > > >> reactive
> > > > > > >> >> mode initiatives).
> > > > > > >> >>
> > > > > > >> >> The whole integration with YARN (let's forget about log
> > > > aggregation
> > > > > > >> for a
> > > > > > >> >> moment) / k8s-native only boils down to how do we make the
> > > keytab
> > > > > > file
> > > > > > >> >> local to the JobManager so the DTM can read it, so it's
> > > basically
> > > > > > >> built on
> > > > > > >> >> top of that. The only special thing that needs to be tested
> > > there
> > > > > is
> > > > > > >> the
> > > > > > >> >> "keytab distribution" code path.
> > > > > > >> >>
> > > > > > >> >> [1]
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
> > > > > > >> >>
> > > > > > >> >> Best,
> > > > > > >> >> D.
> > > > > > >> >>
> > > > > > >> >> On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <
> > > > > > >> gabor.g.somo...@gmail.com
> > > > > > >> >> >
> > > > > > >> >> wrote:
> > > > > > >> >>
> > > > > > >> >> > > There is a separate JobMaster for each job
> > > > > > >> >> > within a Flink cluster and each JobMaster only has a
> > partial
> > > > view
> > > > > > of
> > > > > > >> the
> > > > > > >> >> > task managers
> > > > > > >> >> >
> > > > > > >> >> > Good point! I've had a deeper look and you're right. We
> > > > > definitely
> > > > > > >> need
> > > > > > >> >> to
> > > > > > >> >> > find another place.
> > > > > > >> >> >
> > > > > > >> >> > > Related per-cluster or per-job keytab:
> > > > > > >> >> >
> > > > > > >> >> > In the current code per-cluster keytab is implemented and
> > I
> > > > > > > >> intend
> > > > > > >> >> to
> > > > > > >> >> > keep it like this within this FLIP. The reason is simple:
> > > > tokens
> > > > > on
> > > > > > >> TM
> > > > > > >> >> side
> > > > > > >> >> > can be stored within the UserGroupInformation (UGI)
> > structure
> > > > > which
> > > > > > >> is
> > > > > > > >> >> > global. I'm not saying it's impossible to change that
> but
> > I
> > > > > think
> > > > > > >> that
> > > > > > >> >> > this is such a complexity which the initial
> implementation
> > is
> > > > not
> > > > > > >> >> required
> > > > > > >> >> > to contain. Additionally we've not seen such need from
> user
> > > > side.
> > > > > > If
> > > > > > >> the
> > > > > > > >> >> > need arises later on, then another FLIP on this topic
> > can
> > > be
> > > > > > >> created
> > > > > > >> >> and
> > > > > > >> >> > discussed. Proper multi-UGI handling within a single JVM
> > is a
> > > > > topic
> > > > > > >> >> where
> > > > > > > >> >> > several rounds of deep dives with the Hadoop/YARN guys are
> > > > > required.
> > > > > > >> >> >
> > > > > > >> >> > > single DTM instance embedded with
> > > > > > >> >> > the ResourceManager (the Flink component)
> > > > > > >> >> >
> > > > > > >> >> > Could you point to a code where you think it could be
> added
> > > > > > exactly?
> > > > > > >> A
> > > > > > >> >> > helping hand is welcome here🙂
> > > > > > >> >> >
> > > > > > >> >> > > Then the single (initial) implementation should work
> with
> > > all
> > > > > the
> > > > > > > >> >> > deployment modes out of the box (which is not what the
> > FLIP
> > > > > > >> suggests).
> > > > > > >> >> Is
> > > > > > >> >> > that correct?
> > > > > > >> >> >
> > > > > > >> >> > All deployment modes (per-job, per-app, ...) are planned
> to
> > > be
> > > > > > tested
> > > > > > >> >> and
> > > > > > >> >> > expect to work with the initial implementation however
> not
> > > all
> > > > > > >> >> deployment
> > > > > > > >> >> > targets (k8s, local, ...) are intended to be tested.
> > Per
> > > > > > >> deployment
> > > > > > >> >> > target new jira needs to be created where I expect small
> > > number
> > > > > of
> > > > > > >> codes
> > > > > > >> >> > needs to be added and relatively expensive testing effort
> > is
> > > > > > >> required.
> > > > > > >> >> >
> > > > > > >> >> > > I've taken a look into the prototype and in the
> > > > > > >> >> "YarnClusterDescriptor"
> > > > > > >> >> > you're injecting a delegation token into the AM [1]
> (that's
> > > > > > obtained
> > > > > > >> >> using
> > > > > > >> >> > the provided keytab). If I understand this correctly from
> > > > > previous
> > > > > > >> >> > discussion / FLIP, this is to support log aggregation and
> > DT
> > > > has
> > > > > a
> > > > > > >> >> limited
> > > > > > >> >> > validity. How is this DT going to be renewed?
> > > > > > >> >> >
> > > > > > >> >> > You're clever and touched a limitation which Spark has
> too.
> > > In
> > > > > > short,
> > > > > > >> >> after
> > > > > > > >> >> > DT reaches its max lifetime then log aggregation stops.
> > I've
> > > > had
> > > > > > >> >> several
> > > > > > > >> >> > deep-dive rounds with the YARN guys at Spark years ago
> because I
> > > > wanted
> > > > > > to
> > > > > > >> >> fill
> > > > > > >> >> > this gap. They can't provide us any way to re-inject the
> > > newly
> > > > > > >> obtained
> > > > > > >> >> DT
> > > > > > > >> >> > so in the end I gave up on this.
> > > > > > >> >> >
> > > > > > >> >> > BR,
> > > > > > >> >> > G
> > > > > > >> >> >
> > > > > > >> >> >
> > > > > > >> >> > On Mon, 24 Jan 2022, 11:00 David Morávek, <
> d...@apache.org
> > >
> > > > > wrote:
> > > > > > >> >> >
> > > > > > >> >> > > Hi Gabor,
> > > > > > >> >> > >
> > > > > > >> >> > > There is actually a huge difference between JobManager
> > > > > (process)
> > > > > > >> and
> > > > > > >> >> > > JobMaster (job coordinator). The naming is
> unfortunately
> > > a bit
> > > > > > >> >> misleading
> > > > > > > >> >> > > here for historical reasons. There is a separate
> > JobMaster
> > > > for
> > > > > > >> each
> > > > > > >> >> job
> > > > > > >> >> > > within a Flink cluster and each JobMaster only has a
> > > partial
> > > > > view
> > > > > > >> of
> > > > > > >> >> the
> > > > > > >> >> > > task managers (depends on where the slots for a
> > particular
> > > > job
> > > > > > are
> > > > > > >> >> > > allocated). This means that you'll end up with N
> > > > > > >> >> > "DelegationTokenManagers"
> > > > > > >> >> > > competing with each other (N = number of running jobs
> in
> > > the
> > > > > > >> cluster).
> > > > > > >> >> > >
> > > > > > >> >> > > This makes me think we're mixing two abstraction levels
> > > here:
> > > > > > >> >> > >
> > > > > > >> >> > > a) Per-cluster delegation tokens
> > > > > > >> >> > > - Simpler approach, it would involve a single DTM
> > instance
> > > > > > embedded
> > > > > > >> >> with
> > > > > > >> >> > > the ResourceManager (the Flink component)
> > > > > > >> >> > > b) Per-job delegation tokens
> > > > > > >> >> > > - More complex approach, but could be more flexible
> from
> > > the
> > > > > user
> > > > > > >> >> side of
> > > > > > >> >> > > things.
> > > > > > >> >> > > - Multiple DTM instances, that are bound with the
> > JobMaster
> > > > > > >> lifecycle.
> > > > > > > >> >> > > Delegation tokens are attached to the particular slots
> > that
> > > > are
> > > > > > >> >> executing
> > > > > > >> >> > > the job tasks instead of the whole task manager (TM
> could
> > > be
> > > > > > >> executing
> > > > > > >> >> > > multiple jobs with different tokens).
> > > > > > >> >> > > - The question is which keytab should be used for the
> > > > > clustering
> > > > > > >> >> > framework,
> > > > > > >> >> > > to support log aggregation on YARN (an extra keytab,
> > keytab
> > > > > that
> > > > > > >> comes
> > > > > > >> >> > with
> > > > > > >> >> > > the first job?)
> > > > > > >> >> > >
> > > > > > >> >> > > I think these are the things that need to be clarified
> in
> > > the
> > > > > > FLIP
> > > > > > >> >> before
> > > > > > >> >> > > proceeding.
> > > > > > >> >> > >
> > > > > > >> >> > > A follow-up question for getting a better understanding
> > > where
> > > > > > this
> > > > > > >> >> should
> > > > > > > >> >> > > be headed: Are there any use cases where a user may want
> to
> > > use
> > > > > > >> >> different
> > > > > > >> >> > > keytabs with each job, or are we fine with using a
> > > > cluster-wide
> > > > > > >> >> keytab?
> > > > > > >> >> > If
> > > > > > >> >> > > we go with per-cluster keytabs, is it OK that all jobs
> > > > > submitted
> > > > > > >> into
> > > > > > >> >> > this
> > > > > > >> >> > > cluster can access it (even the future ones)? Should
> this
> > > be
> > > > a
> > > > > > >> >> security
> > > > > > >> >> > > concern?
> > > > > > >> >> > >
> > > > > > > >> >> > > Presume you thought I would implement a new class with
> > > > > JobManager
> > > > > > >> name.
> > > > > > >> >> > The
> > > > > > >> >> > > > plan is not that.
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > > >> >> > > I've never suggested such a thing.
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > > > No. That said earlier DT handling is planned to be
> done
> > > > > > >> completely
> > > > > > >> >> in
> > > > > > >> >> > > > Flink. DTM has a renewal thread which re-obtains
> tokens
> > > in
> > > > > the
> > > > > > >> >> proper
> > > > > > >> >> > > time
> > > > > > >> >> > > > when needed.
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > > Then the single (initial) implementation should work
> with
> > > all
> > > > > the
> > > > > > >> >> > > deployments modes out of the box (which is not what the
> > > FLIP
> > > > > > >> >> suggests).
> > > > > > >> >> > Is
> > > > > > >> >> > > that correct?
> > > > > > >> >> > >
> > > > > > > >> >> > > If the cluster framework also requires delegation
> token
> > > for
> > > > > > their
> > > > > > >> >> inner
> > > > > > > >> >> > > working (IMO this only applies to YARN), it might
> need
> > > an
> > > > > > extra
> > > > > > >> >> step
> > > > > > >> >> > > (injecting the token into application master
> container).
> > > > > > >> >> > >
> > > > > > >> >> > > Separating the individual layers (actual Flink cluster
> -
> > > > > > basically
> > > > > > >> >> making
> > > > > > >> >> > > this work with a standalone deployment  / "cluster
> > > > framework" -
> > > > > > >> >> support
> > > > > > >> >> > for
> > > > > > >> >> > > YARN log aggregation) in the FLIP would be useful.
> > > > > > >> >> > >
> > > > > > >> >> > > Reading the linked Spark readme could be useful.
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > > I've read that, but please be patient with the
> questions,
> > > > > > Kerberos
> > > > > > >> is
> > > > > > >> >> not
> > > > > > > >> >> > > an easy topic to get into and I've had very little
> > > contact
> > > > > with
> > > > > > >> it
> > > > > > >> >> in
> > > > > > >> >> > the
> > > > > > >> >> > > past.
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > > I've taken a look into the prototype and in the
> > > > > > >> >> "YarnClusterDescriptor"
> > > > > > >> >> > > you're injecting a delegation token into the AM [1]
> > (that's
> > > > > > >> obtained
> > > > > > >> >> > using
> > > > > > >> >> > > the provided keytab). If I understand this correctly
> from
> > > > > > previous
> > > > > > >> >> > > discussion / FLIP, this is to support log aggregation
> and
> > > DT
> > > > > has
> > > > > > a
> > > > > > >> >> > limited
> > > > > > >> >> > > validity. How is this DT going to be renewed?
> > > > > > >> >> > >
> > > > > > >> >> > > [1]
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261
> > > > > > >> >> > >
> > > > > > >> >> > > Best,
> > > > > > >> >> > > D.
> > > > > > >> >> > >
> > > > > > >> >> > > On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <
> > > > > > >> >> gabor.g.somo...@gmail.com
> > > > > > >> >> > >
> > > > > > >> >> > > wrote:
> > > > > > >> >> > >
> > > > > > > >> >> > > > Here is the exact class, I'm on mobile so haven't had a
> > > look
> > > > at
> > > > > > the
> > > > > > >> >> exact
> > > > > > >> >> > > > class name:
> > > > > > >> >> > > >
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
> > > > > > >> >> > > > That keeps track of TMs where the tokens can be sent
> > to.
> > > > > > >> >> > > >
> > > > > > >> >> > > > > My feeling would be that we shouldn't really
> > introduce
> > > a
> > > > > new
> > > > > > >> >> > component
> > > > > > >> >> > > > with
> > > > > > >> >> > > > a custom lifecycle, but rather we should try to
> > > incorporate
> > > > > > this
> > > > > > >> >> into
> > > > > > >> >> > > > existing ones.
> > > > > > >> >> > > >
> > > > > > > >> >> > > > Can you be more specific? Presume you thought I would
> > > > > implement
> > > > > > a
> > > > > > >> new
> > > > > > >> >> > > class
> > > > > > >> >> > > > with JobManager name. The plan is not that.
> > > > > > >> >> > > >
> > > > > > >> >> > > > > If I understand this correctly, this means that we
> > then
> > > > > push
> > > > > > >> the
> > > > > > >> >> > token
> > > > > > >> >> > > > renewal logic to YARN.
> > > > > > >> >> > > >
> > > > > > >> >> > > > No. That said earlier DT handling is planned to be
> done
> > > > > > >> completely
> > > > > > >> >> in
> > > > > > >> >> > > > Flink. DTM has a renewal thread which re-obtains
> tokens
> > > in
> > > > > the
> > > > > > >> >> proper
> > > > > > >> >> > > time
> > > > > > >> >> > > > when needed. YARN log aggregation is a totally
> > different
> > > > > > feature,
> > > > > > >> >> where
> > > > > > >> >> > > > YARN does the renewal. Log aggregation was an example
> > why
> > > > the
> > > > > > >> code
> > > > > > >> >> > can't
> > > > > > >> >> > > be
> > > > > > >> >> > > > 100% reusable for all resource managers. Reading the
> > > linked
> > > > > > Spark
> > > > > > >> >> > readme
> > > > > > >> >> > > > could be useful.
> > > > > > >> >> > > >
> > > > > > >> >> > > > G
> > > > > > >> >> > > >
> > > > > > >> >> > > > On Fri, 21 Jan 2022, 21:05 David Morávek, <
> > > d...@apache.org
> > > > >
> > > > > > >> wrote:
> > > > > > >> >> > > >
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > JobManager is the Flink class.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > There is no such class in Flink. The closest thing
> to
> > > the
> > > > > > >> >> JobManager
> > > > > > >> >> > > is a
> > > > > > >> >> > > > > ClusterEntrypoint. The cluster entrypoint spawns
> new
> > RM
> > > > > > Runner
> > > > > > >> &
> > > > > > >> >> > > > Dispatcher
> > > > > > >> >> > > > > Runner that start participating in the leader
> > election.
> > > > > Once
> > > > > > >> they
> > > > > > >> >> > gain
> > > > > > >> >> > > > > leadership they spawn the actual underlying
> instances
> > > of
> > > > > > these
> > > > > > >> two
> > > > > > >> >> > > "main
> > > > > > >> >> > > > > components".
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > My feeling would be that we shouldn't really
> > introduce
> > > a
> > > > > new
> > > > > > >> >> > component
> > > > > > >> >> > > > with
> > > > > > >> >> > > > > a custom lifecycle, but rather we should try to
> > > > incorporate
> > > > > > >> this
> > > > > > >> >> into
> > > > > > >> >> > > > > existing ones.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > My biggest concerns would be:
> > > > > > >> >> > > > >
> > > > > > > >> >> > > > > - What would the lifecycle of the new component look
> > > like
> > > > > with
> > > > > > >> >> regards
> > > > > > >> >> > > to
> > > > > > >> >> > > > HA
> > > > > > > >> >> > > > > setups. If we really decide to introduce a
> > > > > completely
> > > > > > >> new
> > > > > > >> >> > > > component,
> > > > > > >> >> > > > > how should this work in case of multiple JobManager
> > > > > > instances?
> > > > > > >> >> > > > > - Which components does it talk to / how? For
> example
> > > how
> > > > > > does
> > > > > > >> the
> > > > > > >> >> > > > > broadcast of new token to task managers
> > > > > (TaskManagerGateway)
> > > > > > >> look
> > > > > > >> >> > like?
> > > > > > >> >> > > > Do
> > > > > > >> >> > > > > we simply introduce a new RPC on the
> > > > ResourceManagerGateway
> > > > > > >> that
> > > > > > >> >> > > > broadcasts
> > > > > > >> >> > > > > it or does the new component need to do some kind
> of
> > > > > > >> bookkeeping
> > > > > > >> >> of
> > > > > > >> >> > > task
> > > > > > >> >> > > > > managers that it needs to notify?
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > YARN based HDFS log aggregation would not work by
> > > > dropping
> > > > > > that
> > > > > > >> >> code.
> > > > > > >> >> > > > Just
> > > > > > >> >> > > > > > to be crystal clear, the actual implementation
> > > contains
> > > > > > this
> > > > > > > >> for
> > > > > > >> >> > > > exactly
> > > > > > >> >> > > > > > this reason.
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > This is the missing part +1. If I understand this
> > > > > correctly,
> > > > > > >> this
> > > > > > >> >> > means
> > > > > > >> >> > > > > that we then push the token renewal logic to YARN.
> > How
> > > do
> > > > > you
> > > > > > >> >> plan to
> > > > > > >> >> > > > > implement the renewal logic on k8s?
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > D.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <
> > > > > > >> >> > > gabor.g.somo...@gmail.com
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > wrote:
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > > > I think we might both mean something different
> by
> > > the
> > > > > RM.
> > > > > > >> >> > > > > >
> > > > > > > >> >> > > > > > You're right, I've not specified these terms
> > well
> > > > in
> > > > > > the
> > > > > > >> >> > > > explanation.
> > > > > > >> >> > > > > > RM I meant resource management framework.
> > JobManager
> > > is
> > > > > the
> > > > > > >> >> Flink
> > > > > > >> >> > > > class.
> > > > > > >> >> > > > > > This means that inside JM instance there will be
> a
> > > DTM
> > > > > > >> >> instance, so
> > > > > > >> >> > > > they
> > > > > > >> >> > > > > > would have the same lifecycle. Hope I've answered
> > the
> > > > > > >> question.
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > > If we have tokens available on the client side,
> > why
> > > > do
> > > > > we
> > > > > > >> >> need to
> > > > > > >> >> > > set
> > > > > > >> >> > > > > > them
> > > > > > >> >> > > > > > into the AM (yarn specific concept) launch
> context?
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > YARN based HDFS log aggregation would not work by
> > > > > dropping
> > > > > > >> that
> > > > > > >> >> > code.
> > > > > > >> >> > > > > Just
> > > > > > >> >> > > > > > to be crystal clear, the actual implementation
> > > contains
> > > > > > this
> > > > > > > >> for
> > > > > > >> >> > > > exactly
> > > > > > >> >> > > > > > this reason.
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > G
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > On Fri, 21 Jan 2022, 20:12 David Morávek, <
> > > > > d...@apache.org
> > > > > > >
> > > > > > >> >> wrote:
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > > Hi Gabor,
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > 1. One thing is important, token management is
> > > > planned
> > > > > to
> > > > > > >> be
> > > > > > >> >> done
> > > > > > >> >> > > > > > > > generically within Flink and not scattered in
> > RM
> > > > > > specific
> > > > > > >> >> code.
> > > > > > >> >> > > > > > > JobManager
> > > > > > >> >> > > > > > > > has a DelegationTokenManager which obtains
> > tokens
> > > > > > >> >> time-to-time
> > > > > > >> >> > > (if
> > > > > > >> >> > > > > > > > configured properly). JM knows which
> > TaskManagers
> > > > are
> > > > > > in
> > > > > > >> >> place
> > > > > > >> >> > so
> > > > > > >> >> > > > it
> > > > > > >> >> > > > > > can
> > > > > > >> >> > > > > > > > distribute it to all TMs. That's it
> basically.
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > I think we might both mean something different
> by
> > > the
> > > > > RM.
> > > > > > >> >> > > JobManager
> > > > > > >> >> > > > is
> > > > > > >> >> > > > > > > basically just a process encapsulating multiple
> > > > > > components,
> > > > > > >> >> one
> > > > > > >> >> > of
> > > > > > >> >> > > > > which
> > > > > > >> >> > > > > > is
> > > > > > >> >> > > > > > > a ResourceManager, which is the component that
> > > > manages
> > > > > > task
> > > > > > >> >> > manager
> > > > > > >> >> > > > > > > registrations [1]. There is more or less a
> single
> > > > > > >> >> implementation
> > > > > > >> >> > of
> > > > > > >> >> > > > the
> > > > > > >> >> > > > > > RM
> > > > > > > >> >> > > > > > > with pluggable drivers for the active
> integrations
> > > > > (yarn,
> > > > > > >> k8s).
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > It would be great if you could share more
> details
> > > of
> > > > > how
> > > > > > >> >> exactly
> > > > > > >> >> > > the
> > > > > > >> >> > > > > DTM
> > > > > > >> >> > > > > > is
> > > > > > >> >> > > > > > > going to fit in the current JM architecture.
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > 2. 99.9% of the code is generic but each RM
> > handles
> > > > > > tokens
> > > > > > >> >> > > > > differently. A
> > > > > > >> >> > > > > > > > good example is YARN obtains tokens on client
> > > side
> > > > > and
> > > > > > >> then
> > > > > > >> >> > sets
> > > > > > >> >> > > > them
> > > > > > >> >> > > > > > on
> > > > > > >> >> > > > > > > > the newly created AM container launch
> context.
> > > This
> > > > > is
> > > > > > >> >> purely
> > > > > > >> >> > > YARN
> > > > > > >> >> > > > > > > specific
> > > > > > > >> >> > > > > > > > and can't be spared. With my actual plans
> > > > standalone
> > > > > > >> can be
> > > > > > >> >> > > > changed
> > > > > > >> >> > > > > to
> > > > > > >> >> > > > > > > use
> > > > > > >> >> > > > > > > > the framework. By using it I mean no RM
> > specific
> > > > DTM
> > > > > or
> > > > > > >> >> > > whatsoever
> > > > > > >> >> > > > is
> > > > > > >> >> > > > > > > > needed.
> > > > > > >> >> > > > > > > >
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > If we have tokens available on the client side,
> > why
> > > > do
> > > > > we
> > > > > > >> >> need to
> > > > > > >> >> > > set
> > > > > > >> >> > > > > > them
> > > > > > >> >> > > > > > > into the AM (yarn specific concept) launch
> > context?
> > > > Why
> > > > > > >> can't
> > > > > > >> >> we
> > > > > > >> >> > > > simply
> > > > > > >> >> > > > > > > send them to the JM, eg. as a parameter of the
> > job
> > > > > > >> submission
> > > > > > >> >> /
> > > > > > >> >> > via
> > > > > > >> >> > > > > > > separate RPC call? There might be something I'm
> > > > missing
> > > > > > >> due to
> > > > > > >> >> > > > limited
> > > > > > >> >> > > > > > > knowledge, but handling the token on the
> "cluster
> > > > > > >> framework"
> > > > > > >> >> > level
> > > > > > >> >> > > > > > doesn't
> > > > > > >> >> > > > > > > seem necessary.
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > [1]
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > >
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > Best,
> > > > > > >> >> > > > > > > D.
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <
> > > > > > >> >> > > > > gabor.g.somo...@gmail.com
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > wrote:
> > > > > > >> >> > > > > > >
> > > > > > > >> >> > > > > > > > Oh and one more thing. I'm planning to add this feature in small
> > > > > > > >> >> > > > > > > > chunks of PRs because security is a super hairy area. That way
> > > > > > > >> >> > > > > > > > reviewers can grasp the concept more easily.
> > > > > > >> >> > > > > > > >
> > > > > > >> >> > > > > > > > On Fri, 21 Jan 2022, 18:03 David Morávek, <
> > > > > > >> d...@apache.org>
> > > > > > >> >> > > wrote:
> > > > > > >> >> > > > > > > >
> > > > > > >> >> > > > > > > > > Hi Gabor,
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > thanks for drafting the FLIP, I think
> having
> > a
> > > > > solid
> > > > > > >> >> Kerberos
> > > > > > >> >> > > > > support
> > > > > > >> >> > > > > > > is
> > > > > > >> >> > > > > > > > > crucial for many enterprise deployments.
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > I have multiple questions regarding the
> > > > > > implementation
> > > > > > >> >> (note
> > > > > > >> >> > > > that I
> > > > > > >> >> > > > > > > have
> > > > > > >> >> > > > > > > > > very limited knowledge of Kerberos):
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > 1) If I understand it correctly, we'll only
> > > > obtain
> > > > > > >> tokens
> > > > > > >> >> in
> > > > > > >> >> > > the
> > > > > > >> >> > > > > job
> > > > > > >> >> > > > > > > > > manager and then we'll distribute them via
> > RPC
> > > > > (needs
> > > > > > >> to
> > > > > > >> >> be
> > > > > > >> >> > > > > secured).
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > Can you please outline how the
> communication
> > > will
> > > > > > look
> > > > > > >> >> like?
> > > > > > >> >> > Is
> > > > > > >> >> > > > the
> > > > > > >> >> > > > > > > > > DelegationTokenManager going to be a part
> of
> > > the
> > > > > > >> >> > > ResourceManager?
> > > > > > >> >> > > > > Can
> > > > > > >> >> > > > > > > you
> > > > > > > >> >> > > > > > > > > outline its lifecycle / how it's going to
> be
> > > > > > >> integrated
> > > > > > >> >> > there?
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > 2) Do we really need a YARN / k8s specific
> > > > > > >> >> implementations?
> > > > > > >> >> > Is
> > > > > > >> >> > > it
> > > > > > >> >> > > > > > > > possible
> > > > > > >> >> > > > > > > > > to obtain / renew a token in a generic way?
> > > Maybe
> > > > > to
> > > > > > >> >> rephrase
> > > > > > >> >> > > > that,
> > > > > > >> >> > > > > > is
> > > > > > >> >> > > > > > > it
> > > > > > >> >> > > > > > > > > possible to implement
> DelegationTokenManager
> > > for
> > > > > the
> > > > > > >> >> > standalone
> > > > > > >> >> > > > > > Flink?
> > > > > > >> >> > > > > > > If
> > > > > > >> >> > > > > > > > > we're able to solve this point, it could be
> > > > > possible
> > > > > > to
> > > > > > >> >> > target
> > > > > > >> >> > > > all
> > > > > > >> >> > > > > > > > > deployment scenarios with a single
> > > > implementation.
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > Best,
> > > > > > >> >> > > > > > > > > D.
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > On Fri, Jan 14, 2022 at 3:47 AM Junfan
> Zhang
> > <
> > > > > > >> >> > > > > > zuston.sha...@gmail.com>
> > > > > > >> >> > > > > > > > > wrote:
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > > > > Hi G
> > > > > > >> >> > > > > > > > > >
> > > > > > > >> >> > > > > > > > > > Thanks for your detailed explanation. I have
> > > > gotten
> > > > > > your
> > > > > > >> >> > > thoughts,
> > > > > > >> >> > > > > and
> > > > > > >> >> > > > > > > any
> > > > > > >> >> > > > > > > > > > way this proposal
> > > > > > >> >> > > > > > > > > > is a great improvement.
> > > > > > >> >> > > > > > > > > >
> > > > > > >> >> > > > > > > > > > Looking forward to your implementation
> and
> > i
> > > > will
> > > > > > >> keep
> > > > > > >> >> > focus
> > > > > > >> >> > > on
> > > > > > >> >> > > > > it.
> > > > > > >> >> > > > > > > > > > Thanks again.
> > > > > > >> >> > > > > > > > > >
> > > > > > >> >> > > > > > > > > > Best
> > > > > > >> >> > > > > > > > > > JunFan.
> > > > > > >> >> > > > > > > > > > On Jan 13, 2022, 9:20 PM +0800, Gabor
> > > Somogyi <
> > > > > > >> >> > > > > > > > gabor.g.somo...@gmail.com
> > > > > > >> >> > > > > > > > > >,
> > > > > > >> >> > > > > > > > > > wrote:
> > > > > > >> >> > > > > > > > > > > Just to confirm keeping
> > > > > > >> >> > > > > > "security.kerberos.fetch.delegation-token"
> > > > > > >> >> > > > > > > is
> > > > > > >> >> > > > > > > > > > added
> > > > > > >> >> > > > > > > > > > > to the doc.
> > > > > > >> >> > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > BR,
> > > > > > >> >> > > > > > > > > > > G
> > > > > > >> >> > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > On Thu, Jan 13, 2022 at 1:34 PM Gabor
> > > > Somogyi <
> > > > > > >> >> > > > > > > > > gabor.g.somo...@gmail.com
> > > > > > >> >> > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > wrote:
> > > > > > >> >> > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > Hi JunFan,
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > By the way, maybe this should be
> > added
> > > in
> > > > > the
> > > > > > >> >> > migration
> > > > > > >> >> > > > > plan
> > > > > > >> >> > > > > > or
> > > > > > > >> >> > > > > > > > > > > > integration section in the FLIP-211.
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > Going to add this soon.
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > Besides, I have a question that the
> > KDC
> > > > > will
> > > > > > >> >> collapse
> > > > > > >> >> > > > when
> > > > > > >> >> > > > > > the
> > > > > > >> >> > > > > > > > > > cluster
> > > > > > >> >> > > > > > > > > > > > reached 200 nodes you described
> > > > > > >> >> > > > > > > > > > > > in the google doc. Do you have any
> > > > attachment
> > > > > > or
> > > > > > >> >> > > reference
> > > > > > >> >> > > > to
> > > > > > >> >> > > > > > > prove
> > > > > > >> >> > > > > > > > > it?
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > "KDC *may* collapse under some
> > > > circumstances"
> > > > > > is
> > > > > > >> the
> > > > > > >> >> > > proper
> > > > > > >> >> > > > > > > > wording.
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > We have several customers who are
> > > executing
> > > > > > >> >> workloads
> > > > > > >> >> > on
> > > > > > >> >> > > > > > > > Spark/Flink.
> > > > > > >> >> > > > > > > > > > Most
> > > > > > >> >> > > > > > > > > > > > of the time I'm facing their
> > > > > > >> >> > > > > > > > > > > > daily issues which is heavily
> > environment
> > > > and
> > > > > > >> >> use-case
> > > > > > >> >> > > > > > dependent.
> > > > > > >> >> > > > > > > > > I've
> > > > > > >> >> > > > > > > > > > > > seen various cases:
> > > > > > >> >> > > > > > > > > > > > * where the mentioned ~1k nodes were
> > > > working
> > > > > > fine
> > > > > > >> >> > > > > > > > > > > > * where KDC thought the number of
> > > requests
> > > > > are
> > > > > > >> >> coming
> > > > > > >> >> > > from
> > > > > > >> >> > > > > DDOS
> > > > > > >> >> > > > > > > > > attack
> > > > > > >> >> > > > > > > > > > so
> > > > > > >> >> > > > > > > > > > > > discontinued authentication
> > > > > > >> >> > > > > > > > > > > > * where KDC was simply not responding
> > > > because
> > > > > > of
> > > > > > >> the
> > > > > > >> >> > load
> > > > > > > >> >> > > > > > > > > > > > * where KDC intermittently had
> some
> > > > > outage
> > > > > > >> (this
> > > > > > >> >> > was
> > > > > > >> >> > > > the
> > > > > > >> >> > > > > > most
> > > > > > >> >> > > > > > > > > nasty
> > > > > > >> >> > > > > > > > > > > > thing)
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > Since you're managing relatively big
> > > > cluster
> > > > > > then
> > > > > > >> >> you
> > > > > > >> >> > > know
> > > > > > >> >> > > > > that
> > > > > > >> >> > > > > > > KDC
> > > > > > >> >> > > > > > > > > is
> > > > > > >> >> > > > > > > > > > not
> > > > > > >> >> > > > > > > > > > > > only used by Spark/Flink workloads
> > > > > > >> >> > > > > > > > > > > > but the whole company IT
> infrastructure
> > > is
> > > > > > >> bombing
> > > > > > >> >> it
> > > > > > >> >> > so
> > > > > > >> >> > > it
> > > > > > >> >> > > > > > > really
> > > > > > >> >> > > > > > > > > > depends
> > > > > > >> >> > > > > > > > > > > > on other factors too whether KDC is
> > > > reaching
> > > > > > > >> >> > > > > > > > > > > > its limit or not. Not sure what kind
> > of
> > > > > > evidence
> > > > > > >> >> are
> > > > > > >> >> > you
> > > > > > >> >> > > > > > looking
> > > > > > >> >> > > > > > > > for
> > > > > > >> >> > > > > > > > > > but
> > > > > > >> >> > > > > > > > > > > > I'm not authorized to share any
> > > information
> > > > > > about
> > > > > > >> >> > > > > > > > > > > > our clients data.
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > One thing is for sure. The more
> > external
> > > > > system
> > > > > > >> >> types
> > > > > > >> >> > are
> > > > > > >> >> > > > > used
> > > > > > >> >> > > > > > in
> > > > > > >> >> > > > > > > > > > > > workloads (for ex. HDFS, HBase, Hive,
> > > > Kafka)
> > > > > > >> which
> > > > > > >> >> > > > > > > > > > > > are authenticating through KDC the
> more
> > > > > > >> possibility
> > > > > > >> >> to
> > > > > > >> >> > > > reach
> > > > > > >> >> > > > > > this
> > > > > > >> >> > > > > > > > > > > > threshold when the cluster is big
> > enough.
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > All in all this feature is here to
> help
> > > all
> > > > > > users
> > > > > > >> >> never
> > > > > > >> >> > > > reach
> > > > > > >> >> > > > > > > this
> > > > > > >> >> > > > > > > > > > > > limitation.
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > BR,
> > > > > > >> >> > > > > > > > > > > > G
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <
> > > > > > >> >> > > > zuston.sha...@gmail.com
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > > > > > wrote:
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > Hi G
> > > > > > >> >> > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > Thanks for your quick reply. I
> think
> > > > > > reserving
> > > > > > >> the
> > > > > > >> >> > > config
> > > > > > >> >> > > > > of
> > > > > > >> >> > > > > > > > > > > > >
> > > > *security.kerberos.fetch.delegation-token*
> > > > > > >> >> > > > > > > > > > > > > and simplifying disable the token
> > > > fetching
> > > > > > is a
> > > > > > >> >> good
> > > > > > > >> >> > > > > idea. By
> > > > > > >> >> > > > > > > the
> > > > > > >> >> > > > > > > > > way,
> > > > > > >> >> > > > > > > > > > > > > maybe this should be added
> > > > > > >> >> > > > > > > > > > > > > in the migration plan or
> integration
> > > > > section
> > > > > > in
> > > > > > >> >> the
> > > > > > >> >> > > > > FLIP-211.
> > > > > > >> >> > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > Besides, I have a question that the
> > KDC
> > > > > will
> > > > > > >> >> collapse
> > > > > > >> >> > > > when
> > > > > > >> >> > > > > > the
> > > > > > >> >> > > > > > > > > > cluster
> > > > > > >> >> > > > > > > > > > > > > reached 200 nodes you described
> > > > > > >> >> > > > > > > > > > > > > in the google doc. Do you have any
> > > > > attachment
> > > > > > >> or
> > > > > > >> >> > > > reference
> > > > > > >> >> > > > > to
> > > > > > >> >> > > > > > > > prove
> > > > > > >> >> > > > > > > > > > it?
> > > > > > > >> >> > > > > > > > > > > > > Because in our internal cluster
> > > > > > > >> >> > > > > > > > > > > > > the node count exceeds 1000 and KDC looks good.
> > > > > > > >> >> > > > > > > > > > > > > Did I miss or misunderstand something? Please correct me.
> > > > > > >> >> > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > Best
> > > > > > >> >> > > > > > > > > > > > > JunFan.
> > > > > > >> >> > > > > > > > > > > > > On Jan 13, 2022, 5:26 PM +0800,
> > > > > > >> >> dev@flink.apache.org
> > > > > > >> >> > ,
> > > > > > >> >> > > > > wrote:
> > > > > > >> >> > > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > >
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > >
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > >
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ
> > > > > > >> >> > > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > > > >
> > > > > > >> >> > > > > > > > > >
> > > > > > >> >> > > > > > > > >
> > > > > > >> >> > > > > > > >
> > > > > > >> >> > > > > > >
> > > > > > >> >> > > > > >
> > > > > > >> >> > > > >
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
