[
https://issues.apache.org/jira/browse/FLINK-30354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias Pohl updated FLINK-30354:
----------------------------------
Description:
In the course of reviewing FLINK-29405, I came up with a proposal to reduce the
complexity of the {{LookupFullCache}} implementation and shrink the number of
thread pools being used from 3 to 2. Here's the proposal I also shared in the
[FLINK-29405 PR
comment|https://github.com/apache/flink/pull/20919#pullrequestreview-1208332584]:
The responsibilities as I see them (a simplified sketch of this composition
follows the list):
* {{LookupFullCache}} is the composite class for combining the {{CacheLoader}}
and the {{CacheReloadTrigger}} through the {{ReloadTriggerContext}}
* {{ReloadTriggerContext}} provides an async call to trigger the reload, but
also some utility methods for providing processing or event time (it's not
clear to me why this is connected to the reload; based on the TODO comments, it
looks like a future task)
* {{CacheLoader}} is in charge of loading the data into memory (if possible
concurrently).
* {{CacheReloadTrigger}} provides different strategies to trigger new reloads.
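To make the described composition concrete, here's a heavily simplified sketch
of how the pieces relate. These are not the actual Flink interfaces (the real
ones have more methods); it only illustrates the wiring:
{code:java}
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the reload strategy: it only gets a context that
// exposes the async reload call.
interface SimplifiedCacheReloadTrigger {
    void open(Context context) throws Exception;

    interface Context {
        CompletableFuture<Void> triggerReload();
    }
}

// Simplified stand-in for the loader: it knows how to fill the cache once.
abstract class SimplifiedCacheLoader {
    abstract void updateCache() throws Exception;
}

// The composite: it owns one loader and one trigger and wires them together
// (in the current code this wiring happens through ReloadTriggerContext).
class SimplifiedLookupFullCache {
    private final SimplifiedCacheLoader cacheLoader;
    private final SimplifiedCacheReloadTrigger reloadTrigger;

    SimplifiedLookupFullCache(
            SimplifiedCacheLoader cacheLoader, SimplifiedCacheReloadTrigger reloadTrigger) {
        this.cacheLoader = cacheLoader;
        this.reloadTrigger = reloadTrigger;
    }
}
{code}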
About the different executors:
* The {{CacheReloadTrigger}} implementations utilize a
{{SingleThreadScheduledExecutor}} which repeatedly triggers
{{ReloadTriggerContext::reload}}. If the loading takes longer than the trigger
interval, the triggered calls pile up (illustrated with a small standalone
example after this list). Here, I'm wondering whether that's what we want.
* {{CacheLoader}} utilizes a {{SingleThreadExecutor}} in
{{CacheLoader#reloadExecutor}}, which acts as the "main" thread for reloading
the data. It calls {{CacheLoader#updateCache}} while holding
{{CacheLoader#reloadLock}}.
{{(InputFormat)CacheLoader#updateCache}} is implemented synchronously. The data
is loaded concurrently (if possible) using a {{FixedThreadPool}}.
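To illustrate the pile-up concern, here is a standalone JDK example, not the
actual trigger code: with {{scheduleAtFixedRate}} on a single-threaded
scheduler, executions that fall behind are not dropped but run back-to-back as
soon as the previous one finishes, so a reload that takes longer than the
period keeps the thread permanently busy ({{scheduleWithFixedDelay}} would
instead wait for the configured delay after each completed reload; which of the
two the triggers actually use determines how pronounced the effect is):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ReloadPileUpDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Period of 1s, but each simulated "reload" takes 3s: with
        // scheduleAtFixedRate the missed executions are not skipped, they run
        // back-to-back as soon as the previous one finishes.
        scheduler.scheduleAtFixedRate(
                () -> {
                    System.out.println("reload started at " + System.currentTimeMillis());
                    try {
                        Thread.sleep(3_000); // simulate a slow full-cache reload
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                },
                0, 1, TimeUnit.SECONDS);

        Thread.sleep(15_000); // observe the output for a while
        scheduler.shutdownNow();
    }
}
{code}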
My proposal is now to reduce the number of thread pools being used: Instead of
having a {{SingleThreadExecutor}} and a {{FixedThreadPool}} in the
{{CacheLoader}} implementation, couldn't we come up with a custom
{{ThreadPoolExecutor}} with a minimum of 1 thread and a maximum equal to the
number of cores (similar to what is already there with
[ThreadUtils#newThreadPool|https://github.com/apache/flink/blob/d067629d4d200f940d0b58759459d7ff5832b292/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java#L36])?
That would free the {{CacheLoader}} from starting and shutting down thread
pools by moving the pool's ownership from {{CacheLoader}} to
{{LookupFullCache}}, calling it {{cacheLoadingThreadPool}} (or similar).
Additionally, the {{ScheduledThreadPool}} currently living in the
{{CacheReloadTrigger}} implementations could move into {{LookupFullCache}} as
well, calling it something like {{cacheLoadSchedulingThreadPool}}.
{{LookupFullCache}} would be in charge of managing all cache-loading-related
threads. Additionally, it would manage the current execution through
{{CompletableFutures}} (one for triggering the reload and one for executing the
reload). Triggering a reload would require cancelling the current future (if it
hasn't completed yet) or ignoring the trigger if we want a reload to finish
before triggering a new one.
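As a minimal sketch of what such a shared pool could look like (the name
{{newCacheLoadingThreadPool}} is made up here, and the exact configuration of
the linked {{ThreadUtils#newThreadPool}} may differ):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class CacheLoadingThreadPools {

    private CacheLoadingThreadPools() {}

    // Single pool owned by LookupFullCache: idles at one thread and grows up to
    // the number of available cores while concurrent load tasks are submitted.
    public static ExecutorService newCacheLoadingThreadPool(String threadNamePrefix) {
        AtomicInteger threadCounter = new AtomicInteger();
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        1, // minimum number of threads
                        Runtime.getRuntime().availableProcessors(), // maximum number of threads
                        60L,
                        TimeUnit.SECONDS, // idle threads are released again
                        // A SynchronousQueue (rather than an unbounded queue) is
                        // needed so that the pool actually grows towards
                        // maximumPoolSize; with an unbounded LinkedBlockingQueue
                        // it would stay at the core size.
                        new SynchronousQueue<>(),
                        runnable -> {
                            Thread t =
                                    new Thread(
                                            runnable,
                                            threadNamePrefix + "-" + threadCounter.incrementAndGet());
                            t.setDaemon(true);
                            return t;
                        },
                        // If all threads are busy, run the task in the submitting
                        // thread instead of rejecting it.
                        new ThreadPoolExecutor.CallerRunsPolicy());
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
{code}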
{{CacheLoader#updateCache}} would become
{{CacheLoader#updateCacheAsync(ExecutorService)}} returning a
{{CompletableFuture}} that completes as soon as all subtasks are completed.
{{CacheLoader#reloadAsync}} would return this {{CompletableFuture}} instead of
creating its own future. The lifecycle (as already explained in the previous
paragraph) would be managed by {{LookupFullCache}}. The benefit would be that
we wouldn't have to deal with interrupts in {{CacheLoader}}.
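This is only an outline of the proposed signature (the class name and
{{createSplitLoadTasks}} are made-up stand-ins for however the concrete loader
slices its input into concurrently loadable chunks):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

abstract class AsyncCacheLoaderSketch {

    // Proposed shape: submit all per-split load tasks to the shared pool and
    // expose a single future that completes once every subtask has finished
    // (or completes exceptionally if any subtask fails).
    public CompletableFuture<Void> updateCacheAsync(ExecutorService cacheLoadingThreadPool) {
        List<CompletableFuture<Void>> subtasks = new ArrayList<>();
        for (Runnable splitLoadTask : createSplitLoadTasks()) {
            subtasks.add(CompletableFuture.runAsync(splitLoadTask, cacheLoadingThreadPool));
        }
        return CompletableFuture.allOf(subtasks.toArray(new CompletableFuture[0]));
    }

    // One load task per input split / partition; implementation-specific.
    protected abstract List<Runnable> createSplitLoadTasks();
}
{code}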
I see the following benefits:
* {{ReloadTriggerContext}} becomes obsolete (one has to clarify what the event
time and processing time functions are for, though).
* {{CacheLoader#awaitFirstLoad}} becomes obsolete as well. We can verify the
completion of the cache loading in {{LookupFullCache}} through the
{{CompletableFuture}} instances (see the sketch after this list).
* {{CacheReloadTrigger}} can focus on the strategy implementation without
worrying about instantiating threads. This is duplicated code right now in
{{PeriodicCacheReloadTrigger}} and {{TimedCacheReloadTrigger}}.
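To show how {{LookupFullCache}} could coordinate the reloads through those
futures, here's a hedged sketch (all names are hypothetical; the {{Supplier}}
stands in for something like {{cacheLoader.updateCacheAsync(cacheLoadingThreadPool)}}).
The future returned by the very first {{triggerReload}} call is what replaces
{{CacheLoader#awaitFirstLoad}}: callers can simply {{join()}} or compose on it.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ReloadCoordinatorSketch {

    private final Supplier<CompletableFuture<Void>> reloadStarter;
    private CompletableFuture<Void> currentReload;

    ReloadCoordinatorSketch(Supplier<CompletableFuture<Void>> reloadStarter) {
        this.reloadStarter = reloadStarter;
    }

    // Strategy "let the running reload finish": triggers arriving while a reload
    // is still in flight are ignored and just get the in-flight future back.
    synchronized CompletableFuture<Void> triggerReload() {
        if (currentReload == null || currentReload.isDone()) {
            currentReload = reloadStarter.get();
        }
        return currentReload;
    }

    // Strategy "newest trigger wins": cancel the in-flight reload and start over.
    // Note that CompletableFuture#cancel does not interrupt already running load
    // tasks; it only completes the composite future exceptionally, which matches
    // the goal of not having to deal with interrupts in CacheLoader.
    synchronized CompletableFuture<Void> triggerReloadCancellingCurrent() {
        if (currentReload != null && !currentReload.isDone()) {
            currentReload.cancel(true);
        }
        currentReload = reloadStarter.get();
        return currentReload;
    }
}
{code}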
> Reducing the number of ThreadPools in LookupFullCache and related
> cache-loading classes
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-30354
> URL: https://issues.apache.org/jira/browse/FLINK-30354
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.17.0
> Reporter: Matthias Pohl
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)