You should have permissions now. Note that I saw 2 accounts matching your name, and I picked gaborgsomogyi.

On 31/01/2022 11:28, Gabor Somogyi wrote:
Not sure if the mentioned write access has already been given or not, but I
still don't see any edit button.

G


On Fri, Jan 28, 2022 at 5:08 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Hi Robert,

That would be awesome.

My cwiki username: gaborgsomogyi

G


On Fri, Jan 28, 2022 at 5:06 PM Robert Metzger <metrob...@gmail.com>
wrote:

Hey Gabor,

let me know your cwiki username, and I can give you write permissions.


On Fri, Jan 28, 2022 at 4:05 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Thanks for making the design better! Nothing further to discuss from my
side.

I've started reflecting the agreement in the FLIP doc.
Since I don't have access to the wiki I need to ask Marci to do that, which
may take some time.

G


On Fri, Jan 28, 2022 at 3:52 PM David Morávek <d...@apache.org> wrote:

Hi,

AFAIU an under registration TM is not added to the registered TMs map until
RegistrationResponse ..

I think you're right; with a careful design around threading (delegating
update broadcasts to the main thread) + a synchronous initial update (which
would be nice to avoid) this should be doable.

Not sure what you mean "we can't register the TM without providing it with
token" but in an unsecured configuration registration must happen w/o tokens.

Exactly as you describe it, this was meant only for the "kerberized /
secured" cluster case; in other cases we wouldn't enforce a non-null token
in the response.

I think this is a good idea in general.

+1

If you don't have any more thoughts on the RPC / lifecycle part, can you
please reflect it into the FLIP?

D.

On Fri, Jan 28, 2022 at 3:16 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

- Make sure DTs issued by single DTMs are monotonically increasing (can be
sorted on TM side)

AFAIU an under registration TM is not added to the registered TMs map until
the RegistrationResponse is processed, which would contain the initial
tokens. If that's true then how is it possible to have a race with a DTM
update, which works on the registered TMs list? To be more specific,
"taskExecutors" is the registered map of TMs to which the DTM can send
updated tokens, but this doesn't contain the under-registration TM while the
RegistrationResponse is not yet processed, right?

Of course if the DTM can update while the RegistrationResponse is being
processed then some kind of sorting would be required, and in that case I
would agree.

- Scope DT updates by the RM ID and ensure that TM only accepts update from
the current leader

I've planned this initially the mentioned way, so agreed.

- Return initial token with the RegistrationResponse, which should make the
RPC contract bit clearer (ensure that we can't register the TM without
providing it with token)

I think this is a good idea in general. Not sure what you mean by "we can't
register the TM without providing it with token", but in an unsecured
configuration registration must happen w/o tokens.
All in all the newly added tokens field must be somehow optional.
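To illustrate what I mean by optional, a rough sketch (class and field shapes
are simplified for illustration, not the actual Flink registration classes):

    import java.util.Optional;
    import javax.annotation.Nullable;

    // Sketch only: the registration success message carries the serialized
    // tokens on a kerberized cluster and null otherwise.
    public class TaskExecutorRegistrationSuccessSketch {
        @Nullable
        private final byte[] delegationTokens; // null when security is disabled

        public TaskExecutorRegistrationSuccessSketch(@Nullable byte[] delegationTokens) {
            this.delegationTokens = delegationTokens;
        }

        public Optional<byte[]> getDelegationTokens() {
            return Optional.ofNullable(delegationTokens);
        }
    }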

G


On Fri, Jan 28, 2022 at 2:22 PM David Morávek <d...@apache.org> wrote:

We had a long discussion with Chesnay about the possible edge cases and it
basically boils down to the following two scenarios:

1) There is a possible race condition between TM registration (the first DT
update) and token refresh if they happen simultaneously. Then the
registration might beat the refreshed token. This could be easily addressed
if DTs could be sorted (e.g. by the expiration time) on the TM side. In
other words, if there are multiple updates at the same time we need to make
sure that we have a deterministic way of choosing the latest one.
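To make "a deterministic way of choosing the latest one" concrete, something
along these lines could work on the TM side (the issue-time field and the
class are only assumptions for the sketch, not existing code):

    // Illustrative only: every DTM update carries a monotonically increasing
    // issue time so the TM can pick the newest one deterministically.
    final class TokenUpdateSketch {
        final long issueTimeMillis;
        final byte[] serializedTokens;

        TokenUpdateSketch(long issueTimeMillis, byte[] serializedTokens) {
            this.issueTimeMillis = issueTimeMillis;
            this.serializedTokens = serializedTokens;
        }
    }

    // TM side: keep only the newest update, whatever the arrival order is.
    private TokenUpdateSketch latest;

    synchronized void onTokensReceived(TokenUpdateSketch update) {
        if (latest == null || update.issueTimeMillis > latest.issueTimeMillis) {
            latest = update;
            // apply the tokens to the UGI / credentials here
        }
    }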

One idea by Chesnay that popped up during this discussion was whether we
could simply return the initial token with the RegistrationResponse to avoid
making an extra call during the TM registration.

2) When the RM leadership changes (e.g. because a zookeeper session times
out) there might be a race condition where the old RM is shutting down and
updates the tokens, so that it might again beat the registration token of
the new RM. This could be avoided if we scope the token by
_ResourceManagerId_ and only accept updates for the current leader
(basically we'd have an extra parameter to the _updateDelegationToken_
method).
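A rough sketch of how that leader check could look on the TM side (the method
does not exist yet and the signature and helpers are only illustrative):

    // Illustrative TM-side handler for the proposed RPC.
    public CompletableFuture<Acknowledge> updateDelegationTokens(
            ResourceManagerId resourceManagerId, byte[] tokens) {
        // only accept updates coming from the RM leader this TM is connected to
        if (!isConnectedTo(resourceManagerId)) { // hypothetical helper
            return FutureUtils.completedExceptionally(
                    new FlinkException("Token update from a non-leader ResourceManager."));
        }
        applyTokens(tokens); // hypothetical helper, e.g. UGI addCredentials
        return CompletableFuture.completedFuture(Acknowledge.get());
    }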

-

DTM is way simpler than for example slot management, which could receive
updates from the JobMaster that the RM might not know about.

So if you want to go down the path you're describing it should be doable and
we'd propose the following to cover all cases:

- Make sure DTs issued by single DTMs are monotonically increasing (can be
sorted on TM side)
- Scope DT updates by the RM ID and ensure that the TM only accepts updates
from the current leader
- Return the initial token with the RegistrationResponse, which should make
the RPC contract a bit clearer (ensure that we can't register the TM without
providing it with a token)

Any thoughts?


On Fri, Jan 28, 2022 at 10:53 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Thanks for investing your time!

The first 2 bullet points are clear.
If there is a chance that a TM can go into an inconsistent state then I
agree with the 3rd bullet point. Just before we agree on that I would like
to learn something new and understand how it is possible that a TM gets
corrupted. (In Spark I've never seen such a thing and there is no mechanism
to fix this, but Flink is definitely not Spark.)

Here is my understanding:
* DTM pushes newly obtained DTs to TMs and if any exception occurs then a
retry after "security.kerberos.tokens.retry-wait" happens. This means the
DTM keeps retrying as long as it's not possible to send the new DTs to all
registered TMs.
* New TM registration must fail if "updateDelegationToken" fails
* "updateDelegationToken" fails consistently like a DB (at least I plan to
implement it that way). If DTs arrive on the TM side then a single
"UserGroupInformation.getCurrentUser.addCredentials" call is made (see the
sketch below), which I've never seen fail.
* I hope all other code parts are not touching existing DTs within the JVM
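For reference, applying the arrived tokens on the TM side is roughly this much
Hadoop API (the byte[] wire format is an assumption of the sketch):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    static void addTokensToCurrentUser(byte[] serializedTokens) throws IOException {
        Credentials credentials = new Credentials();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(serializedTokens, serializedTokens.length);
        credentials.readTokenStorageStream(in);
        // merge the freshly received tokens into the process-wide UGI
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
    }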
I would like to emphasize I'm not against adding it, I just want to see what
kind of problems we are facing. It would ease catching bugs earlier and help
in the maintenance.

All in all I would buy the idea to add the 3rd bullet if we foresee the
need.

G


On Fri, Jan 28, 2022 at 10:07 AM David Morávek <d...@apache.org> wrote:

Hi Gabor,

This is definitely headed in the right direction +1.

I think we still need to have a safeguard in case some of the TMs get into
an inconsistent state though, which will also eliminate the need for
implementing a custom retry mechanism (when the _updateDelegationToken_ call
fails for some reason).

We already have this safeguard in place for the slot pool (in case there are
some slots in an inconsistent state - e.g. we haven't freed them for some
reason) and for the partition tracker, which could simply be enhanced. This
is done via a periodic heartbeat from TaskManagers to the ResourceManager
that contains a report about the state of these two components (from the TM
perspective) so the RM can reconcile their state if necessary.

I don't think adding an additional field to _TaskExecutorHeartbeatPayload_
should be a concern as we only heartbeat every ~10s by default and the new
field would be small compared to the rest of the existing payload. Also the
heartbeat doesn't need to contain the whole DT, but just some identifier
which signals whether it uses the right one, which could be significantly
smaller.
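As a sketch, the reconciliation could look something like this on the RM side
(names are purely illustrative; TaskExecutorHeartbeatPayload carries no such
field today and the DTM accessors are hypothetical):

    // Illustrative only: the TM reports a small fingerprint of the tokens it
    // holds via the heartbeat payload, and the RM re-pushes the latest tokens
    // when the fingerprint is stale.
    void reconcileTokensOnHeartbeat(
            TaskExecutorGateway taskExecutorGateway,
            ResourceManagerId resourceManagerId,
            long reportedTokensFingerprint) {
        if (reportedTokensFingerprint != delegationTokenManager.getCurrentFingerprint()) {
            taskExecutorGateway.updateDelegationTokens(
                    resourceManagerId, delegationTokenManager.getCurrentTokens());
        }
    }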

This is still a PUSH based approach as the RM would again call the newly
introduced _updateDelegationToken_ when it encounters an inconsistency (e.g.
due to a temporary network partition / a race condition we didn't test for /
some other scenario we didn't think about). In practice these
inconsistencies are super hard to avoid and reason about (and unfortunately
yes, we see them happen from time to time), so reusing the existing
mechanism that is designed for this exact problem simplifies things.

To sum this up we'd have three code paths for calling
_updateDelegationToken_:
1) When the TM registers, we push the token (if the DTM already has it) to it
2) When the DTM obtains a new token it broadcasts it to all currently
connected TMs
3) When a TM gets out of sync, the DTM would reconcile its state

WDYT?

Best,
D.


On Wed, Jan 26, 2022 at 9:03 PM David Morávek <d...@apache.org> wrote:

Thanks for the update, I'll go over it tomorrow.

On Wed, Jan 26, 2022 at 5:33 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Hi All,

Since it has turned out that DTM can't be added as a member of JobMaster
<https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176>
I've come up with a better proposal.
David, thanks for pointing this out, you've caught a bug in the early phase!

Namely, ResourceManager
<https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L124>
is a single-instance class where the DTM can be added as a member variable.
It has a list of all already registered TMs and new TM registration is also
happening here.
To be more specific, the following can be added from a logic perspective:
* Create a new DTM instance in ResourceManager and start it (re-occurring
thread to obtain new tokens)
* Add a new function named "updateDelegationTokens" to TaskExecutorGateway
<https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java#L54>
* Call "updateDelegationTokens" on all registered TMs to propagate new DTs
* In case of new TM registration call "updateDelegationTokens" before
registration succeeds to set up the new TM properly

This way:
* only a single DTM would live within a cluster, which is the expected
behavior
* the DTM is going to be added to a central place where all deployment
targets can make use of it
* DTs are going to be pushed to TMs, which would generate less network
traffic than a pull based approach (please see my previous mail where I've
described both approaches)
* the HA scenario is going to be consistent because such
<https://github.com/apache/flink/blob/674bc96662285b25e395fd3dddf9291a602fc183/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java#L1069>
a solution can be added to "updateDelegationTokens"
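A rough sketch of the RPC addition and the RM-side broadcast (signatures are
only illustrative, none of this exists yet):

    // New method on TaskExecutorGateway (illustrative signature):
    CompletableFuture<Acknowledge> updateDelegationTokens(
            ResourceManagerId resourceManagerId, byte[] delegationTokens);

    // ResourceManager side (sketch): broadcast freshly obtained tokens to every
    // registered TM; the same call would be made for a newly registering TM
    // before its registration completes.
    private void broadcastDelegationTokens(byte[] delegationTokens) {
        for (WorkerRegistration<WorkerType> workerRegistration : taskExecutors.values()) {
            workerRegistration
                    .getTaskExecutorGateway()
                    .updateDelegationTokens(getFencingToken(), delegationTokens);
        }
    }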

@David or all others please share whether you agree on this or you have a
better idea/suggestion.

BR,
G


On Tue, Jan 25, 2022 at 11:00 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

First of all thanks for investing your time and helping me out. As I see you
have pretty solid knowledge in the RPC area.
I would like to rely on your knowledge since I'm learning this part.

- Do we need to introduce a new RPC method or can we for example piggyback
on heartbeats?

I'm fine with either solution but one thing is important conceptually. There
are fundamentally 2 ways how tokens can be updated:
- Push way: When there are new DTs then the JM JVM pushes the DTs to the TM
JVMs. This is the preferred one since only a tiny amount of control logic is
needed.
- Pull way: Each TM would poll the JM from time to time whether there are
new tokens and each TM decides alone whether the DTs need to be updated or
not. As you've mentioned, here some ID needs to be generated and it would
generate quite some additional network traffic which can definitely be
avoided. As a final thought, in Spark we've had this way of DT propagation
logic and we've had major issues with it.

So all in all the DTM needs to obtain new tokens and there must be a way to
send this data to all TMs from the JM.

- What delivery semantics are we looking for? (what if we're only able to
update subset of TMs / what happens if we exhaust retries / should we even
have the retry mechanism whatsoever) - I have a feeling that somehow
leveraging the existing heartbeat mechanism could help to answer these
questions

Let's go through these questions one by one.

What delivery semantics are we looking for?
The DTM must receive an exception when at least one TM was not able to get
the DTs.

What if we're only able to update a subset of TMs?
In such a case the DTM will reschedule the token obtainment after
"security.kerberos.tokens.retry-wait" time.

What happens if we exhaust retries?
There is no number of retries. In the default configuration tokens need to
be re-obtained after one day. The DTM tries to obtain new tokens after
1 day * 0.75 (security.kerberos.tokens.renewal-ratio) = 18 hours. When that
fails it retries after "security.kerberos.tokens.retry-wait", which is 1 hour
by default. If it never succeeds then an authentication error is going to
happen on the TM side and the workload is going to stop.
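Expressed as configuration with the defaults mentioned above (key names as
proposed in the FLIP; the one-day token lifetime itself comes from the KDC /
the secured service, not from Flink):

    # flink-conf.yaml (proposed keys)
    # re-obtain at 24 h * 0.75 = 18 h into the token lifetime
    security.kerberos.tokens.renewal-ratio: 0.75
    # wait one hour between failed obtain attempts
    security.kerberos.tokens.retry-wait: 1 h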

Should we even have the retry mechanism whatsoever?
Yes, because there are always temporary cluster issues.

What does it mean for the running application (how does this look like from
the user perspective)? As far as I remember the logs are only collected
("aggregated") after the container is stopped, is that correct?

With the default config it works like that, but it can be forced to
aggregate at specific intervals. A useful feature is forcing YARN to
aggregate logs while the job is still running. For long-running jobs such as
streaming jobs, this is invaluable. To do this,
yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds must be
set to a non-negative value. When this is set, a timer will be set for the
given duration, and whenever that timer goes off, log aggregation will run
on new files.
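For completeness, that is a NodeManager side setting in yarn-site.xml, e.g.
(the interval value here is just an example):

    <!-- yarn-site.xml on the NodeManagers: also upload logs of still-running
         containers, here once per hour (value in seconds) -->
    <property>
      <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
      <value>3600</value>
    </property>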

I think this topic should get its own section in the FLIP (having some cross
reference to the YARN ticket would be really useful, but I'm not sure if
there are any).

I think this is important knowledge but this FLIP is not touching the
already existing behavior. DTs are set on the AM container and are renewed
by YARN until it's not possible anymore. Any kind of new code is not going
to change this limitation. BTW, there is no jira for this. If you think it's
worth writing this down then I think the good place is the official security
doc area, as a caveat.

If we split the FLIP into two parts / sections as I've suggested, I don't
really think that you need to explicitly test for each deployment scenario /
cluster framework, because the DTM part is completely independent of the
deployment target. Basically this is what I'm aiming for with "making it
work with the standalone" (as simple as starting a new java process) Flink
first (which is also how most people deploy streaming applications on k8s
and the direction we're pushing forward with the auto-scaling / reactive
mode initiatives).

I see your point and agree with the main direction. k8s is the megatrend
which most of the people will use sooner or later. Not 100% sure what kind
of split you suggest, but in my view the main target is to add this feature
and I'm open to any logical work ordering.
Please share the specific details and we'll work it out...

G


On Mon, Jan 24, 2022 at 3:04 PM David Morávek <d...@apache.org> wrote:

Could you point to a code where you think it could be added exactly? A
helping hand is welcome here 🙂

I think you can take a look at _ResourceManagerPartitionTracker_ [1] which
seems to have somewhat similar properties to the DTM.

One topic that needs to be addressed there is how the RPC with the
_TaskExecutorGateway_ should look like.
- Do we need to introduce a new RPC method or can we for example piggyback
on heartbeats?
- What delivery semantics are we looking for? (what if we're only able to
update subset of TMs / what happens if we exhaust retries / should we even
have the retry mechanism whatsoever) - I have a feeling that somehow
leveraging the existing heartbeat mechanism could help to answer these
questions

In short, after DT reaches its max lifetime then log aggregation stops

What does it mean for the running application (how does this look like from
the user perspective)? As far as I remember the logs are only collected
("aggregated") after the container is stopped, is that correct? I think this
topic should get its own section in the FLIP (having some cross reference to
the YARN ticket would be really useful, but I'm not sure if there are any).

All deployment modes (per-job, per-app, ...) are planned to be tested and
expected to work with the initial implementation, however not all deployment
targets (k8s, local, ...)

If we split the FLIP into two parts / sections that I've suggested, I don't
really think that you need to explicitly test for each deployment scenario /
cluster framework, because the DTM part is completely independent of the
deployment target. Basically this is what I'm aiming for with "making it
work with the standalone" (as simple as starting a new java process) Flink
first (which is also how most people deploy streaming applications on k8s
and the direction we're pushing forward with the auto-scaling / reactive
mode initiatives).

The whole integration with YARN (let's forget about log aggregation for a
moment) / k8s-native only boils down to how do we make the keytab file local
to the JobManager so the DTM can read it, so it's basically built on top of
that. The only special thing that needs to be tested there is the "keytab
distribution" code path.

[1]
https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java

Best,
D.

On Mon, Jan 24, 2022 at 12:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

There is a separate JobMaster for each job within a Flink cluster and each
JobMaster only has a partial view of the task managers

Good point! I've had a deeper look and you're right. We definitely need to
find another place.

Related per-cluster or per-job keytab:
In the current code a per-cluster keytab is implemented and I intend to keep
it like this within this FLIP. The reason is simple: tokens on the TM side
can be stored within the UserGroupInformation (UGI) structure which is
global. I'm not telling it's impossible to change that, but I think that
this is such a complexity which the initial implementation is not required
to contain. Additionally we've not seen such a need from the user side. If
the need arises later on then another FLIP with this topic can be created
and discussed. Proper multi-UGI handling within a single JVM is a topic
where several rounds of deep-dive with the Hadoop/YARN guys are required.
single DTM instance embedded with the ResourceManager (the Flink component)

Could you point to a code where you think it could be added exactly? A
helping hand is welcome here 🙂

Then the single (initial) implementation should work with all the deployment
modes out of the box (which is not what the FLIP suggests). Is that correct?

All deployment modes (per-job, per-app, ...) are planned to be tested and
expected to work with the initial implementation, however not all deployment
targets (k8s, local, ...) are intended to be tested. Per deployment target a
new jira needs to be created, where I expect a small amount of code to be
added but a relatively expensive testing effort to be required.
I've taken a look into the prototype and in the "YarnClusterDescriptor"
you're injecting a delegation token into the AM [1] (that's obtained using
the provided keytab). If I understand this correctly from previous
discussion / FLIP, this is to support log aggregation and DT has a limited
validity. How is this DT going to be renewed?

You're clever and touched a limitation which Spark has too. In short, after
a DT reaches its max lifetime, log aggregation stops. I've had several
deep-dive rounds with the YARN guys during my Spark years because I wanted
to fill this gap. They can't provide us any way to re-inject the newly
obtained DT, so in the end I gave up on this.

BR,
G


On Mon, 24 Jan 2022, 11:00 David Morávek <d...@apache.org> wrote:

Hi Gabor,

There is actually a huge difference between the JobManager (process) and the
JobMaster (job coordinator). The naming is unfortunately a bit misleading
here for historical reasons. There is a separate JobMaster for each job
within a Flink cluster and each JobMaster only has a partial view of the
task managers (depending on where the slots for a particular job are
allocated). This means that you'll end up with N "DelegationTokenManagers"
competing with each other (N = number of running jobs in the cluster).
This makes me think we're mixing two abstraction levels here:

a) Per-cluster delegation tokens
- Simpler approach, it would involve a single DTM instance embedded with the
ResourceManager (the Flink component)

b) Per-job delegation tokens
- More complex approach, but could be more flexible from the user side of
things.
- Multiple DTM instances that are bound to the JobMaster lifecycle.
Delegation tokens are attached to the particular slots that are executing
the job tasks instead of the whole task manager (a TM could be executing
multiple jobs with different tokens).
- The question is which keytab should be used for the clustering framework,
to support log aggregation on YARN (an extra keytab, or the keytab that
comes with the first job?)

I think these are the things that need to be clarified in the FLIP before
proceeding.

A follow-up question for getting a better understanding of where this should
be headed: Are there any use cases where users may want to use different
keytabs with each job, or are we fine with using a cluster-wide keytab? If
we go with per-cluster keytabs, is it OK that all jobs submitted into this
cluster can access it (even the future ones)? Should this be a security
concern?

Presume you thought I would implement a new class with the JobManager name.
The plan is not that.

I've never suggested such a thing.


No. As said earlier, DT handling is planned to be done completely in Flink.
The DTM has a renewal thread which re-obtains tokens at the proper time when
needed.

Then the single (initial) implementation should work with all the deployment
modes out of the box (which is not what the FLIP suggests). Is that correct?

If the cluster framework also requires a delegation token for its inner
working (this IMO only applies to YARN), it might need an extra step
(injecting the token into the application master container).
Separating the individual layers (actual Flink cluster - basically making
this work with a standalone deployment / "cluster framework" - support for
YARN log aggregation) in the FLIP would be useful.

Reading the linked Spark readme could be useful.

I've read that, but please be patient with the questions, Kerberos is not an
easy topic to get into and I've had very little contact with it in the past.



https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176
I've taken a look into the prototype and in the "YarnClusterDescriptor"
you're injecting a delegation token into the AM [1] (that's obtained using
the provided keytab). If I understand this correctly from previous
discussion / FLIP, this is to support log aggregation and DT has a limited
validity. How is this DT going to be renewed?

[1]
https://github.com/gaborgsomogyi/flink/commit/8ab75e46013f159778ccfce52463e7bc63e395a9#diff-02416e2d6ca99e1456f9c3949f3d7c2ac523d3fe25378620c09632e4aac34e4eR1261

Best,
D.

On Fri, Jan 21, 2022 at 9:35 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Here is the exact class (I'm on mobile so haven't had a look at the exact
class name):

https://github.com/gaborgsomogyi/flink/blob/8ab75e46013f159778ccfce52463e7bc63e395a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java#L176

That keeps track of the TMs where the tokens can be sent to.
My feeling would be that we shouldn't really introduce a new component with
a custom lifecycle, but rather we should try to incorporate this into
existing ones.

Can you be more specific? Presume you thought I would implement a new class
with the JobManager name. The plan is not that.

If I understand this correctly, this means that we then push the token
renewal logic to YARN.

No. As said earlier, DT handling is planned to be done completely in Flink.
The DTM has a renewal thread which re-obtains tokens at the proper time when
needed. YARN log aggregation is a totally different feature, where YARN does
the renewal. Log aggregation was an example why the code can't be 100%
reusable for all resource managers. Reading the linked Spark readme could be
useful.
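As a rough sketch, the renewal thread boils down to something like this
(names are illustrative, not the actual DTM code):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Illustrative renewal loop: re-obtain tokens at renewal-ratio of their
    // lifetime, fall back to retry-wait on failure, then push to all TMs.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    private void scheduleObtain(long delayMillis) {
        scheduler.schedule(this::obtainAndBroadcast, delayMillis, TimeUnit.MILLISECONDS);
    }

    private void obtainAndBroadcast() {
        try {
            ObtainedTokens tokens = obtainDelegationTokens();      // hypothetical helper
            broadcastToTaskManagers(tokens);                       // hypothetical helper
            long lifetime = tokens.earliestMaxDateMillis() - System.currentTimeMillis();
            scheduleObtain((long) (lifetime * renewalRatio));      // e.g. 24 h * 0.75 = 18 h
        } catch (Exception e) {
            scheduleObtain(retryWaitMillis);                       // "security.kerberos.tokens.retry-wait"
        }
    }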

G

On Fri, 21 Jan 2022, 21:05 David Morávek <d...@apache.org> wrote:

JobManager is the Flink class.

There is no such class in Flink. The closest thing to the JobManager is the
ClusterEntrypoint. The cluster entrypoint spawns a new RM Runner &
Dispatcher Runner that start participating in the leader election. Once they
gain leadership they spawn the actual underlying instances of these two
"main components".

My feeling would be that we shouldn't really introduce a new component with
a custom lifecycle, but rather we should try to incorporate this into
existing ones.

My biggest concerns would be:

- How would the lifecycle of the new component look like with regards to HA
setups? If we really decide to introduce a completely new component, how
should this work in case of multiple JobManager instances?
- Which components does it talk to / how? For example how does the broadcast
of a new token to task managers (TaskManagerGateway) look like? Do we simply
introduce a new RPC on the ResourceManagerGateway that broadcasts it, or
does the new component need to do some kind of bookkeeping of task managers
that it needs to notify?

YARN based HDFS log aggregation would not work by dropping that code. Just
to be crystal clear, the actual implementation contains this for exactly
this reason.

This is the missing part +1. If I understand this correctly, this means that
we then push the token renewal logic to YARN. How do you plan to implement
the renewal logic on k8s?

D.

On Fri, Jan 21, 2022 at 8:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

I think we might both mean something different by the RM.

Your feeling is right; I've not specified these terms well in the
explanation.
By RM I meant the resource management framework. JobManager is the Flink
class. This means that inside the JM instance there will be a DTM instance,
so they would have the same lifecycle. Hope I've answered the question.
If we have tokens available on the client side, why do we need to set them
into the AM (yarn specific concept) launch context?

YARN based HDFS log aggregation would not work by dropping that code. Just
to be crystal clear, the actual implementation contains this for exactly
this reason.
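For reference, the YARN specific part is roughly the following standard
Hadoop/YARN API usage (a simplified sketch, not the exact prototype code):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

    static void setTokensOnAmContainer(ContainerLaunchContext amContainer, Credentials credentials)
            throws IOException {
        // serialize the client-side obtained tokens and hand them to the AM
        // container so YARN can use them, e.g. for log aggregation
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
    }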

G

On Fri, 21 Jan 2022, 20:12 David Morávek <d...@apache.org> wrote:

Hi Gabor,

1. One thing is important, token management is planned to be done
generically within Flink and not scattered in RM specific code. The
JobManager has a DelegationTokenManager which obtains tokens from time to
time (if configured properly). The JM knows which TaskManagers are in place
so it can distribute the tokens to all TMs. That's it basically.

I think we might both mean something different by the RM. The JobManager is
basically just a process encapsulating multiple components, one of which is
the ResourceManager, which is the component that manages task manager
registrations [1]. There is more or less a single implementation of the RM
with plugable drivers for the active integrations (yarn, k8s).
It would be great if you could share more details of how exactly the DTM is
going to fit in the current JM architecture.

2. 99.9% of the code is generic but each RM handles tokens differently. A
good example is YARN, which obtains tokens on the client side and then sets
them on the newly created AM container launch context. This is purely YARN
specific and can't be spared. With my actual plans standalone can be changed
to use the framework. By using it I mean no RM specific DTM or whatsoever is
needed.

If we have tokens available on the client side, why do we need to set them
into the AM (yarn specific concept) launch context? Why can't we simply send
them to the JM, e.g. as a parameter of the job submission / via a separate
RPC call? There might be something I'm missing due to limited knowledge, but
handling the token on the "cluster framework" level doesn't seem necessary.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#jobmanager

Best,
D.

On Fri, Jan 21, 2022 at 7:48 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Oh and one more thing. I'm planning to add this feature in small chunks of
PRs because security is a super hairy area. That way reviewers can absorb
the concept more easily.

On Fri, 21 Jan 2022, 18:03 David Morávek <d...@apache.org> wrote:

Hi Gabor,

thanks for drafting the FLIP, I think having solid Kerberos support is
crucial for many enterprise deployments.

I have multiple questions regarding the implementation (note that I have
very limited knowledge of Kerberos):

1) If I understand it correctly, we'll only obtain tokens in the job manager
and then we'll distribute them via RPC (which needs to be secured). Can you
please outline how the communication will look like? Is the
DelegationTokenManager going to be a part of the ResourceManager? Can you
outline its lifecycle / how it's going to be integrated there?
2) Do we really need YARN / k8s specific implementations? Is it possible to
obtain / renew a token in a generic way? Maybe to rephrase that, is it
possible to implement a DelegationTokenManager for standalone Flink? If
we're able to solve this point, it could be possible to target all
deployment scenarios with a single implementation.
Best,
D.

On Fri, Jan 14, 2022 at 3:47 AM Junfan Zhang <zuston.sha...@gmail.com>
wrote:

Hi G

Thanks for explaining in detail. I have gotten your thoughts, and in any
case this proposal is a great improvement.

Looking forward to your implementation and I will keep my focus on it.
Thanks again.

Best
JunFan.
On Jan 13, 2022, 9:20 PM +0800, Gabor Somogyi <gabor.g.somo...@gmail.com>,
wrote:

Just to confirm: keeping "security.kerberos.fetch.delegation-token" has been
added to the doc.

BR,
G


On Thu, Jan 13, 2022 at 1:34 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

Hi JunFan,

By the way, maybe this should be added in the migration plan or integration
section in the FLIP-211.

Going to add this soon.

Besides, I have a question: you described in the google doc that the KDC
will collapse when the cluster reaches 200 nodes. Do you have any attachment
or reference to prove it?

"KDC *may* collapse under some circumstances" is the proper wording.
We have several customers who are executing workloads on Spark/Flink. Most
of the time I'm facing their daily issues, which are heavily environment and
use-case dependent. I've seen various cases:
* where the mentioned ~1k nodes were working fine
* where KDC thought the number of requests was coming from a DDOS attack so
it discontinued authentication
* where KDC was simply not responding because of the load
* where KDC intermittently had some outage (this was the most nasty thing)

Since you're managing a relatively big cluster you know that KDC is not only
used by Spark/Flink workloads but the whole company IT infrastructure is
bombing it, so it really depends on other factors too whether KDC is
reaching its limit or not. Not sure what kind of evidence you are looking
for, but I'm not authorized to share any information about our clients'
data.

One thing is for sure: the more external system types are used in workloads
(for ex. HDFS, HBase, Hive, Kafka) which are authenticating through KDC, the
more possible it is to reach this threshold when the cluster is big enough.
All in all this feature is here to help all users never reach this
limitation.

BR,
G


On Thu, Jan 13, 2022 at 1:00 PM 张俊帆 <zuston.sha...@gmail.com> wrote:

Hi G

Thanks for your quick reply. I think reserving the config of
*security.kerberos.fetch.delegation-token* and simply disabling the token
fetching is a good idea. By the way, maybe this should be added in the
migration plan or integration section in the FLIP-211.
Besides, I have a question: you described in the google doc that the KDC
will collapse when the cluster reaches 200 nodes. Do you have any attachment
or reference to prove it? Because in our internal per-cluster setup the
number of nodes reaches > 1000 and KDC looks good. Did I miss or
misunderstand something? Please correct me.

Best
JunFan.
On Jan 13, 2022, 5:26 PM +0800, dev@flink.apache.org, wrote:

https://docs.google.com/document/d/1JzMbQ1pCJsLVz8yHrCxroYMRP2GwGwvacLrGyaIx5Yc/edit?fbclid=IwAR0vfeJvAbEUSzHQAAJfnWTaX46L6o7LyXhMfBUCcPrNi-uXNgoOaI8PMDQ

