Re: unexpected high mem. usage / potential config misinterpretation

2024-09-18 Thread Alexis Sarda-Espinosa
Hi Simon,

I hope someone corrects me if I'm wrong, but just based on "batch mode
processing terabytes of data", I feel batch mode may be the issue. I am
under the impression that batch mode forces everything emitted by the
sources to RAM before any downstream operators do anything, so even if each
parallel task of your source runs in a different task manager and loads a
subset of your data, they might each end up trying to load terabytes into
RAM.

Hopefully someone more knowledgeable about batch mode can comment.

Regards,
Alexis.


On Wed, 18 Sept 2024, 18:04 Simon Frei,  wrote:

> Hi,
>
>
>
> tl;dr:
>
> Flink batch streaming API job resident memory usage grows far beyond
> expectations, resulting in system OOM kill/JVM native memory allocation
> failure - would appreciate a look over our config/assumptions to
> potentially spot any obvious mistakes.
>
>
>
> Longer form:
>
>
>
> My colleague and I have been troubleshooting a large batch job for a long
> time, and we still see behaviour around Flink's memory usage that we cannot
> explain. My hope is that by explaining our configuration and observations,
> someone can spot a misconception. In the ideal case I can then send a
> PR for the documentation to make that misconception less likely
> for other users.
>
>
>
> I'll start with an overview/"story-like" form, and then below that are
> some numbers/configs.
>
>
>
> This is a streaming job run in batch mode, processing terabytes of data,
> sourcing from and sinking to compressed files in S3. In between there are a
> few simple decoding and filter operations, then two processors with our main
> business logic, and finally a few simple transformations and reduce steps.
> While reduce and sink writer tasks run, the resident memory usage of the
> Flink TM Java process is much higher than expected from the configuration,
> i.e. higher than the configured process memory. That leads to failures,
> either the system OOM killer intervening or the JVM not being able to mmap.
> I know that the writers do use native memory, e.g. for Avro deflate
> compression, which uses a native method. The IO likely uses some native
> memory as well. We now configure 5g of task off-heap memory to compensate
> for any such native memory usage, but still encounter higher memory usage.
> Even 5g seems way too much for some compression buffers and IO, let alone
> more than that. So currently my main theory is that I misunderstand
> something about the memory-related config, e.g. that task slots factor into
> used/allocated memory.
>
>
>
> During the late stages of the job, i.e. during reduce and sink operations,
> we observe much higher memory usage than expected. The increase in memory
> usage doesn't happen slowly and gradually over time, but quickly when those
> tasks start. This is an example of ps output for one TM:
>
>
>
> PID %CPU %MEMVSZ   RSS TTY  STAT START   TIME COMMAND
>
> 14961  130 94.8 32213536 30957104 ?   Sl   Sep12 7453:03
> /usr/lib/jvm/java-11-amazon-corretto.aarch64/bin/java -Xmx7435661840
> -Xms7435661840 -XX:MaxDirectMemorySize=9663676416
> -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1724829444792_0007/container_1724829444792_0007_01_000330/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.network.min=2147483648b -D taskmanager.cpu.cores=4.0 -D
> taskmanager.memory.task.off-heap.size=5368709120b -D
> taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none
> -D taskmanager.memory.jvm-overhead.min=1073741824b -D
> taskmanager.memory.framework.off-heap.size=2147483648b -D
> taskmanager.memory.network.max=2147483648b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=7328288240b -D
> taskmanager.memory.task.heap.size=7301444112b -D
> taskmanager.numberOfTaskSlots=4 -D
> taskmanager.memory.jvm-overhead.max=1073741824b --configDir .
> -Djobmanager.rpc.address=ip-172-30-119-251.us-west-2.compute.internal
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b
> -Dweb.tmpdir=/tmp/flink-web-81fb345d-de64-4002-bd78-4454ca901566
> -Djobmanager.rpc.port=42195
> -Drest.address=ip-172-30-119-251.us-west-2.compute.internal
> -Djobmanager.memory.jvm-overhead.max=322122552b
> -Djobmanager.memory.jvm-overhead.min=322122552b
> -Dtaskmanager.resource-id=container_1724829444792_0007_01_000330
> -Dinternal.taskmanager.resource-id.metadata=ip-172-30-116-113.us-west-2.compute.internal:8041
> -Djobmanager.memory.jvm-metaspace.size=268435456b
> -Djobmanager.memory.heap.size=2496449736b
>
>
>
> For easier readability:
>
> RSS = 30957104kB = 29.5GB
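>
> Summing the memory flags listed above (pure arithmetic from those -D/-X
> values, nothing measured):
>
>   heap (-Xmx)                        7,435,661,840
> + direct (-XX:MaxDirectMemorySize)   9,663,676,416
> + managed                            7,328,288,240
> + metaspace                            268,435,456
> + JVM overhead                       1,073,741,824
> = configured process size           25,769,803,776 bytes = 24 GiB
>
> So the observed RSS of ~29.5 GiB is roughly 5.5 GiB above the configured
> process size.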
>
>
>
> Then this is the relevant bit of our config file. It includes explanations
> of how we came up with those numbers, so here's where I hope someone can
> quickly tell me where I am wrong :)
>
>
>
> # We configure yarn to pr

Re: Troubleshooting checkpoint expiration

2024-08-31 Thread Alexis Sarda-Espinosa
Well, for future reference, this helped in the case of ABFS:


logger.abfs.name = org.apache.hadoop.fs.azurebfs.services.AbfsClient
logger.abfs.level = DEBUG
logger.abfs.filter.failures.type = RegexFilter
logger.abfs.filter.failures.regex = ^.*([Ff]ail|[Rr]etry|: [45][0-9]{2},).*$
logger.abfs.filter.failures.onMatch = ACCEPT
logger.abfs.filter.failures.onMismatch = DENY


Am Mi., 7. Aug. 2024 um 12:18 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> I must ask again if anyone at least knows if Flink's file system can
> expose more detailed exceptions when things go wrong, Azure support is
> asking for specific exception messages to decide how to troubleshoot.
>
> Regards,
> Alexis.
>
> Am Di., 23. Juli 2024 um 13:39 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
>> Hi again,
>>
>> I found a Hadoop class that can log latency information [1], but since I
>> don't see any exceptions in the logs when a checkpoint expires due to
>> timeout, I'm still wondering if I can change other log levels to get more
>> insights, maybe somewhere in Flink's file system abstractions?
>>
>> [1]
>> https://hadoop.apache.org/docs/r3.2.4/hadoop-azure/abfs.html#Perf_Options
>>
>>
>> Regards,
>> Alexis.
>>
>> Am Fr., 19. Juli 2024 um 09:17 Uhr schrieb Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com>:
>>
>>> Hello,
>>>
>>> We have a Flink job that uses ABFSS for checkpoints and related state.
>>> Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
>>> guessing that's an issue in the infrastructure or on Azure's side, but I
>>> was wondering if there are Flink/Hadoop Java packages that log potentially
>>> useful information if we DEBUG/TRACE them?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Troubleshooting checkpoint expiration

2024-08-07 Thread Alexis Sarda-Espinosa
I must ask again if anyone at least knows if Flink's file system can expose
more detailed exceptions when things go wrong, Azure support is asking for
specific exception messages to decide how to troubleshoot.

Regards,
Alexis.

Am Di., 23. Juli 2024 um 13:39 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi again,
>
> I found a Hadoop class that can log latency information [1], but since I
> don't see any exceptions in the logs when a checkpoint expires due to
> timeout, I'm still wondering if I can change other log levels to get more
> insights, maybe somewhere in Flink's file system abstractions?
>
> [1]
> https://hadoop.apache.org/docs/r3.2.4/hadoop-azure/abfs.html#Perf_Options
>
> Regards,
> Alexis.
>
> Am Fr., 19. Juli 2024 um 09:17 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
>> Hello,
>>
>> We have a Flink job that uses ABFSS for checkpoints and related state.
>> Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
>> guessing that's an issue in the infrastructure or on Azure's side, but I
>> was wondering if there are Flink/Hadoop Java packages that log potentially
>> useful information if we DEBUG/TRACE them?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Using state processor for a custom windowed aggregate function

2024-08-05 Thread Alexis Sarda-Espinosa
Hi again,

I finally figured out that the reason the keyGroupIndex appeared different
is because the field I'm adding to my POJO is an enum, and the hash code of
enums is not well-defined/stable in the JVM. I think this was the only
issue...
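
For anyone hitting the same thing, a minimal illustration (hypothetical enum,
not the actual POJO field):

// Enum does not override hashCode(), so it falls back to the identity hash,
// which changes from one JVM run to the next; any key-group computation based
// on it is therefore not stable across runs.
public class EnumHashDemo {
    enum CountryCode { FOO, BAR }

    public static void main(String[] args) {
        System.out.println(CountryCode.BAR.hashCode());        // different on every run
        System.out.println(CountryCode.BAR.name().hashCode()); // stable across runs
        System.out.println(CountryCode.BAR.ordinal());         // stable across runs
    }
}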

Regards,
Alexis.

Am Mo., 5. Aug. 2024 um 09:55 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Mornin’ Alexis,
>
>
>
> The thing that puzzled me (and I guess you as well) was the switch from
> the accumulator type to the projection type R in the savepoint window
> operator.
>
> I do these things quite often, but I always want to understand
>
>    - what shape of data ends up how in windowed state
>    - the shape before, the shape after
>    - breaking into the trigger functions and the aggregation functions and
>      then going up a couple of call stacks up to the window operator
>      implementation really helps
>    - in your case, when you bootstrap from a previous window operator state:
>       - the state will consist of previous key x namespace (window) x
>         accumulator types, nested
>       - when building up a new migrated window state you’ll need to find
>         a way to
>          - first synthesize (map/flatmap) the new state data element (key
>            x window x accumulator) from the old state, such that
>          - you can aggregate it into the new state
>          - (cardinalities could change)
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Friday, August 2, 2024 7:47 PM
> *To:* Schwalbe Matthias 
> *Cc:* user 
> *Subject:* Re: Using state processor for a custom windowed aggregate
> function
>
>
>
> Hi Matthias,
>
>
>
> Thank you for looking into this. That change makes the example work, but
> my real job still has issues. There is a key difference that might be
> causing the problem, but that's not so easy to replicate in the example I
> made.
>
>
>
> Essentially, I'm trying to modify the partition key of an operator, so in
> my real job my bootstrap stream comes from a SavepointReader getting data
> from an existing UID so that I can key it differently and write it with a
> different UID. So far so good, however, I'm also trying to modify my state
> POJO at the same time - the equivalent in my example would be to add a
> field to the GenericService POJO. I'm guessing this is causing some
> inconsistency.
>
>
>
> The reason I think this is that, assuming I debugged the right classes, I
> can see that the keyGroupIndex that is used when I write the savepoint is
> not the same as when I read from it afterward to validate; I can actually
> see the state I bootstrapped in one of the state backend's tables, but
> since the indices don't correspond, I get an iterator with a null entry.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> Am Fr., 2. Aug. 2024 um 13:55 Uhr schrieb Schwalbe Matthias <
> matthias.schwa...@viseca.ch>:
>
> Hi Alexis,
>
>
>
>
>
> I've worked it out:
>
>
>
> The input of your com.test.Application.StateReader#readWindow(...,
> Iterable elements, ...) is of the
>
> projection type com.test.Application.AggregateFunctionForMigration:
> AggregateFunction<..., OUT = GenericService>.
>
> I.e. you need to implement
> com.test.Application.AggregateFunctionForMigration#getResult e.g. as
>
>
>
> @Override
>
> public GenericService getResult(GenericService accumulator) {
>
> return accumulator;
>
> }
>
>
>
> If you take a closer look at your call to
> org.apache.flink.state.api.WindowSavepointReader#aggregate(...) you'll see
> that this is indeed the case:
>
>
>
> /**
>
>  * Reads window state generated using an {@link AggregateFunction}.
>
>  *
>
>  * @param uid The uid of the operator.
>
>  * @param aggregateFunction The aggregate function used to create the
> window.
>
>  * @param readerFunction The window reader function.
>
>  * @param keyType The key type of the window.
>
>  * @param accType The type information of the accumulator function.
>
>  * @param outputType The output type of the reader function.
>
>  * @param <K> The type of the key.
>
>  * @param <T> The type of the values that are aggregated.
>
>  * @param <ACC> The type of the accumulator (intermediate aggregate
> state).
>
>  * @param <R> The type of the aggregated result.
>
>  * @param <OUT> The output type of the reader function.
>
>  * @return A {@code DataStream} of objects read from keyed state.
>
>  * @throws IOException If savepoint does not contain t

Re: Using state processor for a custom windowed aggregate function

2024-08-02 Thread Alexis Sarda-Espinosa
Hi Matthias,

Thank you for looking into this. That change makes the example work, but my
real job still has issues. There is a key difference that might be causing
the problem, but that's not so easy to replicate in the example I made.

Essentially, I'm trying to modify the partition key of an operator, so in
my real job my bootstrap stream comes from a SavepointReader getting data
from an existing UID so that I can key it differently and write it with a
different UID. So far so good, however, I'm also trying to modify my state
POJO at the same time - the equivalent in my example would be to add a
field to the GenericService POJO. I'm guessing this is causing some
inconsistency.

The reason I think this is that, assuming I debugged the right classes, I
can see that the keyGroupIndex that is used when I write the savepoint is
not the same as when I read from it afterward to validate; I can actually
see the state I bootstrapped in one of the state backend's tables, but
since the indices don't correspond, I get an iterator with a null entry.
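
For context, a rough sketch of the mapping I was looking at while debugging,
using Flink's own utility (the max parallelism value here is just an
assumption, not the job's actual setting):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// The key group depends only on the key's hashCode() and the max parallelism,
// so a field whose hash changes between runs (e.g. an enum) shifts the key
// group between writing and reading the savepoint.
public class KeyGroupCheck {
    public static void main(String[] args) {
        int maxParallelism = 128;   // assumed default max parallelism
        Object key = "example-key"; // placeholder key, not the real composite key
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
    }
}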

Regards,
Alexis.

Am Fr., 2. Aug. 2024 um 13:55 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Hi Alexis,
>
>
>
>
>
> I've worked it out:
>
>
>
> The input of your com.test.Application.StateReader#readWindow(...,
> Iterable elements, ...) is of the
>
> projection type com.test.Application.AggregateFunctionForMigration:
> AggregateFunction<..., OUT = GenericService>.
>
> I.e. you need to implement
> com.test.Application.AggregateFunctionForMigration#getResult e.g. as
>
>
>
> @Override
>
> public GenericService getResult(GenericService accumulator) {
>
> return accumulator;
>
> }
>
>
>
> If you take a closer look at your call to
> org.apache.flink.state.api.WindowSavepointReader#aggregate(...) you'll see
> that this is indeed the case:
>
>
>
> /**
>
>  * Reads window state generated using an {@link AggregateFunction}.
>
>  *
>
>  * @param uid The uid of the operator.
>
>  * @param aggregateFunction The aggregate function used to create the
> window.
>
>  * @param readerFunction The window reader function.
>
>  * @param keyType The key type of the window.
>
>  * @param accType The type information of the accumulator function.
>
>  * @param outputType The output type of the reader function.
>
>  * @param <K> The type of the key.
>
>  * @param <T> The type of the values that are aggregated.
>
>  * @param <ACC> The type of the accumulator (intermediate aggregate
> state).
>
>  * @param <R> The type of the aggregated result.
>
>  * @param <OUT> The output type of the reader function.
>
>  * @return A {@code DataStream} of objects read from keyed state.
>
>  * @throws IOException If savepoint does not contain the specified uid.
>
>  */
>
> public <K, T, ACC, R, OUT> DataStream<OUT> aggregate(
>
> String uid,
>
> AggregateFunction<T, ACC, R> aggregateFunction,
>
> WindowReaderFunction<R, OUT, K, W> readerFunction,
>
> TypeInformation<K> keyType,
>
> TypeInformation<ACC> accType,
>
> TypeInformation<OUT> outputType)
>
> throws IOException {
>
>
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
> PS: will you come to the FlinkForward conference in October in Berlin (to
> socialize)?
>
>
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Wednesday, July 31, 2024 3:46 PM
> *To:* Schwalbe Matthias 
> *Cc:* user 
> *Subject:* Re: Using state processor for a custom windowed aggregate
> function
>
>
>
> Hi Matthias,
>
>
>
> This indeed compiles, I am able to actually generate a savepoint, it's
> just that all the window states in that savepoint appear to be null. The
> second argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
>
>
> I don't have any special logic with timers or even multiple events per
> key, in fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, no other logic there.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> Am Mi., 31. Juli 2024 um 09:11 Uhr schrieb Schwalbe Matthias <
> matthias.schwa...@viseca.ch>:
>
> Hi Alexis,
>
>
>
> Just a couple of points to double-check:
>
>- Does your code compile? (the second argument of withOperator(..)
>should derive StateBoot

Re: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Alexis Sarda-Espinosa
Hi again,

I realized it's easy to create a reproducible example, see this specific
commit:

https://github.com/asardaes/test-flink-state-processor/commit/95e65f88fd1e38bcba63ebca68e3128789c0d2f2

When I run that application, I see the following output:

Savepoint created
KEY=GenericServiceCompositeKey(serviceId=X, countryCode=BAR)
Why is this null?

So a key is missing, and the key that was written has a null state.

Regards,
Alexis.

Am Mi., 31. Juli 2024 um 15:45 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi Matthias,
>
> This indeed compiles, I am able to actually generate a savepoint, it's
> just that all the window states in that savepoint appear to be null. The
> second argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
> I don't have any special logic with timers or even multiple events per
> key, in fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, no other logic there.
>
> Regards,
> Alexis.
>
> Am Mi., 31. Juli 2024 um 09:11 Uhr schrieb Schwalbe Matthias <
> matthias.schwa...@viseca.ch>:
>
>> Hi Alexis,
>>
>>
>>
>> Just a couple of points to double-check:
>>
>>    - Does your code compile? (the second argument of withOperator(..)
>>      should derive StateBootstrapTransformation instead of
>>      SingleOutputStreamOperator)
>>    - From the documentation of the savepoint API you’ll find examples for
>>      each type of state
>>    - Your preparation needs to generate events that, within your
>>      StateBootstrapTransformation implementation, get set into state
>>      primitives much the same as you would do with a normal streaming
>>      operator
>>    - Please note that a savepoint API job always runs in batch mode, hence:
>>       - keyed events are processed in key order first and then in time order
>>       - triggers will only be fired after all events of a respective key
>>         are processed
>>       - semantics are therefore slightly different than for streaming timers
>>
>>
>>
>> Hope that helps 😊
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* Alexis Sarda-Espinosa 
>> *Sent:* Monday, July 29, 2024 9:18 PM
>> *To:* user 
>> *Subject:* Using state processor for a custom windowed aggregate function
>>
>>
>>
>> Hello,
>>
>>
>>
>> I am trying to create state for an aggregate function that is used with a
>> GlobalWindow. This basically looks like:
>>
>>
>>
>> savepointWriter.withOperator(
>> OperatorIdentifier.forUid(UID),
>> OperatorTransformation.bootstrapWith(stateToMigrate)
>> .keyBy(...)
>> .window(GlobalWindows.create())
>> .aggregate(new AggregateFunctionForMigration())
>> )
>>
>> The job runs correctly and writes a savepoint, but if I then read the
>> savepoint I just created and load the state for that UID, the "elements"
>> iterable in the WindowReaderFunction's readWindow() method has a non-zero
>> size, but every element is null.
>>
>>
>>
>> I've tried specifying a custom trigger between window() and aggregate(),
>> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>>
>>
>>
>> Am I missing something?
>>
>>
>>
>> Regards,
>>
>> Alexis.
>>
>


Re: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Alexis Sarda-Espinosa
Hi Matthias,

This indeed compiles, I am able to actually generate a savepoint, it's just
that all the window states in that savepoint appear to be null. The second
argument of withOperator(...) is specified via
OperatorTransformation...aggregate(), so the final transformation is built
by WindowedStateTransformation#aggregate().

I don't have any special logic with timers or even multiple events per key,
in fact, my "stateToMigrate" already contains a single state instance for
each key of interest, so my AggregateFunctionForMigration simply returns
"value" in its add() method, no other logic there.

Regards,
Alexis.

Am Mi., 31. Juli 2024 um 09:11 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Hi Alexis,
>
>
>
> Just a couple of points to double-check:
>
>    - Does your code compile? (the second argument of withOperator(..)
>      should derive StateBootstrapTransformation instead of
>      SingleOutputStreamOperator)
>    - From the documentation of the savepoint API you’ll find examples for
>      each type of state
>    - Your preparation needs to generate events that, within your
>      StateBootstrapTransformation implementation, get set into state
>      primitives much the same as you would do with a normal streaming
>      operator
>    - Please note that a savepoint API job always runs in batch mode, hence:
>       - keyed events are processed in key order first and then in time order
>       - triggers will only be fired after all events of a respective key
>         are processed
>       - semantics are therefore slightly different than for streaming timers
>
>
>
> Hope that helps 😊
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, July 29, 2024 9:18 PM
> *To:* user 
> *Subject:* Using state processor for a custom windowed aggregate function
>
>
>
> Hello,
>
>
>
> I am trying to create state for an aggregate function that is used with a
> GlobalWindow. This basically looks like:
>
>
>
> savepointWriter.withOperator(
> OperatorIdentifier.forUid(UID),
> OperatorTransformation.bootstrapWith(stateToMigrate)
> .keyBy(...)
> .window(GlobalWindows.create())
> .aggregate(new AggregateFunctionForMigration())
> )
>
> The job runs correctly and writes a savepoint, but if I then read the
> savepoint I just created and load the state for that UID, the "elements"
> iterable in the WindowReaderFunction's readWindow() method has a non-zero
> size, but every element is null.
>
>
>
> I've tried specifying a custom trigger between window() and aggregate(),
> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>
>
>
> Am I missing something?
>
>
>
> Regards,
>
> Alexis.
>


Using state processor for a custom windowed aggregate function

2024-07-29 Thread Alexis Sarda-Espinosa
Hello,

I am trying to create state for an aggregate function that is used with a
GlobalWindow. This basically looks like:

savepointWriter.withOperator(
        OperatorIdentifier.forUid(UID),
        OperatorTransformation.bootstrapWith(stateToMigrate)
                .keyBy(...)
                .window(GlobalWindows.create())
                .aggregate(new AggregateFunctionForMigration())
)

The job runs correctly and writes a savepoint, but if I then read the
savepoint I just created and load the state for that UID, the "elements"
iterable in the WindowReaderFunction's readWindow() method has a non-zero
size, but every element is null.

I've tried specifying a custom trigger between window() and aggregate(),
always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.

Am I missing something?

Regards,
Alexis.


Re: Troubleshooting checkpoint expiration

2024-07-23 Thread Alexis Sarda-Espinosa
Hi again,

I found a Hadoop class that can log latency information [1], but since I
don't see any exceptions in the logs when a checkpoint expires due to
timeout, I'm still wondering if I can change other log levels to get more
insights, maybe somewhere in Flink's file system abstractions?

[1]
https://hadoop.apache.org/docs/r3.2.4/hadoop-azure/abfs.html#Perf_Options
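
For reference, the kind of logger change I was experimenting with (log4j2
properties style; the refined, filtered variant is in the 2024-08-31 reply
earlier in this digest):

# Raise the whole Hadoop ABFS client package to DEBUG to surface request
# retries and latencies; narrow it with a filter once it gets too noisy.
logger.abfs.name = org.apache.hadoop.fs.azurebfs
logger.abfs.level = DEBUG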

Regards,
Alexis.

Am Fr., 19. Juli 2024 um 09:17 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hello,
>
> We have a Flink job that uses ABFSS for checkpoints and related state.
> Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
> guessing that's an issue in the infrastructure or on Azure's side, but I
> was wondering if there are Flink/Hadoop Java packages that log potentially
> useful information if we DEBUG/TRACE them?
>
> Regards,
> Alexis.
>
>


Troubleshooting checkpoint expiration

2024-07-19 Thread Alexis Sarda-Espinosa
Hello,

We have a Flink job that uses ABFSS for checkpoints and related state.
Lately we see a lot of exceptions due to expiration of checkpoints, and I'm
guessing that's an issue in the infrastructure or on Azure's side, but I
was wondering if there are Flink/Hadoop Java packages that log potentially
useful information if we DEBUG/TRACE them?

Regards,
Alexis.


Re: Parallelism of state processor jobs

2024-07-06 Thread Alexis Sarda-Espinosa
Hi Junrui,

I think you understood correctly. What I'm seeing is that each vertex has a
single subtask, but multiple vertices are started in parallel in different
slots. That is not a problem in my case; I _want_ to parallelize the work.
It's just that this mechanism is very different from streaming jobs, where
the total number of slots in the cluster must equal the maximum vertex
parallelism; in other words, a streaming job won't use free slots
regardless of how many vertices there are, whereas a batch job needs one
slot per subtask in order to parallelize. I was not aware that batch jobs
interact with task manager slots like this.
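
(For anyone else who trips over this, the knob Junrui mentions below looks
roughly like this in the Flink configuration; a sketch, not a recommendation:)

# Cap the cluster at a single slot so a batch job cannot fan out across
# otherwise-free slots.
slotmanager.number-of-slots.max: 1
taskmanager.numberOfTaskSlots: 1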

The other thing I mentioned is what really remains as a problem: even
though the different vertices do start in parallel, some of them finish in
seconds and others in more than a minute, even though all of them do the
exact same transformation, just with different operator IDs. I'm using
Flink 1.18.1 btw.

Regards,
Alexis.

Am Sa., 6. Juli 2024 um 12:09 Uhr schrieb Junrui Lee :

> Hi Alexis,
>
> Could you clarify what you mean by "If I add more slots to the task
> manager, I see the transformations actually start in parallel even though I
> submit the job with 'flink run -p 1'"?
> Are you asking if multiple slots are working simultaneously, or if a
> single JobVertex contains multiple subtasks?
>
> In fact, the number of slots and parallelism are not the same concept.
> And Flink Batch jobs can run even with only a single slot, and when more
> slots become available, Flink will schedule and deploy more parallelizable
> tasks (unless their upstream tasks have not finished). If you want only one
> slot to be active at a time, you can limit the resources of the cluster —
> for instance, by setting "slotmanager.number-of-slots.max" to 1.
>
> If you intend for each JobVertex to have a parallelism of 1, and you find
> that this isn't being enforced when using the "flink run -p 1" command. In
> that case, it would be helpful to have more detailed information to assist
> with troubleshooting, including the version of Flink in use and the
> JobManager logs.
>
> Alexis Sarda-Espinosa  于2024年7月6日周六 15:35写道:
>
>> Hi Junrui,
>>
>> Thanks for the confirmation. I tested some more and I'm seeing a strange
>> behavior.
>>
>> I'm currently testing a single source stream that is fed to 6 identical
>> transformations. The state processor api requires batch mode and, from what
>> I can tell, I must specify a parallelism of 1 in the job, otherwise it
>> freezes. However, if I add more slots to the task manager, I see the
>> transformations actually start in parallel even though I submit the job
>> with "flink run -p 1". Is this expected of batch mode?
>>
>> Additionally, regardless of how much memory I give to the task manager,
>> some transformations finish in around 6 seconds, and then the others need
>> more than 1 minute even though it's the same transformation, and each one
>> writes around 70MB in my local disk. The flame graph shows the slow
>> operators are just parked due to an ArrayBlockingQueue whose size is hard
>> coded as 16 in the Flink sources. Am I missing something crucial for tuning
>> such jobs?
>>
>> Regards,
>> Alexis.
>>
>> On Sat, 6 Jul 2024, 03:29 Junrui Lee,  wrote:
>>
>>> Hi Alexis,
>>>
>>> For the SavepointWriter, I've briefly looked over the code and the write
>>> operation is enforced as non-parallel.
>>>
>>> Best,
>>> Junrui
>>>
>>> Alexis Sarda-Espinosa  于2024年7月6日周六 01:27写道:
>>>
>>>> Hi Gabor,
>>>>
>>>> Thanks for the quick response. What about SavepointWriter? In my case
>>>> I'm actually writing a job that will read from an existing savepoint and
>>>> modify some of its data to write a new one.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>> Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com>:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> It depends. When one uses SavepointLoader to read metadata only then
>>>>> it's non-parallel.
>>>>> SavepointReader however is basically a normal batch job with all its
>>>>> features.
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
>>>>> sarda.espin...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Really quick question, when using the state processor API, are all
>>>>>> transformations performed in a non-parallel fashion?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>>


Re: Parallelism of state processor jobs

2024-07-06 Thread Alexis Sarda-Espinosa
Hi Junrui,

Thanks for the confirmation. I tested some more and I'm seeing a strange
behavior.

I'm currently testing a single source stream that is fed to 6 identical
transformations. The state processor api requires batch mode and, from what
I can tell, I must specify a parallelism of 1 in the job, otherwise it
freezes. However, if I add more slots to the task manager, I see the
transformations actually start in parallel even though I submit the job
with "flink run -p 1". Is this expected of batch mode?

Additionally, regardless of how much memory I give to the task manager,
some transformations finish in around 6 seconds, and then the others need
more than 1 minute even though it's the same transformation, and each one
writes around 70MB in my local disk. The flame graph shows the slow
operators are just parked due to an ArrayBlockingQueue whose size is hard
coded as 16 in the Flink sources. Am I missing something crucial for tuning
such jobs?

Regards,
Alexis.

On Sat, 6 Jul 2024, 03:29 Junrui Lee,  wrote:

> Hi Alexis,
>
> For the SavepointWriter, I've briefly looked over the code and the write
> operation is enforced as non-parallel.
>
> Best,
> Junrui
>
> Alexis Sarda-Espinosa  于2024年7月6日周六 01:27写道:
>
>> Hi Gabor,
>>
>> Thanks for the quick response. What about SavepointWriter? In my case I'm
>> actually writing a job that will read from an existing savepoint and modify
>> some of its data to write a new one.
>>
>> Regards,
>> Alexis.
>>
>> Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
>> gabor.g.somo...@gmail.com>:
>>
>>> Hi Alexis,
>>>
>>> It depends. When one uses SavepointLoader to read metadata only then
>>> it's non-parallel.
>>> SavepointReader however is basically a normal batch job with all its
>>> features.
>>>
>>> G
>>>
>>>
>>> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Really quick question, when using the state processor API, are all
>>>> transformations performed in a non-parallel fashion?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Parallelism of state processor jobs

2024-07-05 Thread Alexis Sarda-Espinosa
Hi Gabor,

Thanks for the quick response. What about SavepointWriter? In my case I'm
actually writing a job that will read from an existing savepoint and modify
some of its data to write a new one.

Regards,
Alexis.

Am Fr., 5. Juli 2024 um 17:37 Uhr schrieb Gabor Somogyi <
gabor.g.somo...@gmail.com>:

> Hi Alexis,
>
> It depends. When one uses SavepointLoader to read metadata only then it's
> non-parallel.
> SavepointReader however is basically a normal batch job with all its
> features.
>
> G
>
>
> On Fri, Jul 5, 2024 at 5:21 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Really quick question, when using the state processor API, are all
>> transformations performed in a non-parallel fashion?
>>
>> Regards,
>> Alexis.
>>
>>


Parallelism of state processor jobs

2024-07-05 Thread Alexis Sarda-Espinosa
Hello,

Really quick question, when using the state processor API, are all
transformations performed in a non-parallel fashion?

Regards,
Alexis.


Re: Reminder: Help required to fix security vulnerabilities in Flink Docker image

2024-06-21 Thread Alexis Sarda-Espinosa
Hi Elakiya,

just to be clear, I'm not a Flink maintainer, but here my 2 cents.

I imagine the issues related to Go come from 'gosu', which is installed in
the official Flink Docker images. You can see [1] for some thoughts from
the gosu maintainer regarding CVEs (and the md file he links).

Nevertheless, there have been newer gosu releases and Flink hasn't updated
it in a long time; I think it could be worth doing that, since it seems to be
just a matter of changing one env var, e.g. [2].

[1] https://github.com/tianon/gosu/issues/136#issuecomment-2150375314
[2]
https://github.com/apache/flink-docker/blob/master/1.19/scala_2.12-java11-ubuntu/Dockerfile#L28

Regards,
Alexis.

Am Fr., 21. Juni 2024 um 15:37 Uhr schrieb elakiya udhayanan <
laks@gmail.com>:

> Hi Team,
>
> I would like to remind about the request for the help required to fix the
> vulnerabilities seen in the Flink Docker image. Any help is appreciated.
>
> Thanks in advance.
>
> Thanks,
> Elakiya U
>
> On Tue, Jun 18, 2024 at 12:51 PM elakiya udhayanan 
> wrote:
>
>> Hi Community,
>>
>> In one of our applications we are using a Flink Docker image and running
>> Flink as a Kubernetes pod. As per policy, we tried scanning the Docker
>> image for security vulnerabilities using JFrog XRay and we find that there
>> are multiple critical vulnerabilities being reported as seen in the below
>> table. This is the same case for the latest Flink version 1.19.0 as well
>>
>> | Severity | Direct Package               | Impacted Package     | Impacted Package Version | Fixed Versions      | Type   | CVE            |
>> |----------|------------------------------|----------------------|--------------------------|---------------------|--------|----------------|
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.8, 1.20.3]    | Go     | CVE-2023-24538 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.9, 1.20.4]    | Go     | CVE-2023-24540 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29404 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29405 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.19.10, 1.20.5]   | Go     | CVE-2023-29402 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.16.9, 1.17.2]    | Go     | CVE-2021-38297 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.16.14, 1.17.7]   | Go     | CVE-2022-23806 |
>> | Critical | sha256__0690274ef266a9a2f... | certifi              | 2020.6.20                | [2023.7.22]         | Python | CVE-2023-37920 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.12.6, 1.13beta1] | Go     | CVE-2019-11888 |
>> | Critical | sha256__c6571bb0f39f334ef... | github.com/golang/go | 1.11.1                   | [1.11.13, 1.12.8]   | Go     | CVE-2019-14809 |
>>
>> These vulnerabilities are related to the github.com/golang/go and
>> certifi packages.
>>
>> Please help me addressing the below questions:
>> Is there any known workaround for these vulnerabilities while using the
>> affected Flink versions?
>> Is there an ETA for a fix for these vulnerabilities in upcoming Flink
>> releases?
>> Are there any specific steps recommended to mitigate these issues in the
>> meantime?
>> Any guidance or recommendations would be greatly appreciated.
>>
>> Thanks in advance
>>
>> Thanks,
>> Elakiya U
>>
>


Re: Flink Kubernetes Operator 1.8.0 CRDs

2024-05-28 Thread Alexis Sarda-Espinosa
Hello,

I've also noticed this in our Argo CD setup. Since priority=0 is the
default, Kubernetes accepts it but doesn't store it in the actual resource;
I'm guessing it's like a mutating admission hook that comes out of the box.
The "priority" property can be safely removed from the CRDs.

Regards,
Alexis.

Am Do., 9. Mai 2024 um 22:03 Uhr schrieb Prasad, Neil <
neil.pra...@activision.com>:

> Sorry, let me explain. I currently have the operator deployed and managed
> via ArgoCD. The CRDs I separated out into a different chart so I can do
> upgrades on them. I am working on upgrading from version 1.7.0 to 1.8.0
> using ArgoCD. What I’ve done is replace the CRDs in the separate chart and
> made sure ArgoCD runs a replace action against them. I then bumped the
> chart version to 1.8.0 like so:
>
> dependencies:
>
> - name: flink-kubernetes-operator
>
> version: 1.8.0
>
> repository:
> https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
>
>
>
> With those changes in place, ArgoCD still shows a diff specifically around
> the additionalPrinterColumns section. The newer CRD versions have a line of
> “priority: 0” under both Job Status and Lifecycle State but this is never
> reflected in the cluster, either by replacing via ArgoCD or by hand. This
> is the issue that I’m trying to resolve, how do I make sure this matches in
> cluster when it doesn’t want to apply? Upgrading from 1.6.1 to 1.7.0 has
> the same issue where that line (priority: 0) isn’t reflected.
>
>
>
> Now this doesn’t hurt the functionality we need but if we want to make
> sure the CRDs are the same that are bundled in a release, then it’ll always
> show a diff.
>
>
>
> *From: *Márton Balassi 
> *Date: *Thursday, May 9, 2024 at 3:50 PM
> *To: *Prasad, Neil 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Flink Kubernetes Operator 1.8.0 CRDs
>
>
> Hi,
>
>
>
> What do you mean exactly by cannot be applied or replaced? What exactly is
> the issue?
>
>
>
> Are you installing fresh or trying to upgrade from a previous version? If
> the latter please follow this:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/upgrade/#2-upgrading-the-crd
> 
>
>
>
> On Thu, May 9, 2024 at 9:18 PM Prasad, Neil 
> wrote:
>
> I am writing to report an issue with the Flink Kubernetes Operator version
> 1.8.0. The CRD is unable to be applied or replaced in minikube or GKE.
> However, the CRD works on version 1.7.0 of the operator. I thought it would
> be helpful to bring this issue to the attention of the community and get
> some help in case someone has run into this issue before.
>
>
>
> Thank you for your attention to this matter.
>
>


Re: Need help in understanding PojoSerializer

2024-03-20 Thread Alexis Sarda-Espinosa
Hi Sachin,

Check the last few comments I wrote in this thread:

https://lists.apache.org/thread/l71d1cqo9xv8rsw0gfjo19kb1pct2xj1
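
For convenience, a condensed sketch of the approach discussed there, applied
to the POJO from the quoted mail below (the Map's key/value types and the
factory names are assumptions):

import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class A {
    public String str;

    // Set replaced by List, as mentioned below, with an explicit type info
    @TypeInfo(StringListFactory.class)
    public List<String> aList;

    @TypeInfo(StringLongMapFactory.class)
    public Map<String, Long> dMap;

    public static class StringListFactory extends TypeInfoFactory<List<String>> {
        @Override
        public TypeInformation<List<String>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return Types.LIST(Types.STRING);
        }
    }

    public static class StringLongMapFactory extends TypeInfoFactory<Map<String, Long>> {
        @Override
        public TypeInformation<Map<String, Long>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return Types.MAP(Types.STRING, Types.LONG);
        }
    }
}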

Regards,
Alexis.

On Wed, 20 Mar 2024, 18:51 Sachin Mittal,  wrote:

> Hi,
> I saw the post but I did not understand how I would configure these fields
> to use those serializers. (I can change the set type to a list type for
> now).
> As per the docs I see that we can annotate fields with @TypeInfo
>
> But what I did not get is how using this annotation I can use ListSerializer
> and *MapSerializer.*
>
> Thanks
> Sachin
>
>
> On Wed, Mar 20, 2024 at 10:47 PM Ken Krugler 
> wrote:
>
>> Flink doesn’t have built-in support for serializing Sets.
>>
>> See this (stale) issue about the same:
>> https://issues.apache.org/jira/browse/FLINK-16729
>>
>> You could create a custom serializer for sets, see
>> https://stackoverflow.com/questions/59800851/flink-serialization-of-java-util-list-and-java-util-map
>> and
>> https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/ListSerializer.html
>> for details on how this was done for a list, but it’s not trivial.
>>
>> Or as a hack, use a Map and the existing support for map
>> serialization via
>> https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/typeutils/base/MapSerializer.html
>>
>> — Ken
>>
>>
>> On Mar 20, 2024, at 10:04 AM, Sachin Mittal  wrote:
>>
>> Hi,
>> I have a Pojo class like this
>>
>> public class A {
>>
>> public String str;
>>
>> public Set aSet;
>>
>> public Map dMap;
>>
>> }
>>
>> However when I start the flink program I get this message:
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
>> dMap will be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor[] - Field A#
>> aSet will be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>>
>> Also in my code I have added
>>
>> env.getConfig().disableGenericTypes();
>>
>> So I don't understand: when I use Maps and Sets of primitive types, why is
>> Flink not able to use the PojoSerializer for these fields, even when I have
>> disabled generic types?
>>
>> Why am I getting a message that they will be processed as GenericType?
>>
>>
>> Any help in understanding what I need to do to ensure all the fields of my 
>> object are handled using PojoSerializer.
>>
>>
>> Thanks
>>
>> Sachin
>>
>>
>>
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink & Pinot
>>
>>
>>
>>


Re: Impact of RocksDB backend on the Java heap

2024-02-19 Thread Alexis Sarda-Espinosa
Hi Zakelly,

Yeah, that makes sense to me. I was just curious about whether reading could
be a bottleneck or not, but I imagine user-specific logic would be better
than a generic cache from Flink that might have a low hit rate.

Thanks again,
Alexis.

On Mon, 19 Feb 2024, 07:29 Zakelly Lan,  wrote:

> Hi Alexis,
>
> Assuming the bulk load for a batch of sequential keys performs better than
> accessing them one by one, the main question is whether we really need to
> access all the keys that were bulk-loaded into the cache beforehand. In other words,
> cache hit rate is the key issue. If the rate is high, even though a single
> key-value is large and loading them is slow, it is still worth it to load
> them in advance. In case of timer and iteration (which I missed in last
> mail), the cache is almost guaranteed to hit. Thus a cache is introduced to
> enhance the performance here.
>
>
> Best,
> Zakelly
>
> On Sun, Feb 18, 2024 at 7:42 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Zakelly,
>>
>> thanks for the information, that's interesting. Would you say that
>> reading a subset from RocksDB is fast enough to be pretty much negligible,
>> or could it be a bottleneck if the state of each key is "large"? Again
>> assuming the number of distinct partition keys is large.
>>
>> Regards,
>> Alexis.
>>
>> On Sun, 18 Feb 2024, 05:02 Zakelly Lan,  wrote:
>>
>>> Hi Alexis,
>>>
>>> Flink does need some heap memory to bridge requests to rocksdb and
>>> gather the results. In most cases, the memory is discarded immediately
>>> (eventually collected by GC). In case of timers, flink do cache a limited
>>> subset of key-values in heap to improve performance.
>>>
>>> In general you don't need to consider its heap consumption since it is
>>> minor.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
>>> wrote:
>>>
>>>> Hello Alexis,
>>>>
>>>> I don't think data in RocksDB resides in JVM even with function calls.
>>>>
>>>> For more details, check the link below:
>>>>
>>>> https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
>>>>
>>>> RocksDB has three main components - memtable, sstfile and WAL(not used
>>>> in Flink as Flink uses checkpointing). When TM starts with statebackend as
>>>> RocksDB,TM has its own RocksDB instance and the state is managed as column
>>>> Family by that TM. Any changes of state go into memtable --> sst-->
>>>> persistent store. When read, data goes to the buffers and cache of RocksDB.
>>>>
>>>> In the case of RocksDB as state backend, JVM still holds threads stack
>>>> as for high degree of parallelism, there are many
>>>> stacks maintaining separate thread information.
>>>>
>>>> Hope this helps!!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hi Asimansu
>>>>>
>>>>> The memory RocksDB manages is outside the JVM, yes, but the mentioned
>>>>> subsets must be bridged to the JVM somehow so that the data can be exposed
>>>>> to the functions running inside Flink, no?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
>>>>> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
>>>>> wrote:
>>>>>
>>>>>> Hello Alexis,
>>>>>>
>>>>>> RocksDB resides off-heap and outside of JVM. The small subset of data
>>>>>> ends up on the off-heap in the memory.
>>>>>>
>>>>>> For more details, check the following link:
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>>>>>
>>>>>> I hope this addresses your inquiry.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>>>>>> sarda.espin...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Most info regarding RocksDB memory for Flink focuses on what's
>>>>>>> needed independently of the JVM (although the Flink process configures 
>>>>>>> its
>>>>>>> limits and so on). I'm wondering if there are additional special
>>>>>>> considerations with regards to the JVM heap in the following scenario.
>>>>>>>
>>>>>>> Assuming a key used to partition a Flink stream and its state has a
>>>>>>> high cardinality, but that the state of each key is small, when Flink
>>>>>>> prepares the state to expose to a user function during a call (with a 
>>>>>>> given
>>>>>>> partition key), I guess it loads only the required subset from RocksDB, 
>>>>>>> but
>>>>>>> does this small subset end (temporarily) up on the JVM heap? And if it
>>>>>>> does, does it stay "cached" in the JVM for some time or is it 
>>>>>>> immediately
>>>>>>> discarded after the user function completes?
>>>>>>>
>>>>>>> Maybe this isn't even under Flink's control, but I'm curious.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>


Re: Impact of RocksDB backend on the Java heap

2024-02-18 Thread Alexis Sarda-Espinosa
Hi Zakelly,

thanks for the information, that's interesting. Would you say that reading
a subset from RocksDB is fast enough to be pretty much negligible, or could
it be a bottleneck if the state of each key is "large"? Again assuming the
number of distinct partition keys is large.

Regards,
Alexis.

On Sun, 18 Feb 2024, 05:02 Zakelly Lan,  wrote:

> Hi Alexis,
>
> Flink does need some heap memory to bridge requests to rocksdb and gather
> the results. In most cases, the memory is discarded immediately (eventually
> collected by GC). In case of timers, flink do cache a limited subset of
> key-values in heap to improve performance.
>
> In general you don't need to consider its heap consumption since it is
> minor.
>
>
> Best,
> Zakelly
>
> On Fri, Feb 16, 2024 at 4:43 AM Asimansu Bera 
> wrote:
>
>> Hello Alexis,
>>
>> I don't think data in RocksDB resides in JVM even with function calls.
>>
>> For more details, check the link below:
>>
>> https://github.com/facebook/rocksdb/wiki/RocksDB-Overview#3-high-level-architecture
>>
>> RocksDB has three main components - memtable, sstfile and WAL(not used in
>> Flink as Flink uses checkpointing). When TM starts with statebackend as
>> RocksDB,TM has its own RocksDB instance and the state is managed as column
>> Family by that TM. Any changes of state go into memtable --> sst-->
>> persistent store. When read, data goes to the buffers and cache of RocksDB.
>>
>> In the case of RocksDB as state backend, JVM still holds threads stack as
>> for high degree of parallelism, there are many stacks maintaining separate
>> thread information.
>>
>> Hope this helps!!
>>
>>
>>
>>
>>
>> On Thu, Feb 15, 2024 at 11:21 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi Asimansu
>>>
>>> The memory RocksDB manages is outside the JVM, yes, but the mentioned
>>> subsets must be bridged to the JVM somehow so that the data can be exposed
>>> to the functions running inside Flink, no?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, 15 Feb 2024, 14:06 Asimansu Bera, 
>>> wrote:
>>>
>>>> Hello Alexis,
>>>>
>>>> RocksDB resides off-heap and outside of JVM. The small subset of data
>>>> ends up on the off-heap in the memory.
>>>>
>>>> For more details, check the following link:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>>>>
>>>> I hope this addresses your inquiry.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Most info regarding RocksDB memory for Flink focuses on what's needed
>>>>> independently of the JVM (although the Flink process configures its limits
>>>>> and so on). I'm wondering if there are additional special considerations
>>>>> with regards to the JVM heap in the following scenario.
>>>>>
>>>>> Assuming a key used to partition a Flink stream and its state has a
>>>>> high cardinality, but that the state of each key is small, when Flink
>>>>> prepares the state to expose to a user function during a call (with a 
>>>>> given
>>>>> partition key), I guess it loads only the required subset from RocksDB, 
>>>>> but
>>>>> does this small subset end (temporarily) up on the JVM heap? And if it
>>>>> does, does it stay "cached" in the JVM for some time or is it immediately
>>>>> discarded after the user function completes?
>>>>>
>>>>> Maybe this isn't even under Flink's control, but I'm curious.
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>


Re: Impact of RocksDB backend on the Java heap

2024-02-15 Thread Alexis Sarda-Espinosa
Hi Asimansu

The memory RocksDB manages is outside the JVM, yes, but the mentioned
subsets must be bridged to the JVM somehow so that the data can be exposed
to the functions running inside Flink, no?

Regards,
Alexis.


On Thu, 15 Feb 2024, 14:06 Asimansu Bera,  wrote:

> Hello Alexis,
>
> RocksDB resides off-heap and outside of JVM. The small subset of data ends
> up on the off-heap in the memory.
>
> For more details, check the following link:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/memory/mem_setup_tm/#managed-memory
>
> I hope this addresses your inquiry.
>
>
>
>
> On Thu, Feb 15, 2024 at 12:52 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Most info regarding RocksDB memory for Flink focuses on what's needed
>> independently of the JVM (although the Flink process configures its limits
>> and so on). I'm wondering if there are additional special considerations
>> with regards to the JVM heap in the following scenario.
>>
>> Assuming a key used to partition a Flink stream and its state has a high
>> cardinality, but that the state of each key is small, when Flink prepares
>> the state to expose to a user function during a call (with a given
>> partition key), I guess it loads only the required subset from RocksDB, but
>> does this small subset end (temporarily) up on the JVM heap? And if it
>> does, does it stay "cached" in the JVM for some time or is it immediately
>> discarded after the user function completes?
>>
>> Maybe this isn't even under Flink's control, but I'm curious.
>>
>> Regards,
>> Alexis.
>>
>


Impact of RocksDB backend on the Java heap

2024-02-15 Thread Alexis Sarda-Espinosa
Hello,

Most info regarding RocksDB memory for Flink focuses on what's needed
independently of the JVM (although the Flink process configures its limits
and so on). I'm wondering if there are additional special considerations
with regards to the JVM heap in the following scenario.

Assuming a key used to partition a Flink stream and its state has a high
cardinality, but that the state of each key is small, when Flink prepares
the state to expose to a user function during a call (with a given
partition key), I guess it loads only the required subset from RocksDB, but
does this small subset end (temporarily) up on the JVM heap? And if it
does, does it stay "cached" in the JVM for some time or is it immediately
discarded after the user function completes?

Maybe this isn't even under Flink's control, but I'm curious.

Regards,
Alexis.


Watermark alignment without idleness

2024-02-06 Thread Alexis Sarda-Espinosa
Hello,

I was reading through the comments in [1] and it seems that enabling
watermark alignment implicitly activates some idleness logic "if the source
waits for alignment for a long time" (even if withIdleness is not called
explicitly during the creation of WatermarkStrategy). Is this time somehow
configurable? I believe this isn't documented.

[1] https://issues.apache.org/jira/browse/FLINK-32496
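
For reference, the knobs that are configurable sit on the strategy itself,
namely the alignment group, the maximum allowed drift and the update interval
(the event type below is a placeholder):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class AlignmentExample {
    // placeholder event type
    public static class MyEvent { public long timestamp; }

    public static WatermarkStrategy<MyEvent> strategy() {
        return WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.timestamp)
                // group name, max allowed watermark drift, update interval
                .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
                // note: no explicit withIdleness(...) anywhere
    }
}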

Regards,
Alexis.


Re: Request to provide sample codes on Data stream using flink, Spring Boot & kafka

2024-02-06 Thread Alexis Sarda-Espinosa
Hello,

check this thread from some months ago, but keep in mind that it's not
really officially supported by Flink itself:

https://lists.apache.org/thread/l0pgm9o2vdywffzdmbh9kh7xorhfvj40

Regards,
Alexis.

Am Di., 6. Feb. 2024 um 12:23 Uhr schrieb Fidea Lidea <
lideafidea...@gmail.com>:

> Hi Team,
>
> I request you to provide sample codes on data streaming using flink, kafka
> and spring boot.
>
> Awaiting your response.
>
> Thanks & Regards
> Nida Shaikh
>


Re: Idleness not working if watermark alignment is used

2024-02-06 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think I understand the implications of idleness. In my case I really do
need it since even in the production environment one of the Kafka topics
will receive messages only sporadically.

With regards to the code, I have very limited understanding of Flink
internals, but that part I linked seems to indicate that, if a stream is
idle, the log should indicate a hard-coded maxAllowedWatermark equal to
Long.MAX_VALUE, that's why I thought the source isn't really considered as
idle.

Regards,
Alexis.

Am Di., 6. Feb. 2024 um 11:46 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Hi Alexis,
>
>
>
> Yes, I guess so, while not utterly acquainted with that part of the code.
>
> Apparently the SourceCoordinator cannot come up with a proper watermark
> time, if watermarking is turned off (idle mode of stream), and then it
> deduces the watermark time from the remaining non-idle sources.
>
> It’s consistent with how idling-state of data streams is designed.
>
> However, the point remains that one needs to compensate for
> .withIdleness(…) if correctness is a consideration.
>
> Using .withIdleness(…) is IMHO only justified in rare cases where
> implications are fully understood.
>
>
>
> If a source is not configured with .withIdleness(…) and becomes factually
> idle, all window aggregations or stateful stream joins stall until that
> source becomes active again (= added latency)
>
>
>
> Thias
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Tuesday, February 6, 2024 9:48 AM
> *To:* Schwalbe Matthias 
> *Cc:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Hi Matthias,
>
>
>
> thanks for looking at this. Would you then say this comment in the source
> code is not really valid?
>
>
> https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181
>
>
>
> That's where the log I was looking at is created.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> Am Di., 6. Feb. 2024 um 08:54 Uhr schrieb Schwalbe Matthias <
> matthias.schwa...@viseca.ch>:
>
> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood; it actually means that the thus
> configured stream is exempt from watermark processing after 5 seconds (in
> your case).
>
> Hence also watermark alignment is turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
>
>
> Am Mo., 5. Feb. 2024 um 18:00 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout) // 5 seconds currently
> .withWatermarkAlignment(alignmentGroup,
> maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.
>
>
>

Re: Idleness not working if watermark alignment is used

2024-02-06 Thread Alexis Sarda-Espinosa
Hi Matthias,

thanks for looking at this. Would you then say this comment in the source
code is not really valid?
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181

That's where the log I was looking at is created.

Regards,
Alexis.

Am Di., 6. Feb. 2024 um 08:54 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood; it actually means that the thus
> configured stream is exempt from watermark processing after 5 seconds (in
> your case).
>
> Hence also watermark alignment is turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
>
>
> Am Mo., 5. Feb. 2024 um 18:00 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout) // 5 seconds currently
> .withWatermarkAlignment(alignmentGroup,
> maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Idleness not working if watermark alignment is used

2024-02-05 Thread Alexis Sarda-Espinosa
Ah and I forgot to mention, this is with Flink 1.18.1

Am Mo., 5. Feb. 2024 um 18:00 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hello,
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout) // 5 seconds currently
> .withWatermarkAlignment(alignmentGroup,
> maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
> where maxAllowedWatermark grows all the time.
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
> Regards,
> Alexis.
>
>


Idleness not working if watermark alignment is used

2024-02-05 Thread Alexis Sarda-Espinosa
Hello,

I have 2 Kafka sources that are configured with a watermark strategy
instantiated like this:

WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
.withIdleness(idleTimeout) // 5 seconds currently
.withWatermarkAlignment(alignmentGroup,
maxAllowedWatermarkDrift, Duration.ofSeconds(1L))

The alignment group is the same for both, but each one consumes from a
different topic. During a test, I ensured that one of the topics didn't
receive any messages, but when I check the logs I see multiple entries like
this:

Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.

where maxAllowedWatermark grows all the time.

Maybe my understanding is wrong, but I think this means the source is never
marked as idle even though it didn't receive any new messages in the Kafka
topic?

Regards,
Alexis.


Watermark alignment with different allowed drifts

2024-02-05 Thread Alexis Sarda-Espinosa
Hello,

is the behavior for this configuration well defined? Assigning two
different (Kafka) sources to the same alignment group but configuring
different max allowed drift in each one.
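
To be concrete, I mean something like this (just a sketch, the durations are
made up):

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class AlignmentGroupExample {

    // both strategies join the same alignment group...
    static final WatermarkStrategy<String> SOURCE_A = WatermarkStrategy
            .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withWatermarkAlignment("my-group", Duration.ofSeconds(5), Duration.ofSeconds(1));

    // ...but this one declares a different max allowed drift
    static final WatermarkStrategy<String> SOURCE_B = WatermarkStrategy
            .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withWatermarkAlignment("my-group", Duration.ofSeconds(30), Duration.ofSeconds(1));
}
```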

Regards,
Alexis.


Re: Flink KafkaProducer Failed Transaction Stalling the whole flow

2023-12-18 Thread Alexis Sarda-Espinosa
Hi Dominik,

Sounds like it could be this?
https://issues.apache.org/jira/browse/FLINK-28060

It doesn't mention transactions but I'd guess it could be the same
mechanism.
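
For reference, I assume your setup looks roughly like the sketch below with the
newer KafkaSink (broker, topic, prefix and the timeout value are placeholders I
made up):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSinkExample {

    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("my-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-prefix")
                // needs to comfortably cover checkpoint duration + recovery time
                .setProperty("transaction.timeout.ms", "900000")
                .build();
    }
}
```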

Regards,
Alexis.

On Mon, 18 Dec 2023, 07:51 Dominik Wosiński,  wrote:

> Hey,
> I've got a question regarding the transaction failures in EXACTLY_ONCE
> flow with Flink 1.15.3 with Confluent Cloud Kafka.
>
> The case is that there is a FlinkKafkaProducer in EXACTLY_ONCE setup with
> default transaction.timeout.ms of 15min.
>
> During the processing the job had some issues that caused checkpoint to
> timeout, that in turn caused the transaction issues, which caused
> transaction to fail with the following logs:
> Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@5d0d5082)
> because its producer is already fenced. This means that you either have a
> different producer with the same 'transactional.id' (this is unlikely
> with the 'KafkaSink' as all generated ids are unique and shouldn't be
> reused) or recovery took longer than 'transaction.timeout.ms' (90ms).
> In both cases this most likely signals data loss, please consult the Flink
> documentation for more details.
> Up to this point everything is pretty clear. After that however, the job
> continued to work normally but every single transaction was failing with:
> Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@5a924600)
> because it's in an invalid state. Most likely the transaction has been
> aborted for some reason. Please check the Kafka logs for more details.
> Which effectively stalls all downstream processing because no transaction
> would be ever commited.
>
> I've read through the docs and understand that this is kind of a known
> issue due to the fact that Kafka doesn't effectively support 2PC, but why
> doesn't that cause the failure and restart of the whole job? Currently, the
> job will process everything normally and hide the issue until it has grown
> catastrophically.
>
> Thanks in advance,
> Cheers.
>


Re: Avoid dynamic classloading in native mode with Kubernetes Operator

2023-11-20 Thread Alexis Sarda-Espinosa
Hi Trystan, I'm actually not very familiar with the operator's internals,
but I'd guess that limitation is in Flink itself - application mode is a
feature from core Flink, the operator just configures it based on the CRDs
it defines. Maybe one of the maintainers can confirm.

Regards,
Alexis.

On Mon, 20 Nov 2023, 19:25 Trystan,  wrote:

> Thanks Alexis, I can give that a try. However, that seems less than ideal
> from the user's perspective.
>
> Is there a technical reason why the operator can't support this
> combination of modes? I'd really like to just let the system do its thing
> rather than build a complicated two-jar approach.
>
> Thanks,
> Trystan
>
> On Fri, Nov 17, 2023 at 12:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Trystan,
>>
>> I imagine you can create 2 jars, one should only have a class with the
>> main method, and the other should be a fat jar with everything else for
>> your job. If you create a custom image where your fat jar is placed under
>> /opt/flink/lib/ then I think it would "just work" when specifying the
>> main-method jar in jarURI.
>>
>> Nevertheless, even though Flink shadows a lot of the libraries they use
>> internally, I suppose you could still end up with dependency conflicts, so
>> you would probably have some added complexity managing what's bundled in
>> your fat jar.
>>
>> Regards,
>> Alexis.
>>
>> Am Do., 16. Nov. 2023 um 19:42 Uhr schrieb Trystan :
>>
>>> Is it possible to avoid dynamic classloading when using the operator
>>> with a native kubernetes application deployment?
>>>
>>> If I put the job jar into /opt/flinklib, then there are two possible
>>> outcomes:
>>>
>>>1. If I point jarURI to the jar, I get linkage errors (presumably:
>>>    the classes have already been loaded by the AppClassLoader and the
>>>FlinkUserCodeClassLoader).
>>>2. If I do not include jarURI the operator pods encounter a
>>>NullPointerException. The docs state this is optional, but appears to 
>>> only
>>>pertain to standalone mode.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
>>> jarURI (apparently only for standalone deployments).
>>>
>>> Are there any additional configurations (configs, jar locations, etc)
>>> that are needed to avoid dynamic classloading in this case?
>>>
>>


Re: Avoid dynamic classloading in native mode with Kubernetes Operator

2023-11-17 Thread Alexis Sarda-Espinosa
Hi Trystan,

I imagine you can create 2 jars, one should only have a class with the main
method, and the other should be a fat jar with everything else for your
job. If you create a custom image where your fat jar is placed under
/opt/flink/lib/ then I think it would "just work" when specifying the
main-method jar in jarURI.

Nevertheless, even though Flink shadows a lot of the libraries they use
internally, I suppose you could still end up with dependency conflicts, so
you would probably have some added complexity managing what's bundled in
your fat jar.

Regards,
Alexis.

Am Do., 16. Nov. 2023 um 19:42 Uhr schrieb Trystan :

> Is it possible to avoid dynamic classloading when using the operator with
> a native kubernetes application deployment?
>
> If I put the job jar into /opt/flinklib, then there are two possible
> outcomes:
>
>1. If I point jarURI to the jar, I get linkage errors (presumably: the
>    classes have already been loaded by the AppClassLoader and the
>FlinkUserCodeClassLoader).
>2. If I do not include jarURI the operator pods encounter a
>NullPointerException. The docs state this is optional, but appears to only
>pertain to standalone mode.
>
> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
> jarURI (apparently only for standalone deployments).
>
> Are there any additional configurations (configs, jar locations, etc) that
> are needed to avoid dynamic classloading in this case?
>


Re: dependency error with latest Kafka connector

2023-11-14 Thread Alexis Sarda-Espinosa
Isn't it expected that it points to 1.17? That version of the Kafka
connector is meant to be compatible with both Flink 1.17 and 1.18, right?
So the older version should be specified so that the consumer can decide
which Flink version to compile against; otherwise the build tool could
silently update the compile-only dependencies, no?

Regards,
Alexis.

Am Di., 14. Nov. 2023 um 11:54 Uhr schrieb Alexey Novakov via user <
user@flink.apache.org>:

> Hi Günterh,
>
> It looks like a problem with the Kafka connector release.
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.0.1-1.18
> Compile dependencies are still pointing to Flink 1.17.
>
> Release person is already contacted about this or will be contacted soon.
>
> Best regards,
> Alexey
>
> On Mon, Nov 13, 2023 at 10:42 PM guenterh.lists 
> wrote:
>
>> Hello
>>
>> I'm getting a dependency error when using the latest Kafka connector in
>> a Scala project.
>>
>> Using the 1.17.1 Kafka connector compilation is ok.
>>
>> With
>>
>> "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18"
>>
>> I get
>> [error] (update) sbt.librarymanagement.ResolveException: Error
>> downloading org.apache.flink:flink-connector-base:
>> [error]   Not found
>> [error]   Not found
>> [error]   not found:
>>
>> /home/swissbib/.ivy2/local/org.apache.flink/flink-connector-base/ivys/ivy.xml
>> [error]   not found:
>>
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base//flink-connector-base-.pom
>>
>> Seems Maven packaging is not correct.
>>
>> My sbt build file:
>>
>> ThisBuild / scalaVersion := "3.3.0"
>> val flinkVersion = "1.18.0"
>> val postgresVersion = "42.2.2"
>>
>> lazy val root = (project in file(".")).settings(
>>name := "flink-scala-proj",
>>libraryDependencies ++= Seq(
>>  "org.flinkextended" %% "flink-scala-api" % "1.17.1_1.1.0",
>>  "org.apache.flink" % "flink-clients" % flinkVersion % Provided,
>>  "org.apache.flink" % "flink-connector-files" % flinkVersion %
>> Provided,
>>
>>"org.apache.flink" % "flink-connector-kafka" % "1.17.1",
>>//"org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18",
>>
>>//"org.apache.flink" % "flink-connector-jdbc" % "3.1.1-1.17",
>>//"org.postgresql" % "postgresql" % postgresVersion,
>>"org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,
>>//"org.apache.flink" % "flink-connector-base" % flinkVersion % Provided
>>)
>> )
>>
>>
>>
>> Thanks!
>>
>> --
>> Günter Hipler
>> https://openbiblio.social/@vog61
>> https://twitter.com/vog61
>>
>>


Re: Continuous errors with Azure ABFSS

2023-11-10 Thread Alexis Sarda-Espinosa
After enabling some more logging for the storage account, I figured out the
errors correspond to 404 PathNotFound responses. My guess is the file
system checks the status of a path to see if it exists or not before
trying to write to it, in this case for _metadata files from each new
checkpoint. Seems like normal operations, so it's just unfortunate the
Azure API exposes that as continuous ClientOtherError metrics.

Regards,
Alexis.

Am Fr., 6. Okt. 2023 um 08:10 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Yes, that also works correctly, at least based on the Kafka source we use
> (we'd get an alert if it suddenly started consuming from a very old offset).
>
> Regards,
> Alexis.
>
> On Thu, 5 Oct 2023, 19:36 ramkrishna vasudevan, 
> wrote:
>
>> Sorry for the late reply. Just in case you restart the job , is it able
>> to safely use the checkpoint and get back to the checkpointed state?
>>
>> Regards
>> Ram,
>>
>> On Thu, Sep 28, 2023 at 4:46 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi Surendra,
>>>
>>> there are no exceptions in the logs, nor anything salient with
>>> INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
>>> set the config
>>>
>>> execution.checkpointing.tolerable-failed-checkpoints: 1
>>>
>>> Regards,
>>> Alexis.
>>>
>>> Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
>>> surendralilh...@gmail.com>:
>>>
>>>> Hi Alexis,
>>>>
>>>> Could you please check the TaskManager log for any exceptions?
>>>>
>>>> Thanks
>>>> Surendra
>>>>
>>>>
>>>> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We are using ABFSS for RocksDB's backend as well as the storage dir
>>>>> required for Kubernetes HA. In the Azure Portal's monitoring insights I 
>>>>> see
>>>>> that every single operation contains failing transactions for the
>>>>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>>>>> know the storage account is only used by Flink. Checkpointing isn't
>>>>> failing, but I wonder if this could be an issue in the long term?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>


Re: Disable flink old checkpoint clean

2023-11-08 Thread Alexis Sarda-Espinosa
Hello,

maybe someone can correct me if I'm wrong, but reading through [1], it
seems to me that manually triggered checkpoints were meant for these
scenarios. If the implementation follows the ticket's description, a
user-triggered checkpoint would "break the chain of incremental
checkpoints", which would allow a safer activation of S3 TTL?

[1] https://issues.apache.org/jira/browse/FLINK-27101
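
As an aside, the retention-related settings mentioned further down in this
thread can be set like this (a sketch, the values are made up):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetentionConfigExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // keep externalized checkpoints when the job is cancelled
        conf.setString("execution.checkpointing.externalized-checkpoint-retention",
                "RETAIN_ON_CANCELLATION");
        // retain more than one completed checkpoint
        conf.setString("state.checkpoints.num-retained", "10");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline and call env.execute()
    }
}
```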

Regards,
Alexis.

Am Mi., 8. Nov. 2023 um 06:51 Uhr schrieb Jinzhong Li <
lijinzhong2...@gmail.com>:

> Hi Yang,
>
> I think there is no configuration option available that allow users to
> disable checkpoint file cleanup at runtime.
>
> Does your flink application use incremental checkpoint?
> 1) If yes, i think leveraging S3's lifecycle management to clean
> checkpoint files is not safe, because it may accidentally delete a file
> which is still in use, although the probability is small.
> 2) If no, you can try to enable incremental checkpoint and increase the
> checkpoint interval to reduce the S3 traffic.
>
> Yang LI  于2023年11月8日周三 04:58写道:
>
>> Hi Martijn,
>>
>>
>> We're currently utilizing flink-s3-fs-presto. After reviewing the
>> flink-s3-fs-hadoop source code, I believe we would encounter similar issues
>> with it as well.
>>
>> When we say, 'The purpose of a checkpoint, in principle, is that Flink
>> manages its lifecycle,' I think it implies that the automatic cleanup of
>> old checkpoints is an integral part of Flink's lifecycle management.
>> However, is there a configuration option available that allows us to
>> disable this automatic cleanup? We're considering leveraging AWS S3's
>> lifecycle management capabilities to handle this aspect instead of relying
>> on Flink.
>>
>> Best,
>> Yang
>>
>> On Tue, 7 Nov 2023 at 18:44, Martijn Visser 
>> wrote:
>>
>>> Ah, I actually misread checkpoint and savepoints, sorry. The purpose
>>> of a checkpoint in principle is that Flink manages its lifecycle.
>>> Which S3 interface are you using for the checkpoint storage?
>>>
>>> On Tue, Nov 7, 2023 at 6:39 PM Martijn Visser 
>>> wrote:
>>> >
>>> > Hi Yang,
>>> >
>>> > If you use the NO_CLAIM mode, Flink will not assume ownership of a
>>> > snapshot and leave it up to the user to delete them. See the blog [1]
>>> > for more details.
>>> >
>>> > Best regards,
>>> >
>>> > Martijn
>>> >
>>> > [1]
>>> https://flink.apache.org/2022/05/06/improvements-to-flink-operations-snapshots-ownership-and-savepoint-formats/#no_claim-default-mode
>>> >
>>> > On Tue, Nov 7, 2023 at 5:29 PM Junrui Lee  wrote:
>>> > >
>>> > > Hi Yang,
>>> > >
>>> > >
>>> > > You can try configuring
>>> "execution.checkpointing.externalized-checkpoint-retention:
>>> RETAIN_ON_CANCELLATION"[1] and increasing the value of
>>> "state.checkpoints.num-retained"[2] to retain more checkpoints.
>>> > >
>>> > >
>>> > > Here are the official documentation links for more details:
>>> > >
>>> > > [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention
>>> > >
>>> > > [2]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#state-checkpoints-num-retained
>>> > >
>>> > >
>>> > > Best,
>>> > >
>>> > > Junrui
>>> > >
>>> > >
>>> > > Yang LI  于2023年11月7日周二 22:02写道:
>>> > >>
>>> > >> Dear Flink Community,
>>> > >>
>>> > >> In our Flink application, we persist checkpoints to AWS S3.
>>> Recently, during periods of high job parallelism and traffic, we've
>>> experienced checkpoint failures. Upon investigating, it appears these may
>>> be related to S3 delete object requests interrupting checkpoint re-uploads,
>>> as evidenced by numerous InterruptedExceptions.
>>> > >>
>>> > >> We aim to explore options for disabling the deletion of stale
>>> checkpoints. Despite consulting the Flink configuration documentation and
>>> conducting various tests, the appropriate setting to prevent old checkpoint
>>> cleanup remains elusive.
>>> > >>
>>> > >> Could you advise if there's a method to disable the automatic
>>> cleanup of old Flink checkpoints?
>>> > >>
>>> > >> Best,
>>> > >> Yang
>>>
>>


Re: Updating existing state with state processor API

2023-10-27 Thread Alexis Sarda-Espinosa
Hi Matthias,

Thanks for the response. I guess the specific question would be, if I work
with an existing savepoint and pass an empty DataStream to
OperatorTransformation#bootstrapWith, will the new savepoint end up with an
empty state for the modified operator, or will it maintain the existing
state because nothing was changed?

Regards,
Alexis.

Am Fr., 27. Okt. 2023 um 08:40 Uhr schrieb Schwalbe Matthias <
matthias.schwa...@viseca.ch>:

> Good morning Alexis,
>
>
>
> Something like this we do all the time.
>
> Read an existing savepoint, copy some of the not-to-be-changed operator
> states (keyed/non-keyed) over, and process/patch the remaining ones by
> transforming and bootstrapping to new state.
>
>
>
> I could spare more details for more specific questions, if you like 😊
>
>
>
> Regards
>
>
>
> Thias
>
>
>
> PS: I’m currently working on this ticket in order to get some glitches
> removed: FLINK-26585 <https://issues.apache.org/jira/browse/FLINK-26585>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, October 26, 2023 4:01 PM
> *To:* user 
> *Subject:* Updating existing state with state processor API
>
>
>
> Hello,
>
>
>
> The documentation of the state processor API has some examples to modify
> an existing savepoint by defining a StateBootstrapTransformation. In all
> cases, the entrypoint is OperatorTransformation#bootstrapWith, which
> expects a DataStream. If I pass an empty DataStream to bootstrapWith and
> then apply the resulting transformation to an existing savepoint, will the
> transformation still receive data from the existing state?
>
>
>
> If the aforementioned is incorrect, I imagine I could instantiate
> a SavepointReader and create a DataStream of the existing state with it,
> which I could then pass to the bootstrapWith method directly or after
> "unioning" it with additional state. Would this work?
>
>
>
> Regards,
>
> Alexis.
>
>


Updating existing state with state processor API

2023-10-26 Thread Alexis Sarda-Espinosa
Hello,

The documentation of the state processor API has some examples to modify an
existing savepoint by defining a StateBootstrapTransformation. In all
cases, the entrypoint is OperatorTransformation#bootstrapWith, which
expects a DataStream. If I pass an empty DataStream to bootstrapWith and
then apply the resulting transformation to an existing savepoint, will the
transformation still receive data from the existing state?

If the aforementioned is incorrect, I imagine I could instantiate
a SavepointReader and create a DataStream of the existing state with it,
which I could then pass to the bootstrapWith method directly or after
"unioning" it with additional state. Would this work?

Regards,
Alexis.


Re: Continuous errors with Azure ABFSS

2023-10-05 Thread Alexis Sarda-Espinosa
Yes, that also works correctly, at least based on the Kafka source we use
(we'd get an alert if it suddenly started consuming from a very old offset).

Regards,
Alexis.

On Thu, 5 Oct 2023, 19:36 ramkrishna vasudevan, 
wrote:

> Sorry for the late reply. Just in case you restart the job , is it able to
> safely use the checkpoint and get back to the checkpointed state?
>
> Regards
> Ram,
>
> On Thu, Sep 28, 2023 at 4:46 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Surendra,
>>
>> there are no exceptions in the logs, nor anything salient with
>> INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
>> set the config
>>
>> execution.checkpointing.tolerable-failed-checkpoints: 1
>>
>> Regards,
>> Alexis.
>>
>> Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
>> surendralilh...@gmail.com>:
>>
>>> Hi Alexis,
>>>
>>> Could you please check the TaskManager log for any exceptions?
>>>
>>> Thanks
>>> Surendra
>>>
>>>
>>> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> We are using ABFSS for RocksDB's backend as well as the storage dir
>>>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>>>> that every single operation contains failing transactions for the
>>>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>>>> know the storage account is only used by Flink. Checkpointing isn't
>>>> failing, but I wonder if this could be an issue in the long term?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Surendra,

there are no exceptions in the logs, nor anything salient with
INFO/WARN/ERROR levels. The checkpoints are definitely completing, we even
set the config

execution.checkpointing.tolerable-failed-checkpoints: 1

Regards,
Alexis.

Am Do., 28. Sept. 2023 um 09:32 Uhr schrieb Surendra Singh Lilhore <
surendralilh...@gmail.com>:

> Hi Alexis,
>
> Could you please check the TaskManager log for any exceptions?
>
> Thanks
> Surendra
>
>
> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Ram,

Thanks for that. We configure a path with ABFSS scheme in the following
settings:

- state.checkpoints.dir
- state.savepoints.dir
- high-availability.storageDir

We use RocksDB with incremental checkpointing every minute.
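
In code form, the relevant configuration is roughly the following (a sketch,
the account/container names are placeholders):

```
import org.apache.flink.configuration.Configuration;

public class AbfssConfigSketch {

    static Configuration build() {
        Configuration conf = new Configuration();
        String base = "abfss://<container>@<account>.dfs.core.windows.net/flink";
        conf.setString("state.checkpoints.dir", base + "/checkpoints");
        conf.setString("state.savepoints.dir", base + "/savepoints");
        conf.setString("high-availability.storageDir", base + "/ha");
        // RocksDB backend with incremental checkpoints
        conf.setString("state.backend", "rocksdb");
        conf.setBoolean("state.backend.incremental", true);
        return conf;
    }
}
```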

I found the metrics from Azure in the storage account under Monitoring,
Insights, Failures, scrolling down. I'll attach a screenshot here, although
I'm not sure that works well with the distribution list.

Regards,
Alexis.

Am Do., 28. Sept. 2023 um 07:28 Uhr schrieb ramkrishna vasudevan <
ramvasu.fl...@gmail.com>:

> Can you help with more info here?
> The RocksDB backend itself is in ABFS instead of local? Or you mean the
> checkpoint is in ABFS but local dir for RocksDB is in local storage?
>
> GetPathSTatus is done by your monitoring pages? We run Flink on ABFS so we
> would like to see if we can help you out.
>
> Regards
> Ram
>
> On Thu, Sep 28, 2023 at 2:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Continuous errors with Azure ABFSS

2023-09-27 Thread Alexis Sarda-Espinosa
Hello,

We are using ABFSS for RocksDB's backend as well as the storage dir
required for Kubernetes HA. In the Azure Portal's monitoring insights I see
that every single operation contains failing transactions for the
GetPathStatus API. Unfortunately I don't see any additional details, but I
know the storage account is only used by Flink. Checkpointing isn't
failing, but I wonder if this could be an issue in the long term?

Regards,
Alexis.


Re: Side outputs documentation

2023-09-26 Thread Alexis Sarda-Espinosa
I see, sounds good, thanks for the clarification.

Am Di., 26. Sept. 2023 um 03:29 Uhr schrieb Yunfeng Zhou <
flink.zhouyunf...@gmail.com>:

> Hi Alexis,
>
> Thanks for the clarification. I found the second constructor on
> Flink's master branch here[1], and maybe it was that we had been
> commenting on different versions of Flink, and the second constructor
> has not been introduced in the version you use. From the source code I
> can see that the OutputTag need not be anonymous so long as the type
> extraction process passes, while making it anonymous guarantees the
> success of this step, so you are right that you need not bother about
> this matter so long as your tests and jobs can pass. Besides, I wonder
> whether being a static field influences the anonymity of a variable.
> To my understanding, making it anonymous means coding `new
> OutputTag("foobar"){}` instead of  `new
> OutputTag("foobar")`. It doesn't matter whether the prefix is
> `private OutputTag tag = new OutputTag("foobar"){}` or
> `private static OutputTag tag = new
> OutputTag("foobar"){}`. They should be independent from each
> other and OutputTag's document is correct from this aspect.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/OutputTag.java#L82
>
> Best,
> Yunfeng
>
> On Mon, Sep 25, 2023 at 10:57 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hi Yunfeng,
> >
> > Thanks for the response. I hadn't even seen the other constructor, but
> it seems that the single-arg constructor works fine even if the output tag
> is declared as "static final", at least in my use case. I imagine Flink
> would complain about unknown types if it really can't figure it out
> automatically, so maybe I can just let it be as long as tests pass, but I
> was wondering if Flink really needs a non-static field to analyze type
> information here. Who knows, maybe there are some scenarios where it's
> really a must.
> >
> > Regards,
> > Alexis.
> >
> > Am Mo., 25. Sept. 2023 um 05:17 Uhr schrieb Yunfeng Zhou <
> flink.zhouyunf...@gmail.com>:
> >>
> >> Hi Alexis,
> >>
> >> If you create OutputTag with the constructor `OutputTag(String id)`,
> >> you need to make it anonymous for Flink to analyze the type
> >> information. But if you use the constructor `OutputTag(String id,
> >> TypeInformation typeInfo)`, you need not make it anonymous as you
> >> have provided the type information.
> >>
> >> The second constructor is introduced after the document and the first
> >> constructor, and I think the document might have been outdated and not
> >> match with OutputTag's current behavior. A ticket and PR could be
> >> added to fix the document. What do you think?
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
> >>  wrote:
> >> >
> >> > Hello,
> >> >
> >> > very quick question, the documentation for side outputs states that
> an OutputTag "needs to be an anonymous inner class, so that we can analyze
> the type" (this is written in a comment in the example). Is this really
> true? I've seen many examples where it's a static element and it seems to
> work fine.
> >> >
> >> > Regards,
> >> > Alexis.
> >> >
>


Re: Side outputs documentation

2023-09-25 Thread Alexis Sarda-Espinosa
Hi Yunfeng,

Thanks for the response. I hadn't even seen the other constructor, but it
seems that the single-arg constructor works fine even if the output tag is
declared as "static final", at least in my use case. I imagine Flink would
complain about unknown types if it really can't figure it out
automatically, so maybe I can just let it be as long as tests pass, but I
was wondering if Flink really needs a non-static field to analyze type
information here. Who knows, maybe there are some scenarios where it's
really a must.
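
Just to make sure we're talking about the same thing, these are the variants I
have in mind (a small sketch):

```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.OutputTag;

public class SideOutputTags {

    // single-arg constructor: the trailing {} is what makes it "anonymous" so the
    // type parameter can be extracted; being a static final field is a separate
    // matter and does not seem to hurt
    public static final OutputTag<String> LATE_ANONYMOUS = new OutputTag<String>("late") {};

    // two-arg constructor: the type information is passed explicitly, so no
    // anonymous subclass is needed
    public static final OutputTag<String> LATE_EXPLICIT = new OutputTag<>("late", Types.STRING);
}
```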

Regards,
Alexis.

Am Mo., 25. Sept. 2023 um 05:17 Uhr schrieb Yunfeng Zhou <
flink.zhouyunf...@gmail.com>:

> Hi Alexis,
>
> If you create OutputTag with the constructor `OutputTag(String id)`,
> you need to make it anonymous for Flink to analyze the type
> information. But if you use the constructor `OutputTag(String id,
> TypeInformation typeInfo)`, you need not make it anonymous as you
> have provided the type information.
>
> The second constructor is introduced after the document and the first
> constructor, and I think the document might have been outdated and not
> match with OutputTag's current behavior. A ticket and PR could be
> added to fix the document. What do you think?
>
> Best,
> Yunfeng
>
> On Fri, Sep 22, 2023 at 4:55 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hello,
> >
> > very quick question, the documentation for side outputs states that an
> OutputTag "needs to be an anonymous inner class, so that we can analyze the
> type" (this is written in a comment in the example). Is this really true?
> I've seen many examples where it's a static element and it seems to work
> fine.
> >
> > Regards,
> > Alexis.
> >
>


Side outputs documentation

2023-09-22 Thread Alexis Sarda-Espinosa
Hello,

very quick question, the documentation for side outputs states that an
OutputTag "needs to be an anonymous inner class, so that we can analyze the
type" (this is written in a comment in the example). Is this really true?
I've seen many examples where it's a static element and it seems to work
fine.

Regards,
Alexis.


Re: Failure to restore from last completed checkpoint

2023-09-08 Thread Alexis Sarda-Espinosa
Hello,

Just a shot in the dark here, but could it be related to
https://issues.apache.org/jira/browse/FLINK-32241 ?

Such failures can cause many exceptions, but I think the ones you've
included aren't pointing to the root cause, so I'm not sure if that issue
applies to you.

Regards,
Alexis.

On Fri, 8 Sept 2023, 20:43 Jacqlyn Bender via user, 
wrote:

> Hi Yanfei,
>
> We were never able to restore from a checkpoint, we ended up restoring
> from a savepoint as fallback. Would those logs suggest we failed to take a
> checkpoint before the job manager restarted? Our observabillity monitors
> showed no failed checkpoints.
>
> Here is an exception that occurred before the failure to restore from the
> checkpoint:
>
> java.io.IOException: Cannot register Closeable, registry is already
> closed. Closing argument.
>
> at
> org.apache.flink.util.AbstractAutoCloseableRegistry.registerCloseable(AbstractAutoCloseableRegistry.java:89)
> ~[a-pipeline-name.jar:1.0]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:128)
> ~[flink-dist-1.17.1-shopify-81a88f8.jar:1.17.1-shopify-81a88f8]
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
> ~[flink-dist-1.17.1-shopify-81a88f8.jar:1.17.1-shopify-81a88f8]
>
> at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
> ~[a-pipeline-name.jar:1.0]
>
> at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
> ~[?:?] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Thanks,
> Jacqlyn
>
> On Thu, Sep 7, 2023 at 7:42 PM Yanfei Lei  wrote:
>
>> Hey Jacqlyn,
>> According to the stack trace, it seems that there is a problem when
>> the checkpoint is triggered. Is this the problem after the restore?
>> would you like to share some logs related to restoring?
>>
>> Best,
>> Yanfei
>>
>> Jacqlyn Bender via user  于2023年9月8日周五 05:11写道:
>> >
>> > Hey folks,
>> >
>> >
>> > We experienced a pipeline failure where our job manager restarted and
>> we were for some reason unable to restore from our last successful
>> checkpoint. We had regularly completed checkpoints every 10 minutes up to
>> this failure and 0 failed checkpoints logged. Using Flink version 1.17.1.
>> >
>> >
>> > Wondering if anyone can shed light on what might have happened?
>> >
>> >
>> > Here's the error from our logs:
>> >
>> >
>> > Message: FATAL: Thread ‘Checkpoint Timer’ produced an uncaught
>> exception. Stopping the process...
>> >
>> >
>> > extendedStackTrace: java.util.concurrent.CompletionException:
>> java.util.concurrent.CompletionException: java.lang.NullPointerException
>> >
>> > at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$8(CheckpointCoordinator.java:669)
>> ~[a-pipeline-name.jar:1.0]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:910)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> [?:?]
>> >
>> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>> >
>> > at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>> >
>> > at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>> >
>> > at java.lang.Thread.run(Thread.java:829) [?:?]
>> >
>> > Caused by: java.util.concurrent.CompletionException:
>> java.lang.NullPointerException
>> >
>> > at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932)
>> ~[?:?]
>> >
>> > at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
>> ~[?:?]
>> >
>> > ... 7 more
>> >
>> > Caused by: java.lang.NullPointerException
>> >
>> > at
>> org.apache.flink.runtime.operators.coordi

Semantics of purging with global windows

2023-08-30 Thread Alexis Sarda-Espinosa
Hello,

According to the javadoc of TriggerResult.PURGE, "All elements in the
window are cleared and the window is discarded, without evaluating
the window function or emitting any elements."
However, I've noticed that using a GlobalWindow (with a custom trigger)
followed by an AggregateFunction will call the function's add() even when
the trigger result is PURGE.

It seems to me that this has been the behavior for a very long time:

https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53#diff-6d18531a35cddca6e5995c40c7a564fd711b998d567c4e167a401f76ca29a2bbR295-R299

Is that really necessary? I'm guessing that operator deals with all types
of windows, so I'm not sure how that affects other window types.
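
For reference, the shape of the pipeline I'm describing is roughly the
following (a sketch; the count-based trigger and the counting aggregate are
made up just to illustrate):

```
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class GlobalWindowPurgeExample {

    static DataStream<Long> wire(DataStream<String> input) {
        return input
                .keyBy(s -> s)
                .window(GlobalWindows.create())
                .trigger(new PurgeOnCountTrigger(100))
                .aggregate(new CountAggregate());
    }

    // purges the window whenever the per-key element count reaches the
    // threshold -- purely to illustrate the scenario in question
    static class PurgeOnCountTrigger extends Trigger<Object, GlobalWindow> {
        private final long threshold;
        private final ValueStateDescriptor<Long> countDescriptor =
                new ValueStateDescriptor<>("count", Long.class);

        PurgeOnCountTrigger(long threshold) {
            this.threshold = threshold;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                       TriggerContext ctx) throws Exception {
            ValueState<Long> count = ctx.getPartitionedState(countDescriptor);
            long current = count.value() == null ? 1L : count.value() + 1L;
            if (current >= threshold) {
                count.clear();
                return TriggerResult.PURGE; // add() below is still called for this element
            }
            count.update(current);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countDescriptor).clear();
        }
    }

    static class CountAggregate implements AggregateFunction<String, Long, Long> {
        @Override
        public Long createAccumulator() { return 0L; }

        @Override
        public Long add(String value, Long acc) { return acc + 1; } // observed even on PURGE

        @Override
        public Long getResult(Long acc) { return acc; }

        @Override
        public Long merge(Long a, Long b) { return a + b; }
    }
}
```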

Regards,
Alexis.


Re: Question about serialization of java.util classes

2023-08-15 Thread Alexis Sarda-Espinosa
Check this answer: https://stackoverflow.com/a/64721838/5793905

You could then use, for example, something like: new
SetTypeInfo(Types.STRING) instead of Types.LIST(Types.STRING)
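
If I remember correctly, the @TypeInfo annotation can also be placed on a POJO
field, so the wiring looks roughly like this (a sketch for the List case; for
the sets you would return a custom TypeInformation such as the SetTypeInfo from
the linked answer instead of Types.LIST):

```
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class PojoWithList {

    // the factory tells Flink's type extraction how to handle this field instead
    // of letting it fall back to a Kryo-serialized GenericType
    @TypeInfo(StringListTypeInfoFactory.class)
    public List<String> values;

    public PojoWithList() {}

    public static class StringListTypeInfoFactory extends TypeInfoFactory<List<String>> {
        @Override
        public TypeInformation<List<String>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return Types.LIST(Types.STRING);
        }
    }
}
```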

Am Di., 15. Aug. 2023 um 10:40 Uhr schrieb :

> Hello Alexis,
>
> Thank you for sharing the helper classes this but unfortunately I have no
> idea how to use these classes or how they might be able to help me. This is
> all very new to me and I honestly can't wrap my head around Flink's type
> information system.
>
> Best regards,
> Saleh.
>
> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> AFAIK you cannot avoid TypeInformationFactory due to type erasure, nothing
> Flink can do about that. Here's an example of helper classes I've been
> using to support set serde in Flink POJOs, but note that it's hardcoded for
> LinkedHashSet, so you would have to create different implementations if you
> need to differentiate sorted sets:
>
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
>
> Regards,
> Alexis.
>
>
> Am Mo., 14. Aug. 2023 um 12:14 Uhr schrieb :
>
>> Hi,
>>
>> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
>> ```
>> package com.example;
>> import java.util.ArrayList;
>> import java.util.HashSet;
>> import java.util.TreeSet;
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> public class App {
>> public static class Pojo {
>> public ArrayList list;
>> public HashSet set;
>> public TreeSet treeset;
>> public Pojo() {
>> this.list = new ArrayList<>();
>> this.set = new HashSet<>();
>> this.treeset = new TreeSet<>();
>> }
>> }
>> public static void main(String[] args) throws Exception {
>> var env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().disableGenericTypes();
>> env.fromElements(new Pojo()).print();
>> env.execute("Pipeline");
>> }
>> }
>> ```
>>
>> The result of running:
>> ```
>> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - class java.util.ArrayList does not contain a setter for field
>> size
>> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Class class java.util.ArrayList cannot be used as a POJO
>> type because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types
>> & Serialization" for details of the effect on performance and schema
>> evolution.
>> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#list will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - No fields were detected for class java.util.HashSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#set will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - No fields were detected for class java.util.TreeSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>[] - Field Pojo#sset will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by
>> org.apache.flink.api.java.ClosureCleaner
>> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core

Re: Question about serialization of java.util classes

2023-08-14 Thread Alexis Sarda-Espinosa
Hello,

AFAIK you cannot avoid TypeInformationFactory due to type erasure, nothing
Flink can do about that. Here's an example of helper classes I've been
using to support set serde in Flink POJOs, but note that it's hardcoded for
LinkedHashSet, so you would have to create different implementations if you
need to differentiate sorted sets:

https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398

Regards,
Alexis.


Am Mo., 14. Aug. 2023 um 12:14 Uhr schrieb :

> Hi,
>
> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
> ```
> package com.example;
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.TreeSet;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class App {
> public static class Pojo {
> public ArrayList list;
> public HashSet set;
> public TreeSet treeset;
> public Pojo() {
> this.list = new ArrayList<>();
> this.set = new HashSet<>();
> this.treeset = new TreeSet<>();
> }
> }
> public static void main(String[] args) throws Exception {
> var env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableGenericTypes();
> env.fromElements(new Pojo()).print();
> env.execute("Pipeline");
> }
> }
> ```
>
> The result of running:
> ```
> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - class java.util.ArrayList does not contain a setter for field size
> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Class class java.util.ArrayList cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types
> & Serialization" for details of the effect on performance and schema
> evolution.
> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#list will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - No fields were detected for class java.util.HashSet so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#set will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - No fields were detected for class java.util.TreeSet so it cannot
> be used as a POJO type and must be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>  [] - Field Pojo#sset will be processed as GenericType. Please read
> the Flink documentation on "Data Types & Serialization" for details of the
> effect on performance and schema evolution.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and
> type java.util.ArrayList is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>
> at 
> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInte

Task manager creation in Flink native Kubernetes (application mode)

2023-07-25 Thread Alexis Sarda-Espinosa
Hi everyone,

From its inception (at least AFAIK), application mode for native Kubernetes
has always created "unmanaged" pods for task managers. I would like to know
if there are any specific benefits to this, or if on the other hand there
are specific reasons not to use Kubernetes Deployments instead.

In my case, I ask for a very specific reason. With the current approach, it
is almost impossible to determine if a task manager crash was due to an OOM
kill, given that there isn't any kind of history for the unmanaged pods.

I could add that these TM pods also confuse Argo CD and their state is
always "progressing". That's not so critical, but I don't know if anyone
else finds that odd.

I would be happy to know what others think.

Regards,
Alexis.


Re: Checkpointing and savepoints can never complete after inconsistency

2023-07-10 Thread Alexis Sarda-Espinosa
I found out someone else reported this and found a workaround:
https://issues.apache.org/jira/browse/FLINK-32241

Am Mo., 10. Juli 2023 um 16:45 Uhr schrieb Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi again,
>
> I have found out that this issue occurred in 3 different clusters, and 2
> of them could not recover after restarting pods; it seems the state was
> completely corrupted afterwards and was thus lost. I had never seen this
> before 1.17.1, so it might be a newly introduced problem.
>
> Regards,
> Alexis.
>
>
> Am Mo., 10. Juli 2023 um 11:07 Uhr schrieb Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com>:
>
>> Hello,
>>
>> we have just experienced a weird issue in one of our Flink clusters which
>> might be difficult to reproduce, but I figured I would document it in case
>> some of you know what could have gone wrong. This cluster had been running
>> with Flink 1.16.1 for a long time and was recently updated to 1.17.1. It
>> ran fine for a few days, but suddenly all checkpoints started failing
>> (RocksDB + Azure ABFSS + Kubernetes HA). I don't see anything interesting
>> in the logs right before the problem started, just afterwards:
>>
>> Jul 9, 2023 @
>> 18:13:41.271 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Completed
>> checkpoint 187398 for job 3d85035b76921c0a905f6c4fade06eca (19891956 bytes,
>> checkpointDuration=443 ms, finalizationTime=103 ms).
>> Jul 9, 2023 @
>> 18:14:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Triggering
>> checkpoint 187399 (type=CheckpointType{name='Checkpoint',
>> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919280725 for job
>> 3d85035b76921c0a905f6c4fade06eca.
>> Jul 9, 2023 @
>> 18:15:05.472 org.apache.kafka.clients.NetworkClient [AdminClient
>> clientId=flink-enumerator-admin-client] Node 0 disconnected.
>> Jul 9, 2023 @
>> 18:15:10.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Checkpoint
>> 187399 of job 3d85035b76921c0a905f6c4fade06eca expired before completing.
>> Jul 9, 2023 @
>> 18:15:10.741 org.apache.flink.runtime.checkpoint.CheckpointFailureManager 
>> Failed
>> to trigger or complete checkpoint 187399 for job
>> 3d85035b76921c0a905f6c4fade06eca. (0 consecutive failed attempts so far)
>> Jul 9, 2023 @
>> 18:15:10.905 
>> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable asynchronous
>> part of checkpoint 187399 could not be completed.
>> Jul 9, 2023 @
>> 18:15:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
>> Triggering
>> checkpoint 187400 (type=CheckpointType{name='Checkpoint',
>> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919340725 for job
>> 3d85035b76921c0a905f6c4fade06eca.
>> *Jul 9, 2023 @
>> 18:15:40.957 org.apache.flink.runtime.state.SharedStateRegistryImpl 
>> Duplicated
>> registration under key
>> ec5b73d0-a04c-4574-b380-7981c7173d80-KeyGroupRange{startKeyGroup=60,
>> endKeyGroup=119}-016511.sst of a new state:
>> ByteStreamStateHandle{handleName='abfss://.../checkpoints/3d85035b76921c0a905f6c4fade06eca/shared/2eddc140-51c8-4575-899f-e70ca71f95be',
>> dataBytes=1334}. This might happen during the task failover if state
>> backend creates different states with the same key before and after the
>> failure. Discarding the OLD state and keeping the NEW one which is included
>> into a completed checkpoint
>>
>> This last line appeared multiple times, and after that all checkpoints
>> failed. At some point, this exception also started appearing:
>>
>> org.apache.flink.runtime.jobmaster.JobMaster Error while processing
>> AcknowledgeCheckpoint message
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> finalize the pending checkpoint 188392. Failure reason: Failure to finalize
>> checkpoint.
>> ...
>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>> FileSystem for scheme "file"
>>
>> In the Kubernetes HA CM I can see that the value under ".data.counter"
>> was still increasing, not sure if that's expected, but since we configured
>> only 1 allowable consecutive checkpoint failure, the job kept restarting
>> every 2 minutes. Trying to create savepoints also failed with
>> UnsupportedFileSystemException.
>>
>> Manually deleting the Flink cluster's pods was enough to bring it back to
>> a working state. Maybe the handling for this error is not working correctly?
>>
>> Regards,
>> Alexis.
>>
>


Re: Checkpointing and savepoints can never complete after inconsistency

2023-07-10 Thread Alexis Sarda-Espinosa
Hi again,

I have found out that this issue occurred in 3 different clusters, and 2 of
them could not recover after restarting pods, it seems state was completely
corrupted afterwards and was thus lost. I had never seen this before
1.17.1, so it might be a newly introduced problem.

Regards,
Alexis.


On Mon, 10 Jul 2023 at 11:07, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hello,
>
> we have just experienced a weird issue in one of our Flink clusters which
> might be difficult to reproduce, but I figured I would document it in case
> some of you know what could have gone wrong. This cluster had been running
> with Flink 1.16.1 for a long time and was recently updated to 1.17.1. It
> ran fine for a few days, but suddenly all checkpoints started failing
> (RocksDB + Azure ABFSS + Kubernetes HA). I don't see anything interesting
> in the logs right before the problem started, just afterwards:
>
> Jul 9, 2023 @
> 18:13:41.271 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Completed
> checkpoint 187398 for job 3d85035b76921c0a905f6c4fade06eca (19891956 bytes,
> checkpointDuration=443 ms, finalizationTime=103 ms).
> Jul 9, 2023 @
> 18:14:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Triggering
> checkpoint 187399 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919280725 for job
> 3d85035b76921c0a905f6c4fade06eca.
> Jul 9, 2023 @
> 18:15:05.472 org.apache.kafka.clients.NetworkClient [AdminClient
> clientId=flink-enumerator-admin-client] Node 0 disconnected.
> Jul 9, 2023 @
> 18:15:10.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Checkpoint
> 187399 of job 3d85035b76921c0a905f6c4fade06eca expired before completing.
> Jul 9, 2023 @
> 18:15:10.741 org.apache.flink.runtime.checkpoint.CheckpointFailureManager 
> Failed
> to trigger or complete checkpoint 187399 for job
> 3d85035b76921c0a905f6c4fade06eca. (0 consecutive failed attempts so far)
> Jul 9, 2023 @
> 18:15:10.905 org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable 
> asynchronous
> part of checkpoint 187399 could not be completed.
> Jul 9, 2023 @
> 18:15:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> Triggering
> checkpoint 187400 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919340725 for job
> 3d85035b76921c0a905f6c4fade06eca.
> *Jul 9, 2023 @
> 18:15:40.957 org.apache.flink.runtime.state.SharedStateRegistryImpl Duplicated
> registration under key
> ec5b73d0-a04c-4574-b380-7981c7173d80-KeyGroupRange{startKeyGroup=60,
> endKeyGroup=119}-016511.sst of a new state:
> ByteStreamStateHandle{handleName='abfss://.../checkpoints/3d85035b76921c0a905f6c4fade06eca/shared/2eddc140-51c8-4575-899f-e70ca71f95be',
> dataBytes=1334}. This might happen during the task failover if state
> backend creates different states with the same key before and after the
> failure. Discarding the OLD state and keeping the NEW one which is included
> into a completed checkpoint
>
> This last line appeared multiple times, and after that all checkpoints
> failed. At some point, this exception also started appearing:
>
> org.apache.flink.runtime.jobmaster.JobMaster Error while processing
> AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> finalize the pending checkpoint 188392. Failure reason: Failure to finalize
> checkpoint.
> ...
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "file"
>
> In the Kubernetes HA CM I can see that the value under ".data.counter" was
> still increasing, not sure if that's expected, but since we configured only
> 1 allowable consecutive checkpoint failure, the job kept restarting every 2
> minutes. Trying to create savepoints also failed with
> UnsupportedFileSystemException.
>
> Manually deleting the Flink cluster's pods was enough to bring it back to
> a working state. Maybe the handling for this error is not working correctly?
>
> Regards,
> Alexis.
>


Checkpointing and savepoints can never complete after inconsistency

2023-07-10 Thread Alexis Sarda-Espinosa
Hello,

we have just experienced a weird issue in one of our Flink clusters which
might be difficult to reproduce, but I figured I would document it in case
some of you know what could have gone wrong. This cluster had been running
with Flink 1.16.1 for a long time and was recently updated to 1.17.1. It
ran fine for a few days, but suddenly all checkpoints started failing
(RocksDB + Azure ABFSS + Kubernetes HA). I don't see anything interesting
in the logs right before the problem started, just afterwards:

Jul 9, 2023 @
18:13:41.271 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Completed
checkpoint 187398 for job 3d85035b76921c0a905f6c4fade06eca (19891956 bytes,
checkpointDuration=443 ms, finalizationTime=103 ms).
Jul 9, 2023 @
18:14:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Triggering
checkpoint 187399 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919280725 for job
3d85035b76921c0a905f6c4fade06eca.
Jul 9, 2023 @
18:15:05.472 org.apache.kafka.clients.NetworkClient [AdminClient
clientId=flink-enumerator-admin-client] Node 0 disconnected.
Jul 9, 2023 @
18:15:10.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Checkpoint
187399 of job 3d85035b76921c0a905f6c4fade06eca expired before completing.
Jul 9, 2023 @
18:15:10.741 org.apache.flink.runtime.checkpoint.CheckpointFailureManager
Failed
to trigger or complete checkpoint 187399 for job
3d85035b76921c0a905f6c4fade06eca. (0 consecutive failed attempts so far)
Jul 9, 2023 @
18:15:10.905 org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable
asynchronous
part of checkpoint 187399 could not be completed.
Jul 9, 2023 @
18:15:40.740 org.apache.flink.runtime.checkpoint.CheckpointCoordinator
Triggering
checkpoint 187400 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1688919340725 for job
3d85035b76921c0a905f6c4fade06eca.
*Jul 9, 2023 @
18:15:40.957 org.apache.flink.runtime.state.SharedStateRegistryImpl Duplicated
registration under key
ec5b73d0-a04c-4574-b380-7981c7173d80-KeyGroupRange{startKeyGroup=60,
endKeyGroup=119}-016511.sst of a new state:
ByteStreamStateHandle{handleName='abfss://.../checkpoints/3d85035b76921c0a905f6c4fade06eca/shared/2eddc140-51c8-4575-899f-e70ca71f95be',
dataBytes=1334}. This might happen during the task failover if state
backend creates different states with the same key before and after the
failure. Discarding the OLD state and keeping the NEW one which is included
into a completed checkpoint

This last line appeared multiple times, and after that all checkpoints
failed. At some point, this exception also started appearing:

org.apache.flink.runtime.jobmaster.JobMaster Error while processing
AcknowledgeCheckpoint message
org.apache.flink.runtime.checkpoint.CheckpointException: Could not
finalize the pending checkpoint 188392. Failure reason: Failure to finalize
checkpoint.
...
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
FileSystem for scheme "file"

In the Kubernetes HA CM I can see that the value under ".data.counter" was
still increasing, not sure if that's expected, but since we configured only
1 allowable consecutive checkpoint failure, the job kept restarting every 2
minutes. Trying to create savepoints also failed with
UnsupportedFileSystemException.

Manually deleting the Flink cluster's pods was enough to bring it back to a
working state. Maybe the handling for this error is not working correctly?

Regards,
Alexis.


Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Alexis Sarda-Espinosa
BTW, it seems I spoke too soon in my previous email. I left the job running
overnight with each source having its own alignment group to evaluate only
per-split alignment, and I can see that eventually some partitions never
resumed consumption and the consumer lag increased.

Regards,
Alexis.

On Thu, 29 Jun 2023 at 10:08, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com>:

> Hi Martijn,
>
> thanks for the pointers. I think the issue I'm seeing is not caused by
> those because in my case the watermarks are not negative. Some more
> information from my setup in case it's relevant:
>
> - All Kafka topics have 6 partitions.
> - Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
> parallelism=1.
>
> Regards,
> Alexis.
>
> On Thu, 29 Jun 2023 at 10:00, Martijn Visser <
> martijnvis...@apache.org>:
>
>> Hi Alexis,
>>
>> There are a couple of recent Flink tickets on watermark alignment,
>> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
>> https://issues.apache.org/jira/browse/FLINK-32420 - Could the latter be
>> also applicable in your case?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> just for completeness, I don't see the problem if I assign a different
>>> alignment group to each source, i.e. using only split-level watermark
>>> alignment.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Wed, 28 Jun 2023 at 08:13, haishui  wrote:
>>>
>>>> Hi,
>>>> I have the same trouble. This is really a bug.
>>>> `shouldWaitForAlignment` needs to be another change.
>>>>
>>>> By the way, a source will be marked as idle when it has been waiting
>>>> for alignment for a long time. Is this a bug?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa"  wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>>> strategies are defined like this:
>>>>
>>>> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>>> .withIdleness(idleTimeout)
>>>> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>>> Duration.ofSeconds(1L))
>>>>
>>>> The max allowed drift is currently 5 seconds, and my sources have an
>>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>>
>>>> What I observe is that, when I restart the job, all sources publish
>>>> messages, but then 2 of them are marked as idle and never resume. I found
>>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>>> fixed in 1.17.1, but I don't think it's the same issue, my logs don't show
>>>> negative values:
>>>>
>>>> 2023-06-27 15:11:42,927 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
>>>> subTaskId=1
>>>> 2023-06-27 15:11:43,009 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>>> 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,091 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>>> 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,116 DEBUG
>>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>>> 07:12:55.807) from subTaskId=0
>>>> 2023-06-27 15:11:43,298 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>>> 2023-06-27 15:11:43,304 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,306 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>> 2023-06-27 15:11:43,486 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,489 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>> 2023-06-27 15:11:43,492 INFO
>>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>>
>>>> Does anyone know if I'm missing something or this is really a bug?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Kafka source with idleness and alignment stops consuming

2023-06-29 Thread Alexis Sarda-Espinosa
Hi Martijn,

thanks for the pointers. I think the issue I'm seeing is not caused by
those because in my case the watermarks are not negative. Some more
information from my setup in case it's relevant:

- All Kafka topics have 6 partitions.
- Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
parallelism=1.

Regards,
Alexis.

On Thu, 29 Jun 2023 at 10:00, Martijn Visser <
martijnvis...@apache.org>:

> Hi Alexis,
>
> There are a couple of recent Flink tickets on watermark alignment,
> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
> https://issues.apache.org/jira/browse/FLINK-32420 - Could the latter be
> also applicable in your case?
>
> Best regards,
>
> Martijn
>
> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> just for completeness, I don't see the problem if I assign a different
>> alignment group to each source, i.e. using only split-level watermark
>> alignment.
>>
>> Regards,
>> Alexis.
>>
>> On Wed, 28 Jun 2023 at 08:13, haishui  wrote:
>>
>>> Hi,
>>> I have the same trouble. This is really a bug.
>>> `shouldWaitForAlignment` needs to be another change.
>>>
>>> By the way, a source will be marked as idle when it has been waiting
>>> for alignment for a long time. Is this a bug?
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa"  wrote:
>>>
>>> Hello,
>>>
>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>> strategies are defined like this:
>>>
>>> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>> .withIdleness(idleTimeout)
>>> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>> Duration.ofSeconds(1L))
>>>
>>> The max allowed drift is currently 5 seconds, and my sources have an
>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>
>>> What I observe is that, when I restart the job, all sources publish
>>> messages, but then 2 of them are marked as idle and never resume. I found
>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>> fixed in 1.17.1, but I don't think it's the same issue, my logs don't show
>>> negative values:
>>>
>>> 2023-06-27 15:11:42,927 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
>>> subTaskId=1
>>> 2023-06-27 15:11:43,009 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,091 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,116 DEBUG
>>> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
>>> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
>>> 07:12:55.807) from subTaskId=0
>>> 2023-06-27 15:11:43,298 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>> 2023-06-27 15:11:43,304 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>> 2023-06-27 15:11:43,306 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>> 2023-06-27 15:11:43,486 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>> 2023-06-27 15:11:43,489 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>> 2023-06-27 15:11:43,492 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
>>> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>
>>> Does anyone know if I'm missing something or this is really a bug?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Kafka source with idleness and alignment stops consuming

2023-06-28 Thread Alexis Sarda-Espinosa
Hello,

just for completeness, I don't see the problem if I assign a different
alignment group to each source, i.e. using only split-level watermark
alignment.

Regards,
Alexis.

On Wed, 28 Jun 2023 at 08:13, haishui  wrote:

> Hi,
> I have the same trouble. This is really a bug.
> `shouldWaitForAlignment` needs to be another change.
>
> By the way, a source will be marked as idle when it has been waiting
> for alignment for a long time. Is this a bug?
>
>
>
>
>
>
> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa"  wrote:
>
> Hello,
>
> I am currently evaluating idleness and alignment with Flink 1.17.1 and the
> externalized Kafka connector. My job has 3 sources whose watermark
> strategies are defined like this:
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> .withIdleness(idleTimeout)
> .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
> Duration.ofSeconds(1L))
>
> The max allowed drift is currently 5 seconds, and my sources have an
> idleTimeout of 1, 1.5, and 5 seconds.
>
> What I observe is that, when I restart the job, all sources publish
> messages, but then 2 of them are marked as idle and never resume. I found
> https://issues.apache.org/jira/browse/FLINK-31632, which should be fixed
> in 1.17.1, but I don't think it's the same issue, my logs don't show
> negative values:
>
> 2023-06-27 15:11:42,927 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
> subTaskId=1
> 2023-06-27 15:11:43,009 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
> 07:12:55.807) from subTaskId=0
> 2023-06-27 15:11:43,091 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
> 07:12:55.807) from subTaskId=0
> 2023-06-27 15:11:43,116 DEBUG
> org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
> 07:12:55.807) from subTaskId=0
> 2023-06-27 15:11:43,298 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
> 2023-06-27 15:11:43,304 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
> 2023-06-27 15:11:43,306 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
> 2023-06-27 15:11:43,486 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
> 2023-06-27 15:11:43,489 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
> 2023-06-27 15:11:43,492 INFO
>  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
> Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>
> Does anyone know if I'm missing something or this is really a bug?
>
> Regards,
> Alexis.
>
>


Kafka source with idleness and alignment stops consuming

2023-06-27 Thread Alexis Sarda-Espinosa
Hello,

I am currently evaluating idleness and alignment with Flink 1.17.1 and the
externalized Kafka connector. My job has 3 sources whose watermark
strategies are defined like this:

WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
.withIdleness(idleTimeout)
.withWatermarkAlignment("group", maxAllowedWatermarkDrift,
Duration.ofSeconds(1L))

The max allowed drift is currently 5 seconds, and my sources have an
idleTimeout of 1, 1.5, and 5 seconds.
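
For readers who want to reproduce the setup, a minimal sketch of wiring such a
strategy into a KafkaSource (broker address, topic, group id and the String
payload type below are illustrative placeholders, not the actual job's values):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Duration maxAllowedWatermarkDrift = Duration.ofSeconds(5);
        Duration idleTimeout = Duration.ofSeconds(1);

        // All sources that use the same alignment group name ("group") are aligned
        // with each other; the third argument is how often alignment is re-evaluated.
        WatermarkStrategy<String> strategy =
                WatermarkStrategy.<String>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
                        .withIdleness(idleTimeout)
                        .withWatermarkAlignment("group", maxAllowedWatermarkDrift, Duration.ofSeconds(1L));

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")        // placeholder address
                .setTopics("example-topic")               // placeholder topic
                .setGroupId("example-consumer-group")     // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, strategy, "kafka-source").print();
        env.execute("alignment-sketch");
    }
}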

What I observe is that, when I restart the job, all sources publish
messages, but then 2 of them are marked as idle and never resume. I found
https://issues.apache.org/jira/browse/FLINK-31632, which should be fixed in
1.17.1, but I don't think it's the same issue, my logs don't show negative
values:

2023-06-27 15:11:42,927 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from
subTaskId=1
2023-06-27 15:11:43,009 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,091 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,116 DEBUG
org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New
reported watermark=Watermark @ 9223372036854775807 (292278994-08-17
07:12:55.807) from subTaskId=0
2023-06-27 15:11:43,298 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
2023-06-27 15:11:43,304 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
2023-06-27 15:11:43,306 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
2023-06-27 15:11:43,486 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
2023-06-27 15:11:43,489 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
2023-06-27 15:11:43,492 INFO
 org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 -
Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]

Does anyone know if I'm missing something or this is really a bug?

Regards,
Alexis.


Re: Interaction between idling sources and watermark alignment

2023-06-16 Thread Alexis Sarda-Espinosa
Thank you very much for the explanation, Hong.

On Thu, 15 Jun 2023, 15:55 Teoh, Hong,  wrote:

> Hi Alexis, below is my understanding:
>
>
> > I see that, in Flink 1.17.1, watermark alignment will be supported (as
> beta) within a single source's splits and across different sources. I don't
> see this explicitly mentioned in the documentation, but I assume that the
> concept of "maximal drift" used for alignment also takes idleness into
> account, resuming any readers that were paused due to an idle split or
> source. Is my understanding correct?
>
> As far as I understand, the decision to “unpause” a given split that might
> have been paused due to watermark alignment is re-evaluated at fixed
> intervals here. [1]
>
> We see that the SourceCoordinator calls announceCombinedWatermark() that
> calculates the global watermark and that subsequently sends
> a WatermarkAlignmentEvent to each subtask. On each subtask, there is an
> evaluation of whether to “wake up” the operator. [2] [3]
>
> This means that there is a periodic evaluation of whether to “wake up”,
> controlled by the update interval, which defaults to 1s [4]
>
> > Also, something that isn't 100% clear to me when comparing to the
> previous watermark alignment documentation, even if I only wanted alignment
> within a single source's splits, I still need to
> call withWatermarkAlignment in the watermark strategy, right? Otherwise
> alignment will not take place, regardless
> of pipeline.watermark-alignment.allow-unaligned-source-splits.
>
> Yes, this is correct. Watermark groups are used to check whether multiple
> sources need to coordinate watermarks. If two sources A and B both belong
> to the same watermark group, then their watermarks will be aligned.
>
> Hope the above helps.
>
>
> Cheers,
> Hong
>
>
> [1]
> https://github.com/apache/flink/blob/45ba7ee87caee63a0babfd421b7c5eabaa779baa/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L160
> [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L556-L559
> [3]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L659
> [4]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java#L29
>
>
>
> On 13 Jun 2023, at 21:08, Alexis Sarda-Espinosa 
> wrote:
>
> Hi again, I'm not a fan of bumping questions, but I think this might be
> relevant, maybe enough to include it in the official documentation?
>
> Regards,
> Alexis.
>
> On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa, <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I see that, in Flink 1.17.1, watermark alignment will be supported (as
>> beta) within a single source's splits and across different sources. I don't
>> see this explicitly mentioned in the documentation, but I assume that the
>> concept of "maximal drift" used for alignment also takes idleness into
>> account, resuming any readers that were paused due to an idle split or
>> source. Is my understanding correct?
>>
>> Also, something that isn't 100% clear to me when comparing to the
>> previous watermark alignment documentation, even if I only wanted alignment
>> within a single source's splits, I still need to
>> call withWatermarkAlignment in the watermark strategy, right? Otherwise
>> alignment will not take place, regardless
>> of pipeline.watermark-alignment.allow-unaligned-source-splits.
>>
>> Regards,
>> Alexis.
>>
>>
>


Re: Interaction between idling sources and watermark alignment

2023-06-13 Thread Alexis Sarda-Espinosa
Hi again, I'm not a fan of bumping questions, but I think this might be
relevant, maybe enough to include it in the official documentation?

Regards,
Alexis.

On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa, 
wrote:

> Hello,
>
> I see that, in Flink 1.17.1, watermark alignment will be supported (as
> beta) within a single source's splits and across different sources. I don't
> see this explicitly mentioned in the documentation, but I assume that the
> concept of "maximal drift" used for alignment also takes idleness into
> account, resuming any readers that were paused due to an idle split or
> source. Is my understanding correct?
>
> Also, something that isn't 100% clear to me when comparing to the previous
> watermark alignment documentation, even if I only wanted alignment within a
> single source's splits, I still need to call withWatermarkAlignment in the
> watermark strategy, right? Otherwise alignment will not take place,
> regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.
>
> Regards,
> Alexis.
>
>


Re: RocksDB segfault on state restore

2023-06-01 Thread Alexis Sarda-Espinosa
Hello,

A couple of potentially relevant pieces of information:

1. https://issues.apache.org/jira/browse/FLINK-16686
2. https://stackoverflow.com/a/64721838/5793905 (question was about schema
evolution, but the answer is more generally applicable)
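
For context, the FlinkCompactionFilter in that stack trace is only exercised when
state TTL is enabled on RocksDB-backed state with the compaction-filter cleanup
active. A minimal sketch of that kind of configuration (TTL value, query interval
and state name are illustrative, not taken from the job above):

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlListStateSketch {
    public static ListStateDescriptor<String> descriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7)) // illustrative TTL
                // Let RocksDB run Flink's compaction filter, re-reading the current
                // timestamp after every 1000 entries processed during compaction.
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        ListStateDescriptor<String> desc =
                new ListStateDescriptor<>("buffered-events", Types.STRING); // illustrative name
        desc.enableTimeToLive(ttlConfig);
        return desc;
    }
}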

Regards,
Alexis.

On Fri, 2 Jun 2023 at 07:18, Gyula Fóra  wrote:

> Hi!
>
>
> In our case, no schema evolution was triggered , only the TTL was set from
> the beginning as far as I remember.
>
> I will double check
>
> Gyula
>
> On Fri, 2 Jun 2023 at 06:12, Hangxiang Yu  wrote:
>
>> Hi, Gyula.
>> It seems related to https://issues.apache.org/jira/browse/FLINK-23346.
>> We also saw core dump while using list state after triggering state
>> migration and ttl compaction filter. Have you triggered the schema
>> evolution ?
>> It seems a bug of the rocksdb list state together with ttl compaction
>> filter.
>>
>> On Wed, May 17, 2023 at 7:05 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> We are encountering an error on a larger stateful job (around 1 TB +
>>> state) on restore from a rocksdb checkpoint. The taskmanagers keep crashing
>>> with a segfault coming from the rocksdb native logic and seem to be related
>>> to the FlinkCompactionFilter mechanism.
>>>
>>> The gist with the full error report:
>>> https://gist.github.com/gyfora/f307aa570d324d063e0ade9810f8bb25
>>>
>>> The core part is here:
>>> V  [libjvm.so+0x79478f]  Exceptions::
>>> (Thread*, char const*, int, oopDesc*)+0x15f
>>> V  [libjvm.so+0x960a68]  jni_Throw+0x88
>>> C  [librocksdbjni-linux64.so+0x222aa1]
>>>  JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long,
>>> long) const+0x121
>>> C  [librocksdbjni-linux64.so+0x6486c1]
>>>  rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&,
>>> std::string*) const+0x81
>>> C  [librocksdbjni-linux64.so+0x648bea]
>>>  rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice
>>> const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&,
>>> std::string*, std::string*) const+0x14a
>>>
>>> Has anyone encountered a similar issue before?
>>>
>>> Thanks
>>> Gyula
>>>
>>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Interaction between idling sources and watermark alignment

2023-05-30 Thread Alexis Sarda-Espinosa
Hello,

I see that, in Flink 1.17.1, watermark alignment will be supported (as
beta) within a single source's splits and across different sources. I don't
see this explicitly mentioned in the documentation, but I assume that the
concept of "maximal drift" used for alignment also takes idleness into
account, resuming any readers that were paused due to an idle split or
source. Is my understanding correct?

Also, something that isn't 100% clear to me when comparing to the previous
watermark alignment documentation, even if I only wanted alignment within a
single source's splits, I still need to call withWatermarkAlignment in the
watermark strategy, right? Otherwise alignment will not take place,
regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Regards,
Alexis.


Kubernetes operator stops responding due to Connection reset by peer

2023-04-21 Thread Alexis Sarda-Espinosa
Hello,

Today, we received an alert because the operator appeared to be down. Upon
further investigation, we realized the alert was triggered because the
endpoint for Prometheus metrics (which we enabled) stopped responding, so
it seems the endpoint used for the liveness probe wasn't affected and the
pod was not restarted automatically.

The logs right before the problem started don't show anything odd, and once
the problem started, the logs were spammed with warning messages stating
"Connection reset by peer" with no further information. From what I can
see, nothing else was logged during that time, so it looks like the process
really had stalled.

I imagine this is not easy to reproduce and, while a pod restart was enough
to get back on track, it might be worth improving the liveness probe to
catch these situations.

Full stacktrace for reference:

An exceptionCaught() event was fired, and it reached at the tail of the
pipeline. It usually means the last handler in the pipeline did not handle
the exception.
java.io.IOException: Connection reset by peer at
java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method) at
java.base/sun.nio.ch.SocketDispatcher.read(Unknown Source) at
java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source) at
java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source) at
org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledDirectByteBuf.setBytes(UnpooledDirectByteBuf.java:570)
at
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)

Regards,
Alexis.


Requirements for POJO serialization

2023-04-11 Thread Alexis Sarda-Espinosa
Hello,

according to the documentation, a POJO must have a no-arg constructor and
either public fields or public getters and setters with conventional
naming. I recently realized that if I create an explicit TypeInfoFactory
that provides Types.POJO and all other required details, the getters and
setters aren't needed. Is this an official feature?

I ask because this means some classes could have an "immutable contract",
so to speak. I'm guessing final fields might still be unsupported, but I
haven't validated.
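
To make this concrete, a rough sketch of the pattern I mean (the class, its fields
and the factory are purely illustrative):

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// No getters/setters and non-public fields, only a no-arg constructor.
@TypeInfo(Event.EventTypeInfoFactory.class)
public class Event {
    long timestamp;
    String payload;

    public Event() {}

    public static class EventTypeInfoFactory extends TypeInfoFactory<Event> {
        @Override
        public TypeInformation<Event> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            // Declare the fields explicitly instead of relying on getter/setter discovery.
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            fields.put("timestamp", Types.LONG);
            fields.put("payload", Types.STRING);
            return Types.POJO(Event.class, fields);
        }
    }
}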

Regards,
Alexis.


Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-03-30 Thread Alexis Sarda-Espinosa
Hi Martijn,

just to be sure, if all state-related classes use a POJO serializer, Kryo
will never come into play, right? Given FLINK-16686 [1], I wonder how many
users actually have jobs with Kryo and RocksDB, but even if there aren't
many, that still leaves those who don't use RocksDB for
checkpoints/savepoints.
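
As an aside for anyone auditing their own jobs: one quick way to confirm that Kryo
never comes into play is to disable generic types, which makes any fallback to
GenericTypeInfo fail fast when the job graph is built instead of silently using
Kryo. A minimal sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoFallbackCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Any type that would fall back to Kryo now triggers an
        // UnsupportedOperationException during job graph construction.
        env.getConfig().disableGenericTypes();

        // ... build the real pipeline here; the placeholder below just makes it runnable.
        env.fromElements(1, 2, 3).print();
        env.execute("kryo-fallback-check");
    }
}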

If Kryo were to stay in the Flink APIs in v1.X, is it impossible to let
users choose between v2/v5 jars by separating them like log4j2 jars?

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

On Thu, 30 Mar 2023 at 14:26, Martijn Visser <
martijnvis...@apache.org>:

> Hi all,
>
> I also saw a thread on this topic from Clayton Wohl [1] on this topic,
> which I'm including in this discussion thread to avoid that it gets lost.
>
> From my perspective, there's two main ways to get to Java 17:
>
> 1. The Flink community agrees that we upgrade Kryo to a later version,
> which means breaking all checkpoint/savepoint compatibility and releasing a
> Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support
> dropped. This is probably the quickest way, but would still mean that we
> expose Kryo in the Flink APIs, which is the main reason why we haven't been
> able to upgrade Kryo at all.
> 2. There's a contributor who makes a contribution that bumps Kryo, but
> either a) automagically reads in all old checkpoints/savepoints in using
> Kryo v2 and writes them to new snapshots using Kryo v5 (like is mentioned
> in the Kryo migration guide [2][3] or b) provides an offline tool that
> allows users that are interested in migrating their snapshots manually
> before starting from a newer version. That potentially could prevent the
> need to introduce a new Flink major version. In both scenarios, ideally the
> contributor would also help with avoiding the exposure of Kryo so that we
> will be in a better shape in the future.
>
> It would be good to get the opinion of the community for either of these
> two options, or potentially for another one that I haven't mentioned. If it
> appears that there's an overall agreement on the direction, I would propose
> that a FLIP gets created which describes the entire process.
>
> Looking forward to the thoughts of others, including the Users (therefore
> including the User ML).
>
> Best regards,
>
> Martijn
>
> [1]  https://lists.apache.org/thread/qcw8wy9dv8szxx9bh49nz7jnth22p1v2
> [2] https://lists.apache.org/thread/gv49jfkhmbshxdvzzozh017ntkst3sgq
> [3] https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5
>
> On Sun, Mar 19, 2023 at 8:16 AM Tamir Sagi 
> wrote:
>
>> I agree, there are several options to mitigate the migration from v2 to
>> v5.
>> Yet, Oracle's roadmap is to end JDK 11 support in September this year.
>>
>>
>>
>> 
>> From: ConradJam 
>> Sent: Thursday, March 16, 2023 4:36 AM
>> To: d...@flink.apache.org 
>> Subject: Re: [Discussion] - Release major Flink version to support JDK 17
>> (LTS)
>>
>> Thanks for starting this discussion.
>>
>>
>> I have been tracking this problem for a long time, until I saw a
>> conversation in an issue a few days ago and learned that the Kryo version
>> problem will affect the JDK 17 compilation of snapshots [1] FLINK-24998.
>>
>> As @cherry said, it ruined our whole effort towards JDK 17.
>>
>> I am in favor of providing an external tool that migrates checkpoints from
>> the old Kryo version to the new Kryo version in one pass (maybe this tool
>> could start in Flink 2.0?). Are there currently any plans or ideas for such
>> a tool worth discussing?
>>
>>
>> I think it should not be difficult to be compatible with JDK11 and JDK17.
>> We should indeed abandon JDK8 in 2.0.0. It is also mentioned in the doc
>> that it is marked as Deprecated [2]
>>
>>
>> I would add here that we need to pay attention to the Scala version and the
>> JDK 17 version.
>>
>>
>> [1] FLINK-24998  SIGSEGV in Kryo / C2 CompilerThread on Java 17
>> https://issues.apache.org/jira/browse/FLINK-24998
>>
>> [2] FLINK-30501 Update Flink build instruction to deprecate Java 8 instead
>> of requiring Java 11  https://issues.apache.org/jira/browse/FLINK-30501
>>
>> On Thu, 16 Mar 2023 at 00:54, Tamir Sagi  wrote:
>>
>> > Hey dev community,
>> >
>> > I'm writing this email to kick off a discussion following this epic:
>> > FLINK-15736.
>> >
>> > We are moving towards JDK 17 (LTS) , the only blocker now is Flink which
>> > currently remains on JDK 11 (LTS). Flink does not support JDK 17 yet,
>> with
>> > no timeline,  the reason, based on the aforementioned ticket is the
>> > following tickets
>> >
>> >   1.  FLINK-24998 - SIGSEGV in Kryo / C2 CompilerThread on Java 17<
>> > https://issues.apache.org/jira/browse/FLINK-24998>.
>> >   2.  FLINK-3154 - Update Kryo version from 2.24.0 to latest Kryo LTS
>> > version
>> >
>> > My question is whether it is possible to release a major version (Flink

Re: Watermarks lagging behind events that generate them

2023-03-15 Thread Alexis Sarda-Espinosa
Hi Shammon, thanks for the info. I was hoping the savepoint would include
the watermark, but I'm not sure that would make sense in every scenario.

Regards,
Alexis.

On Tue, 14 Mar 2023 at 12:59, Shammon FY  wrote:

> Hi Alexis
>
> In some watermark generators such as BoundedOutOfOrderTimestamps,
> the timestamp of watermark will be reset to Long.MIN_VALUE if the subtask
> is restarted and no event from source is processed.
>
> Best,
> Shammon FY
>
> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi David, thanks for the answer. One follow-up question: will the
>> watermark be reset to Long.MIN_VALUE every time I restart a job with
>> savepoint?
>>
>> On Tue, 14 Mar 2023 at 05:08, David Anderson <
>> dander...@apache.org>:
>>
>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>> they were designed that way, but that is how they are implemented.
>>> Windows maintain this contract by emitting all of their results before
>>> forwarding the watermark that triggered the results.
>>>
>>> David
>>>
>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
>>> >
>>> > Hi Alexis
>>> >
>>> > Do you use both event-time watermark generator and TimerService for
>>> processing time in your job? Maybe you can try using event-time watermark
>>> first.
>>> >
>>> > Best,
>>> > Shammon.FY
>>> >
>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I recently ran into a weird issue with a streaming job in Flink
>>> 1.16.1. One of my functions (KeyedProcessFunction) has been using
>>> processing time timers. I now want to execute the same job based on a
>>> historical data dump, so I had to adjust the logic to use event time timers
>>> in that case (and did not use BATCH execution mode). Since my data has a
>>> timestamp field, I implemented a custom WatermarkGenerator that always
>>> emits a watermark with that timestamp in the onEvent callback, and does
>>> nothing in the onPeriodicEmit callback.
>>> >>
>>> >> My problem is that, sometimes, the very first time my function calls
>>> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
>>> some false triggers when the first watermark actually arrives.
>>> >>
>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>>> watermark, it would be sent before the corresponding event, but maybe this
>>> is not always the case?
>>> >>
>>> >> In case it's relevant, a brief overview of my job's topology:
>>> >>
>>> >> Source1 -> Broadcast
>>> >>
>>> >> Source2 ->
>>> >>   keyBy ->
>>> >>   connect(Broadcast) ->
>>> >>   process ->
>>> >>   filter ->
>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>> >>   keyBy ->
>>> >>   process // function that uses timers
>>> >>
>>> >> Regards,
>>> >> Alexis.
>>>
>>


Re: Watermarks lagging behind events that generate them

2023-03-14 Thread Alexis Sarda-Espinosa
Hi David, thanks for the answer. One follow-up question: will the watermark
be reset to Long.MIN_VALUE every time I restart a job with savepoint?

On Tue, 14 Mar 2023 at 05:08, David Anderson <
dander...@apache.org>:

> Watermarks always follow the corresponding event(s). I'm not sure why
> they were designed that way, but that is how they are implemented.
> Windows maintain this contract by emitting all of their results before
> forwarding the watermark that triggered the results.
>
> David
>
> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY  wrote:
> >
> > Hi Alexis
> >
> > Do you use both event-time watermark generator and TimerService for
> processing time in your job? Maybe you can try using event-time watermark
> first.
> >
> > Best,
> > Shammon.FY
> >
> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
> >>
> >> Hello,
> >>
> >> I recently ran into a weird issue with a streaming job in Flink 1.16.1.
> One of my functions (KeyedProcessFunction) has been using processing time
> timers. I now want to execute the same job based on a historical data dump,
> so I had to adjust the logic to use event time timers in that case (and did
> not use BATCH execution mode). Since my data has a timestamp field, I
> implemented a custom WatermarkGenerator that always emits a watermark with
> that timestamp in the onEvent callback, and does nothing in the
> onPeriodicEmit callback.
> >>
> >> My problem is that, sometimes, the very first time my function calls
> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
> some false triggers when the first watermark actually arrives.
> >>
> >> I would have expected that, if WatermarkGenerator.onEvent emits a
> watermark, it would be sent before the corresponding event, but maybe this
> is not always the case?
> >>
> >> In case it's relevant, a brief overview of my job's topology:
> >>
> >> Source1 -> Broadcast
> >>
> >> Source2 ->
> >>   keyBy ->
> >>   connect(Broadcast) ->
> >>   process ->
> >>   filter ->
> >>   assignTimestampsAndWatermarks -> // newly added for historical data
> >>   keyBy ->
> >>   process // function that uses timers
> >>
> >> Regards,
> >> Alexis.
>


Watermarks lagging behind events that generate them

2023-03-10 Thread Alexis Sarda-Espinosa
Hello,

I recently ran into a weird issue with a streaming job in Flink 1.16.1. One
of my functions (KeyedProcessFunction) has been using processing time
timers. I now want to execute the same job based on a historical data dump,
so I had to adjust the logic to use event time timers in that case (and did
*not* use BATCH execution mode). Since my data has a timestamp field, I
implemented a custom WatermarkGenerator that always emits a watermark with
that timestamp in the onEvent callback, and does nothing in the
onPeriodicEmit callback.
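
For reference, a minimal sketch of such a generator (class and field names are
illustrative, the behaviour is exactly as described above):

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class PerEventWatermarkGenerator
        implements WatermarkGenerator<PerEventWatermarkGenerator.MyEvent> {

    // Illustrative event type; stands in for the job's actual record class.
    public static class MyEvent {
        public long timestamp;
    }

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // Emit a watermark for every event, using the event's own timestamp field.
        output.emitWatermark(new Watermark(event.timestamp));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty: only the per-event watermarks above are emitted.
    }
}

It is attached at the assignTimestampsAndWatermarks step via
WatermarkStrategy.forGenerator(...) together with a timestamp assigner.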

My problem is that, sometimes, the very first time my function calls
TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
some false triggers when the first watermark actually arrives.

I would have expected that, if WatermarkGenerator.onEvent emits a
watermark, it would be sent before the corresponding event, but maybe this
is not always the case?

In case it's relevant, a brief overview of my job's topology:

Source1 -> Broadcast

Source2 ->
  keyBy ->
  connect(Broadcast) ->
  process ->
  filter ->
  assignTimestampsAndWatermarks -> // newly added for historical data
  keyBy ->
  process // function that uses timers

Regards,
Alexis.


Re: [EXTERNAL] Re: Secure Azure Credential Configuration

2023-03-06 Thread Alexis Sarda-Espinosa
Hello,

I actually needed this myself so I have validated it. Again, this is if you
want Flink itself to access Azure, and I'm fairly certain you have to use
Java because the plugin's class loader won't have access to the Scala
library's jars.

* You have to build against
https://mvnrepository.com/artifact/org.apache.flink/flink-azure-fs-hadoop/1.16.1
(mark it as provided).
* You should implement the azurebfs provider for ABFS.
* You can create 1 plugin folder and copy Flink's azure jar plus the one
with your interface implementation.

I can confirm that worked.
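
For illustration, a rough sketch of such a provider written against the shaded WASB
interface (the shaded package names and the analogous ABFS interface under
...azurebfs.services should be double-checked against the flink-azure-fs-hadoop jar
you build against, and the environment variable name is made up):

import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProvider;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException;

// Reads the storage key from an environment variable instead of flink-conf.yaml.
// It is registered via a property like fs.azure.account.keyprovider.<account>
// (check the exact key for WASB vs. ABFS in the Hadoop docs).
public class EnvKeyProvider implements KeyProvider {

    @Override
    public String getStorageAccountKey(String accountName, Configuration conf)
            throws KeyProviderException {
        String key = System.getenv("AZURE_STORAGE_KEY"); // illustrative variable name
        if (key == null) {
            throw new KeyProviderException("AZURE_STORAGE_KEY is not set for " + accountName);
        }
        return key;
    }
}

The jar containing this class then goes into the same plugin folder as the
flink-azure-fs-hadoop jar, as described above.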

Regards,
Alexis.

On Tue, 7 Mar 2023, 06:47 Swathi C,  wrote:

> Hi Ivan,
>
> You can try to setup using MSI so that the flink pods access the storage
> account and you might need to add the podIdentity to the flink pod so that
> it can access it. ( MSI should have the access for the storage account as
> well )
> The pod identity will have the required permissions to access to the
> storage account. These changes might be required along with adding the
> plugins.
> Can you try adding the following to the flink-config ?
>
> fs.azure.account.auth.type: OAuth
> fs.azure.account.oauth2.msi.tenant: 
> fs.azure.account.oauth.provider.type:
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
> fs.azure.account.oauth2.client.id: 
> fs.azure.identity.transformer.service.principal.substitution.list: '*'
> fs.azure.identity.transformer.service.principal.id: 
>
> Regards,
> Swathi C
>
> On Tue, Mar 7, 2023 at 8:53 AM Ivan Webber via user 
> wrote:
>
>> Thanks for the pointers Alexis!
>>
>>
>>
>> Implementing `org.apache.hadoop.fs.azure.KeyProvider` has helped me make
>> progress, but I’m running into a new error:
>>
>> ```
>>
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException:
>> org.tryflink.wrappers.TrafficForecastEnvKeyProviderWrapper specified in
>> config is not a valid KeyProvider class.
>>
>> ```
>>
>>
>>
>> I get this error whether I implement the class in Scala or Java, or use `
>> org.apache.hadoop.fs.azure.KeyProvider` or `
>> org.apache.hadoop.fs.azurebfs.services.KeyProvider `. My best guess is that
>> it’s something to do with not building against the shaded interface which
>> you indicated I should do or possibly different class loaders. To build
>> against the shaded interfaces would I import a package that has them?
>>
>>
>>
>> This is the dependency I added with
>> `org.apache.hadoop.fs.azure.KeyProvider`.
>>
>> ```
>>
>> 
>>
>> org.apache.hadoop
>>
>> hadoop-azure
>>
>> 3.3.2
>>
>> 
>>
>> ```
>>
>>
>>
>> What I’ve learned so far is that this configuration has more to do with
>> configuring Hadoop than Flink as the configuration is forwarded
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/#:~:text=Flink%20forwards%20all%20Flink%20configurations%20with%20a%20key%20prefix%20of%20fs.azure%20to%20the%20Hadoop%20configuration%20of%20the%20filesystem>.
>> Thus, I tried setting the properties to use Azure Managed Identity
>> <https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Azure_Managed_Identity>,
>> but got an error [1]. If anyone has gotten that to work I’d be interested
>> in hearing about it.
>>
>>
>>
>> Thanks for the help so far; please, anyone who can give pointers send
>> them.
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Ivan
>>
>>
>>
>>
>>
>> [1] -
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException:
>> No credentials found for account myblob.blob.core.windows.net in the
>> configuration, and its container flink-forecast is not accessible using
>> anonymous credentials. Please check if the container exists first. If it is
>> not publicly available, you have to provide account credentials.
>>
>>
>>
>> *From: *Ivan Webber 
>> *Sent: *Friday, March 3, 2023 10:38 AM
>> *To: *Alexis Sarda-Espinosa 
>> *Cc: *user 
>> *Subject: *Re: [EXTERNAL] Re: Secure Azure Credential Configuration
>>
>>
>>
>> Thanks Alexis,
>>
>>
>>
>> I will be trying that out today. If it works I will share back and try
>> adding it to the docs.
>>
>>
>>
>>
>>
>> *From:* Alexis Sarda-Espinosa 
>> *Sent:* Thursday, March 2, 2023 3:33:03 PM
>> *To:* Ivan Webber 

Re: [EXTERNAL] Re: Secure Azure Credential Configuration

2023-03-02 Thread Alexis Sarda-Espinosa
Hi Ivan,

please always include the whole distribution list since answers may help
others as well.

I would also think about implementing your own provider(s), but some things
I know:

- There are 2 different KeyProvider interfaces (which isn't explicitly
documented from what I can tell):
  * org.apache.hadoop.fs.azure.KeyProvider - WASB
  * org.apache.hadoop.fs.azurebfs.services.KeyProvider - ABFS (I think)
- Flink shades the hadoop classes
under org.apache.flink.fs.shaded.hadoop3... so you would need to implement
your providers against the shaded interfaces.
- The documentation for Flink plugins [1] shows an s3 folder with multiple
jars, so I imagine you could add a jar with your key providers to a folder
with the azure-fs jar, but I've never tested this.

However, I believe this whole shading and plugin details are only relevant
if you want Flink to access the azure FS for its checkpoints and/or
savepoints, if you need to access the FS directly in your code, I imagine
you're better off including the relevant hadoop jars in your fat jar
without going through Flink's plugin system.

This is my impression, but maybe someone else can correct me if I'm wrong.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/

Regards,
Alexis.

On Thu, 2 Mar 2023 at 23:46, Ivan Webber <
ivan.web...@microsoft.com>:

> Hello Alexis,
>
>
>
> I was actually thinking I’d use both WASB and ABFS, but I looked at the
> source for EnvironmentVariableKeyProvider and it only reads a single
> specific environment variable where my pipeline actually needs to bring
> together data stored in different blob and ADLS accounts. I couldn’t find
> anything about providing my own KeyProvider but I considered trying it as
> an experiment at one point.
>
>
>
> *From: *Alexis Sarda-Espinosa 
> *Sent: *Thursday, March 2, 2023 2:38 PM
> *To: *Ivan Webber 
> *Cc: *user 
> *Subject: *[EXTERNAL] Re: Secure Azure Credential Configuration
>
>
>
> Hi Ivan,
>
>
>
> Mercy is always free. Are you using WASB or ABFS? I presume it's the
> latter, since that's the one that can't use EnvironmentVariableKeyProvider,
> but just to be sure.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> On Thu, 2 Mar 2023, 23:07 Ivan Webber via user, 
> wrote:
>
> TLDR: I will buy your coffee if you can help me understand how to securely
> configure Azure credentials (doc page
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/>
> for reference).
>
>
>
> I am a junior developer tasked with being the first person to learn the
> Apache Flink framework. I know that storing secrets in flink-conf.yaml in a
> container is a bad idea. I’ve tried exposing Azure storage keys as env vars
> and using `config.setString`, but those properties seem to get overridden.
> I plan on using Flink operator, so if you can show me in that context
> that’d be ideal.
>
>
>
> Thanks, and sorry for bothering everyone. I’ve just exhausted myself and
> am hopeful someone will have mercy for me. I really will Venmo you $5 for
> coffee if you want.
>
>
>
> Thanks,
>
>
>
> Ivan
>
>
>
>
>
> Larger code examples:
>
>
>
> Setting dynamic properties before executing the job doesn’t work because
> the values seem to get overridden or never forwarded.
>
> ```
>
> val config = new Configuration()
>
> config.setString("fs.azure.account.key.mystore1.blob.core.windows.net
> <https://nam06.safelinks.protection.outlook.com/?url=http%3A%2F%2Ffs.azure.account.key.mystore1.blob.core.windows.net%2F&data=05%7C01%7CIvan.Webber%40microsoft.com%7C6d883e0d338a47fccc9b08db1b6ebffd%7C72f988bf86f141af91ab2d7cd011db47%7C1%7C0%7C638133934840820056%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C2000%7C%7C%7C&sdata=fgakMbTg5vFKCFRMFs1OlyfD0RIDLCMuUYB%2BlFhk7AQ%3D&reserved=0>
> ", System.getenv("KEY_1"))
>
> config.setString("fs.azure.account.key.mystore2.blob.core.windows.net
> <https://nam06.safelinks.protection.outlook.com/?url=http%3A%2F%2Ffs.azure.account.key.mystor

Re: Secure Azure Credential Configuration

2023-03-02 Thread Alexis Sarda-Espinosa
Hi Ivan,

Mercy is always free. Are you using WASB or ABFS? I presume it's the
latter, since that's the one that can't use EnvironmentVariableKeyProvider,
but just to be sure.

Regards,
Alexis.


On Thu, 2 Mar 2023, 23:07 Ivan Webber via user, 
wrote:

> TLDR: I will buy your coffee if you can help me understand how to securely
> configure Azure credentials (doc page
> 
> for reference).
>
>
>
> I am a junior developer tasked with being the first person to learn the
> Apache Flink framework. I know that storing secrets in flink-conf.yaml in a
> container is a bad idea. I’ve tried exposing Azure storage keys as env vars
> and using `config.setString`, but those properties seem to get overridden.
> I plan on using Flink operator, so if you can show me in that context
> that’d be ideal.
>
>
>
> Thanks, and sorry for bothering everyone. I’ve just exhausted myself and
> am hopeful someone will have mercy for me. I really will Venmo you $5 for
> coffee if you want.
>
>
>
> Thanks,
>
>
>
> Ivan
>
>
>
>
>
> Larger code examples:
>
>
>
> Setting dynamic properties before executing the job doesn’t work because
> the values seem to get overridden or never forwarded.
>
> ```
>
> val config = new Configuration()
>
> config.setString("fs.azure.account.key.mystore1.blob.core.windows.net",
> System.getenv("KEY_1"))
>
> config.setString("fs.azure.account.key.mystore2.blob.core.windows.net",
> System.getenv("KEY_2"))
>
> config.setString("fs.azure.account.key.mystore3.blob.core.windows.net",
> System.getenv("KEY_3"))
>
> val env = environment.StreamExecutionEnvironment
> .getExecutionEnvironment(config)
>
> ```
>
>
>
> In the Flink operator, configuration fields can be provided as follows, but
> then I can’t commit the file with a secret inside. Ideally there would be a
> way to reference a secret but the values must be literal strings.
>
> ```
>
> spec:
>
>   flinkConfiguration:
>
> fs.azure.account.key.mystore1.blob.core.windows.net: SECRET_STRING
>
> fs.azure.account.key.mystore2.blob.core.windows.net: SECRET_STRING
>
> fs.azure.account.key.mystore3.blob.core.windows.net: SECRET_STRING
>
> ```
>
>
>
> The last possible solution I can think of, which I'll be trying, is putting the
> entire flink-conf.yaml into a secret, or having a different container that
> adds secrets to the flink-operator-job.yaml and then does the `kubectl
> create -f flink-operator-job.yaml` (if that’s even possible).
>


Re: Fast and slow stream sources for Interval Join

2023-02-27 Thread Alexis Sarda-Espinosa
Hi Mason,

Very interesting, is it possible to apply both types of alignment? I.e.,
considering watermark skew across splits from within one source & also from
another source?

Regards,
Alexis.

On Tue, 28 Feb 2023, 05:26 Mason Chen,  wrote:

> Hi all,
>
> It's true that the problem can be handled by caching records in state.
> However, there is an alternative using `watermark alignment` with Flink
> 1.15+ [1], which does the desired synchronization you described while
> keeping the state much smaller than the former approach.
>
> To use this with two topics of different speeds, you would need to define
> two Kafka sources, each corresponding to a topic. This limitation is
> documented in [1] and is resolved in Flink 1.17 by split-level
> (partition-level in the case of Kafka) watermark alignment, so one Kafka
> source reading various topics can align on the partitions of the different
> topics.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
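>
> Just to make that concrete, a rough sketch with two sources sharing one
> alignment group could look like this (all names and values here are made
> up, and it assumes Flink 1.15+ with the new KafkaSource):
>
> ```
> import java.time.Duration
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.connector.kafka.source.KafkaSource
> import org.apache.flink.streaming.api.scala._
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> def kafkaSource(topic: String): KafkaSource[String] =
>   KafkaSource.builder[String]()
>     .setBootstrapServers("broker:9092") // placeholder address
>     .setTopics(topic)
>     .setGroupId("interval-join-demo")   // placeholder group id
>     .setValueOnlyDeserializer(new SimpleStringSchema())
>     .build()
>
> // Both strategies share the group "join-group", so the faster source pauses
> // once its watermark drifts more than 30s ahead of the group's minimum.
> val aligned = WatermarkStrategy
>   .forBoundedOutOfOrderness[String](Duration.ofSeconds(5))
>   .withWatermarkAlignment("join-group", Duration.ofSeconds(30), Duration.ofSeconds(1))
>
> val fastStream = env.fromSource(kafkaSource("fast-topic"), aligned, "fast-source")
> val slowStream = env.fromSource(kafkaSource("slow-topic"), aligned, "slow-source")
> ```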
>
> Best,
> Mason
>
> On Mon, Feb 27, 2023 at 8:11 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I had this question myself and I've seen it a few times; the answer is
>> always the same: there's currently no official way to handle it without
>> state.
>>
>> Regards,
>> Alexis.
>>
>> On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek,  wrote:
>>
>>> Hi,
>>>
>>> How to handle a case where one of the Kafka topics used for interval
>>> join is slower than the other? (Or a case where one topic lags behind)
>>> Is there a way to stop consuming from the fast topic and wait for the
>>> slow one to catch up? I want to avoid running out of memory (or keeping a
>>> very large state) and I don't want to discard any data from the fast topic
>>> until a watermark from the slow topic allows that.
>>>
>>> Best Regards
>>>
>>


Re: Fast and slow stream sources for Interval Join

2023-02-27 Thread Alexis Sarda-Espinosa
Hello,

I had this question myself and I've seen it a few times; the answer is
always the same: there's currently no official way to handle it without
state.

Regards,
Alexis.

On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek,  wrote:

> Hi,
>
> How to handle a case where one of the Kafka topics used for interval join
> is slower than the other? (Or a case where one topic lags behind)
> Is there a way to stop consuming from the fast topic and wait for the slow
> one to catch up? I want to avoid running out of memory (or keeping a very
> large state) and I don't want to discard any data from the fast topic until
> a watermark from the slow topic allows that.
>
> Best Regards
>


Re: Kubernetes operator's merging strategy for template arrays

2023-02-23 Thread Alexis Sarda-Espinosa
Ah I see, I'll have a look, thanks.

On Thu, 23 Feb 2023 at 14:21, Gyula Fóra wrote:

> If you are interested in helping to review this, here is the relevant
> ticket and the PR I just opened:
>
> https://issues.apache.org/jira/browse/FLINK-30786
> https://github.com/apache/flink-kubernetes-operator/pull/535
>
> Cheers,
> Gyula
>
> On Thu, Feb 23, 2023 at 2:10 PM Gyula Fóra  wrote:
>
>> Hi!
>>
>> The current array merging strategy in the operator is basically an
>> overwrite by position, yes.
>> I actually have a pending improvement to make this configurable and allow
>> merging arrays by "name" attribute. This is generally more practical for
>> such cases.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Feb 23, 2023 at 1:37 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I noticed that if I set environment variables in both spec.podTemplate &
>>> spec.jobManager.podTemplate for the same container (flink-main-container),
>>> the values from the latter selectively overwrite the values from the
>>> former. For example, if I define something like this (omitting metadata
>>> properties):
>>>
>>> spec:
>>>   podTemplate:
>>> spec:
>>>   containers:
>>>   - name: flink-main-container
>>> env:
>>>   - name: FOO
>>> value: BAR
>>>   - name: BAZ
>>> value: BAK
>>>   jobManager:
>>> podTemplate:
>>>   spec:
>>> containers:
>>> - name: flink-main-container
>>>   env:
>>> - name: EXTRA
>>>   value: ENVVAR
>>>
>>> The final spec for the Job Manager Deployment will only contain EXTRA
>>> and BAZ, so FOO is overwritten by EXTRA.
>>>
>>> Is this expected? I am already evaluating the latest release of the
>>> operator (1.4.0).
>>>
>>> Regards,
>>> Alexis.
>>>
>>


Kubernetes operator's merging strategy for template arrays

2023-02-23 Thread Alexis Sarda-Espinosa
Hello,

I noticed that if I set environment variables in both spec.podTemplate &
spec.jobManager.podTemplate for the same container (flink-main-container),
the values from the latter selectively overwrite the values from the
former. For example, if I define something like this (omitting metadata
properties):

spec:
  podTemplate:
spec:
  containers:
  - name: flink-main-container
env:
  - name: FOO
value: BAR
  - name: BAZ
value: BAK
  jobManager:
podTemplate:
  spec:
containers:
- name: flink-main-container
  env:
- name: EXTRA
  value: ENVVAR

The final spec for the Job Manager Deployment will only contain EXTRA and
BAZ, so FOO is overwritten by EXTRA.

Is this expected? I am already evaluating the latest release of the
operator (1.4.0).

Regards,
Alexis.


Re: Calculation of UI's maximum non-heap memory

2023-02-21 Thread Alexis Sarda-Espinosa
Very useful, thanks a lot.

Regards,
Alexis.

On Tue, 21 Feb 2023 at 12:04, Weihua Hu wrote:

> Hi Alexis,
>
> The maximum Non-Heap is the sum of the max sizes of the non-heap memory
> pools. There are 3 memory pools (based on JDK 11):
> 1. Metaspace: we can control the size with the JVM parameter -XX:MaxMetaspaceSize
> or the Flink configuration jobmanager.memory.jvm-metaspace.size. For your
> job, this pool size is 150m
> 2. Compressed Class Space: this is controlled by the JVM parameter
> -XX:CompressedClassSpaceSize. The default value is 1G or MetaspaceSize - 2
> * InitialBootClassLoaderMetaspaceSize (default is 4194304). For your job,
> this pool size is 142m
> 3. CodeCache (CodeHeap profiled nmethods/non-profiled
> nmethods/non-nmethods): this is controlled by the JVM parameter
> -XX:ReservedCodeCacheSize. The default value is 240m
>
> So, the maximum non-heap is 150 + 142 + 240 = 532m.
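>
> As a side note, if you want to see those pools on a running JM yourself, a
> small sketch like this sums the non-heap pool maxima the same way (purely
> illustrative):
>
> ```
> import java.lang.management.{ManagementFactory, MemoryType}
> import scala.collection.JavaConverters._
>
> // Sums the max size of every NON_HEAP pool (Metaspace, Compressed Class
> // Space, CodeHeap segments); pools without a limit report -1 and are skipped.
> val nonHeapMax = ManagementFactory.getMemoryPoolMXBeans.asScala
>   .filter(_.getType == MemoryType.NON_HEAP)
>   .map(_.getUsage.getMax)
>   .filter(_ > 0)
>   .sum
> println(s"Non-heap maximum: ${nonHeapMax / (1024 * 1024)} MiB")
> ```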
>
>
> Best,
> Weihua
>
>
> On Tue, Feb 21, 2023 at 2:33 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Weihua,
>>
>> Thanks for your response, I am familiar with those calculations, the one
>> I don't understand is the Maximum Non-Heap value.
>>
>> Regards,
>> Alexis.
>>
>> On Tue, 21 Feb 2023, 04:45 Weihua Hu,  wrote:
>>
>>> Hi, Alexis
>>>
>>> 1. With that configuration, Flink will set the JVM parameters -Xms and -Xmx
>>> to 673185792 (642m), -XX:MaxDirectMemorySize to 67108864 (64m), and
>>> -XX:MaxMetaspaceSize to 157286400 (150m); you can find more information in [1]
>>> 2. As the hint in the Flink UI says: "The maximum heap displayed might differ
>>> from the configured values depending on the used GC algorithm for this
>>> process." [2] shows how the JVM calculates the max heap memory from the
>>> configured -Xms/-Xmx
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
>>> [2]
>>> https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have configured a job manager with the following settings (Flink
>>>> 1.16.1):
>>>>
>>>> jobmanager.memory.process.size: 1024m
>>>> jobmanager.memory.jvm-metaspace.size: 150m
>>>> jobmanager.memory.off-heap.size: 64m
>>>> jobmanager.memory.jvm-overhead.min: 168m
>>>> jobmanager.memory.jvm-overhead.max: 168m
>>>> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>>>>
>>>> However, when I look at the job manager dashboard in the UI, I see that
>>>> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
>>>> how this value is calculated?
>>>>
>>>> In case it's relevant, the effective configuration for JVM Heap is
>>>> reported as 642 MB, with the reported maximum being 621 MB.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>


Re: Calculation of UI's maximum non-heap memory

2023-02-20 Thread Alexis Sarda-Espinosa
Hi Weihua,

Thanks for your response, I am familiar with those calculations, the one I
don't understand is the Maximum Non-Heap value.

Regards,
Alexis.

On Tue, 21 Feb 2023, 04:45 Weihua Hu,  wrote:

> Hi, Alexis
>
> 1. With that configuration, Flink will set the JVM parameters -Xms and -Xmx
> to 673185792 (642m), -XX:MaxDirectMemorySize to 67108864 (64m), and
> -XX:MaxMetaspaceSize to 157286400 (150m); you can find more information in [1]
> 2. As the hint in the Flink UI says: "The maximum heap displayed might differ
> from the configured values depending on the used GC algorithm for this
> process." [2] shows how the JVM calculates the max heap memory from the
> configured -Xms/-Xmx
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_jobmanager/
> [2]
> https://stackoverflow.com/questions/52980629/runtime-getruntime-maxmemory-calculate-method
>
> Best,
> Weihua
>
>
> On Tue, Feb 21, 2023 at 12:15 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have configured a job manager with the following settings (Flink
>> 1.16.1):
>>
>> jobmanager.memory.process.size: 1024m
>> jobmanager.memory.jvm-metaspace.size: 150m
>> jobmanager.memory.off-heap.size: 64m
>> jobmanager.memory.jvm-overhead.min: 168m
>> jobmanager.memory.jvm-overhead.max: 168m
>> jobmanager.memory.enable-jvm-direct-memory-limit: "true"
>>
>> However, when I look at the job manager dashboard in the UI, I see that
>> the value of Non-Heap Maximum is reported as 532 MB. Could someone clarify
>> how this value is calculated?
>>
>> In case it's relevant, the effective configuration for JVM Heap is
>> reported as 642 MB, with the reported maximum being 621 MB.
>>
>> Regards,
>> Alexis.
>>
>>


Calculation of UI's maximum non-heap memory

2023-02-20 Thread Alexis Sarda-Espinosa
Hello,

I have configured a job manager with the following settings (Flink 1.16.1):

jobmanager.memory.process.size: 1024m
jobmanager.memory.jvm-metaspace.size: 150m
jobmanager.memory.off-heap.size: 64m
jobmanager.memory.jvm-overhead.min: 168m
jobmanager.memory.jvm-overhead.max: 168m
jobmanager.memory.enable-jvm-direct-memory-limit: "true"

However, when I look at the job manager dashboard in the UI, I see that the
value of Non-Heap Maximum is reported as 532 MB. Could someone clarify how
this value is calculated?

In case it's relevant, the effective configuration for JVM Heap is reported
as 642 MB, with the reported maximum being 621 MB.

Regards,
Alexis.


Re: Could savepoints contain in-flight data?

2023-02-13 Thread Alexis Sarda-Espinosa
Hi Hang,

Thanks for the confirmation. One follow-up question with a somewhat
convoluted scenario:

   1. An unaligned checkpoint is created.
   2. I stop the job *without* savepoint.
   3. I want to start a modified job from the checkpoint, but I changed one
   of the operator's uids.

If the operator whose uid changed had in-flight data as part of the
checkpoint, it will lose said data after starting, right?

I imagine this is not good practice, but it's just a hypothetical scenario
I wanted to understand better.

Regards,
Alexis.


On Mon, 13 Feb 2023 at 12:33, Hang Ruan wrote:

> ps: the savepoint will also not contain in-flight data.
>
> Best,
> Hang
>
> Hang Ruan  于2023年2月13日周一 19:31写道:
>
>> Hi Alexis,
>>
>> No, an aligned checkpoint will not contain in-flight data. An aligned checkpoint
>> makes sure that the data before the barrier has been processed and there is
>> no need to store in-flight data for one checkpoint.
>>
>> I think these documents[1][2] will help you to understand it.
>>
>>
>> Best,
>> Hang
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/concepts/stateful-stream-processing/#checkpointing
>>
>> Alexis Sarda-Espinosa  于2023年2月11日周六 06:00写道:
>>
>>> Hello,
>>>
>>> One feature of unaligned checkpoints is that the checkpoint barriers can
>>> overtake in-flight data, so the buffers are persisted as part of the state.
>>>
>>> The documentation for savepoints doesn't mention anything explicitly, so
>>> just to be sure, will savepoints always wait for in-flight data to be
>>> processed before they are completed, or could they also persist buffers in
>>> certain situations (e.g. when there's backpressure)?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Could savepoints contain in-flight data?

2023-02-10 Thread Alexis Sarda-Espinosa
Hello,

One feature of unaligned checkpoints is that the checkpoint barriers can
overtake in-flight data, so the buffers are persisted as part of the state.

The documentation for savepoints doesn't mention anything explicitly, so
just to be sure, will savepoints always wait for in-flight data to be
processed before they are completed, or could they also persist buffers in
certain situations (e.g. when there's backpressure)?

Regards,
Alexis.


Re: Backpressure due to busy sub-tasks

2022-12-16 Thread Alexis Sarda-Espinosa
Hi Martijn,

yes, that's what I meant - the throughput in the process function(s) didn't
change, so even if they were busy 100% of the time with parallelism=2, they
were processing data quickly enough.

Regards,
Alexis.

On Fri, 16 Dec 2022 at 14:20, Martijn Visser <
martijnvis...@apache.org> wrote:

> Hi,
>
> Backpressure implies that it's actually a later operator that is busy. So
> in this case, that would be your process function that can't handle the
> incoming load from your Kafka source.
>
> Best regards,
>
> Martijn
>
> On Tue, Dec 13, 2022 at 7:46 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a Kafka source (the new one) in Flink 1.15 that's followed by a
>> process function with parallelism=2. Some days, I see long periods of
>> backpressure in the source. During those times, the pool-usage metrics of
>> all tasks stay between 0 and 1%, but the process function appears 100% busy.
>>
>> To try to avoid backpressure, I increased parallelism to 3. It seems to
>> help, and busy-time decreased to around 80%, but something that caught my
>> attention is that throughput remained unchanged. Concretely, if X is the
>> number of events being written to the Kafka topic every second, each
>> instance of the process function receives roughly X/2 events/s with
>> parallelism=2, and X/3 with parallelism=3.
>>
>> I'm wondering a couple of things.
>>
>> 1. Is it possible that backpressure in this case is essentially a "false
>> positive" because the function is busy 100% of the time even though it's
>> processing enough data?
>> 2. Does Flink expose any way to tune this type of backpressure mechanism?
>>
>> Regards,
>> Alexis.
>>
>


Backpressure due to busy sub-tasks

2022-12-13 Thread Alexis Sarda-Espinosa
Hello,

I have a Kafka source (the new one) in Flink 1.15 that's followed by a
process function with parallelism=2. Some days, I see long periods of
backpressure in the source. During those times, the pool-usage metrics of
all tasks stay between 0 and 1%, but the process function appears 100% busy.

To try to avoid backpressure, I increased parallelism to 3. It seems to
help, and busy-time decreased to around 80%, but something that caught my
attention is that throughput remained unchanged. Concretely, if X is the
number of events being written to the Kafka topic every second, each
instance of the process function receives roughly X/2 events/s with
parallelism=2, and X/3 with parallelism=3.

I'm wondering a couple of things.

1. Is it possible that backpressure in this case is essentially a "false
positive" because the function is busy 100% of the time even though it's
processing enough data?
2. Does Flink expose any way to tune this type of backpressure mechanism?

Regards,
Alexis.


Re: Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-12 Thread Alexis Sarda-Espinosa
Hi Hangxiang,

after some more digging, I think the job ID is maintained not because of
Flink HA, but because of the Kubernetes operator. It seems to me that
"savepoint" upgrade mode should ideally alter job ID when starting from the
savepoint, but I'm not sure.

Regards,
Alexis.

On Mon, 12 Dec 2022 at 10:31, Hangxiang Yu wrote:

> Hi Alexis.
> IIUC, by default, the job id of the new job should be different if you
> restore from a stopped job. Whether to clean up is related to the savepoint
> restore mode.
> Just in the case of failover, the job id should not change, and everything
> in the checkpoint dir will be claimed as you said.
>
> > And a related question for a slightly different scenario, if I
> use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
> stop-job-with-savepoint, does that trigger checkpoint deletion?
> In this case, the checkpoint will be cleaned and not retained and the
> savepoint will remain. So you still could use savepoint to restore.
>
> On Mon, Dec 5, 2022 at 6:33 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a doubt about a very particular scenario with this configuration:
>>
>> - Flink HA enabled (Kubernetes).
>> - ExternalizedCheckpointCleanup set to RETAIN_ON_CANCELLATION.
>> - Savepoint restore mode left as default NO_CLAIM.
>>
>> During an upgrade, a stop-job-with-savepoint is triggered, and then that
>> savepoint is used to start the upgraded job. Based on what I see, since HA
>> is enabled, the job ID doesn't change. Additionally, I understand the first
>> checkpoint after restoration will be a full one so that there's no
>> dependency on the used savepoint. However, since the job ID didn't change,
>> the new checkpoint still shares a path with "older" checkpoints, e.g.
>> /.../job_id/chk-1234.
>>
>> In this case, does this mean everything under /.../job_id/ *except*
>> shared/, taskowned/, and any chk-*/ folder whose id is smaller than 1234
>> could be deleted? I imagine even some files under shared/ could be deleted
>> as well, although that might be harder to identify.
>>
>> And a related question for a slightly different scenario, if I
>> use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
>> stop-job-with-savepoint, does that trigger checkpoint deletion?
>>
>> Regards,
>> Alexis.
>>
>
>
> --
> Best,
> Hangxiang.
>


Re: Deterministic co-results

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Salva,

Just to give you further thoughts from another user, I think the "temporal
join" semantics are very critical in this use case, and what you implement
for that may not easily generalize to other cases. Because of that, I'm not
sure if you can really define best practices that apply in general.
Additionally, you also have to take idleness into account, given that using
event-time could leave you in a "frozen" state if you're not receiving
events continuously.

I also doubt you can accurately estimate out-of-orderness in this scenario
due to the particularities of Flink's network stack [1]. Even if you only
have 2 sources and immediately connect them together, parallelism and the
resulting shuffles can change from one execution to the other even if you
don't change the logic at all, because scheduling is also non-deterministic
and the "distribution" of events across different parallel instances of
your sources could vary a lot as well.

I think that others will tell you that you indeed need to find a way to
buffer events for a while, I got the same advice in the past. Focusing very
specifically on what you described (streams for data & control events +
event-time + temporal join), but maybe also assuming you can manage
watermarks in a way that handles idleness without simply freezing the
stream, I once tried a custom operator (not function) to force the
watermarks of 1 stream to wait for the other one -
see PrioritizedWatermarkStreamIntervalJoinOperator in [2]. This still uses
the idea of buffering, it just moves that responsibility to the operator
that's already handling "window data" for the join. Also, it extends an
internal class, so it's very much unofficial, and it's probably too
specific to my use case, but maybe it gives you other ideas to consider.
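
To make the buffering idea a bit more concrete, a very rough sketch of your
option 1 could look like the following (every name and type here is
invented, it keeps only one data event per key and timestamp, and it never
cleans up old control signals). Data events are held back until the
watermark passes their timestamp and are then matched against the latest
control signal at or before it:

```
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Data(key: String, ts: Long, payload: String)
case class Control(key: String, ts: Long, setting: String)

class BufferingTemporalJoin
    extends KeyedCoProcessFunction[String, Data, Control, (Data, Option[Control])] {

  @transient private var bufferedData: MapState[java.lang.Long, Data] = _
  @transient private var controlsByTs: MapState[java.lang.Long, Control] = _

  override def open(parameters: Configuration): Unit = {
    bufferedData = getRuntimeContext.getMapState(
      new MapStateDescriptor("buffered-data", classOf[java.lang.Long], classOf[Data]))
    controlsByTs = getRuntimeContext.getMapState(
      new MapStateDescriptor("controls-by-ts", classOf[java.lang.Long], classOf[Control]))
  }

  override def processElement1(
      d: Data,
      ctx: KeyedCoProcessFunction[String, Data, Control, (Data, Option[Control])]#Context,
      out: Collector[(Data, Option[Control])]): Unit = {
    // Hold the event back until the watermark (the min over both inputs)
    // passes its timestamp, i.e. until no older control signal can arrive.
    bufferedData.put(d.ts, d)
    ctx.timerService().registerEventTimeTimer(d.ts)
  }

  override def processElement2(
      c: Control,
      ctx: KeyedCoProcessFunction[String, Data, Control, (Data, Option[Control])]#Context,
      out: Collector[(Data, Option[Control])]): Unit = {
    controlsByTs.put(c.ts, c)
  }

  override def onTimer(
      ts: Long,
      ctx: KeyedCoProcessFunction[String, Data, Control, (Data, Option[Control])]#OnTimerContext,
      out: Collector[(Data, Option[Control])]): Unit = {
    val d = bufferedData.get(ts)
    if (d != null) {
      // The control signal that was "active" at the event's timestamp.
      val earlier = controlsByTs.keys().asScala.map(_.longValue).filter(_ <= ts)
      val active = if (earlier.nonEmpty) Option(controlsByTs.get(earlier.max)) else None
      out.collect((d, active))
      bufferedData.remove(ts)
    }
  }
}
```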

And one last detail to further exemplify complexity here: when I was
testing my custom operator with event-time simulations in my IDE, I
initially didn't think about the fact that a watermark with Long.MAX_VALUE
is sent at the end of the simulation, which was another source of
non-determinism because sometimes the control stream was processed entirely
(including the max watermark) before a single event from the data stream
went through, which meant that all events from the data stream were
considered late arrivals and silently dropped.

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs

Regards,
Alexis.

On Thu, 8 Dec 2022 at 15:20, Salva Alcántara <
salcantara...@gmail.com> wrote:

> Just for adding some extra references:
>
> [5]
> https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
> [6]
> https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
> [7]
> https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>
> Salva
>
> On 2022/12/07 18:52:42 Salva Alcántara wrote:
> > It's well-known that Flink does not provide any guarantees on the order
> in
> > which a CoProcessFunction (or one of its multiple variations) processes
> its
> > two inputs [1]. I wonder then what is the current best
> practice/recommended
> > approach for cases where one needs deterministic results in presence of:
> >
> > 1. A control stream
> > 2. An event/data stream
> >
> > Let's consider event-time semantics; both streams have timestamps, and we
> > want to implement "temporal join" semantics where the input events are
> > controlled based on the latest control signals received before them,
> i.e.,
> > the ones "active" when the events happened. For simplicity, let's assume
> > that all events are received in order, so that the only source of
> > non-determinism is the one introduced by the CoProcessFunction itself.
> >
> > I'm considering the following options:
> >
> > 1. Buffer events inside the CoProcessFunction for some time, while saving
> > all the control signals in state (indexed by time)
> > 2. Same as before but doing the pre-buffering of the event/data streams
> > before the CoProcessFunction
> > 3. Similar as before but considering both streams together by
> multiplexing
> > them into one heterogeneous stream which would be pre-sorted in order to
> > guarantee the global ordering of the events from the two different
> sources.
> > Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> > ProcessFunction[Either[IN1, IN2], OUT] which by construction will process
> > the data in order and hence produce deterministic results
> >
> > Essentially, all the strategies amount to introducing a "minimum amount
> of
> > delay" to guarantee the deterministic processing, which brings me to the
> > following question:
> >
> > * How to get an estimate for the out-of-order-ness bound that a
> > CoProcessFunction can introduce? Is that even possible in the first
> place?
> > I guess this mostly

Re: Cleanup for high-availability.storageDir

2022-12-08 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think you didn't include the mailing list in your response.

According to my experiments, using last-state means the operator simply
deletes the Flink pods, and I believe that doesn't count as Cancelled, so
the artifacts for blobs and submitted job graphs are not cleaned up. I
imagine the same logic Gyula mentioned before applies, namely keep the
latest one and clean the older ones.

Regards,
Alexis.

On Thu, 8 Dec 2022 at 10:37, Matthias Pohl <
matthias.p...@aiven.io> wrote:

> I see, I confused the Flink-internal recovery with what the Flink
> Kubernetes Operator does for redeploying the Flink job. AFAIU, when you do
> an upgrade of your job, the operator will cancel the Flink job (I'm
> assuming now that you use Flink's Application mode rather than Session
> mode). The operator cancelled your job and shuts down the cluster.
> Checkpoints are retained and, therefore, can be used as the so-called "last
> state" when redeploying your job using the same Job ID. In that case, the
> corresponding jobGraph and other BLOBs should be cleaned up by Flink
> itself. The checkpoint files are retained, i.e. survive the Flink cluster
> shutdown.
>
> When redeploying the Flink cluster with the (updated) job, a new JobGraph
> file is created by Flink internally. BLOBs are recreated as well. New
> checkpoints are going to be created and old ones (that are not needed
> anymore) are cleaned up.
>
> Just to recap what I said before (to make it more explicit to
> differentiate what the k8s operator does and what Flink does internally):
> Removing the artifacts you were talking about in your previous post would
> harm Flink's internal recovery mechanism. That's probably not what you want.
>
> @Gyula: Please correct me if I misunderstood something here.
>
> I hope that helped.
> Matthias
>
> On Wed, Dec 7, 2022 at 4:19 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> I see, thanks for the details.
>>
>> I do mean replacing the job without stopping it terminally. Specifically,
>> I mean updating the container image with one that contains an updated job
>> jar. Naturally, the new version must not break state compatibility, but as
>> long as that is fulfilled, the job should be able to use the last
>> checkpoint as starting point. It's my understanding that this is how the
>> Kubernetes operator's "last-state" upgrade mode works [1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>> Regards,
>> Alexis.
>>
>> On Wed, 7 Dec 2022 at 15:54, Matthias Pohl <
>> matthias.p...@aiven.io> wrote:
>>
>>> > - job_name/submittedJobGraphX
>>> submittedJobGraph* is the persisted JobGraph that would be picked up in
>>> case of a failover. Deleting this file would result in Flink's failure
>>> recovery not working properly anymore if the job is still executed but
>>> needs to be restarted because the actual job definition is gone.
>>>
>>> > completedCheckpointXYZ
>>> This is the persisted CompletedCheckpoint with a reference to the actual
>>> Checkpoint directory. Deleting this file would be problematic if the state
>>> recovery relies in some way on this specific checkpoint. The HA data relies
>>> on this file to be present. Failover only fails if there's no newer
>>> checkpoint or the HA data still refers to this checkpoint in some way.
>>>
>>> > - job_name/blob/job_uuid/blob_...
>>> Artifacts of the BlobServer containing runtime artifacts of the jobs
>>> (e.g. logs, libraries, ...)
>>>
>>> In general, you don't want to clean HA artifacts if the job hasn't
>>> reached a terminal state, yet, as it harms Flink's ability to recover the
>>> job. Additionally, these files are connected with the HA backend, i.e. the
>>> file path is stored in the HA backend. Removing the artifacts will likely
>>> result in metadata becoming invalid.
>>>
>>> What do you mean with "testing updates *without* savepoints"? Are you
>>> referring to replacing the job's business logic without stopping the job?
>>>
>>> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Then the explanation is likely that the job has not reached a terminal
>>>> state. I was testing updates *without* savepoints (but with HA)

Re: Cleanup for high-availability.storageDir

2022-12-07 Thread Alexis Sarda-Espinosa
I see, thanks for the details.

I do mean replacing the job without stopping it terminally. Specifically, I
mean updating the container image with one that contains an updated job
jar. Naturally, the new version must not break state compatibility, but as
long as that is fulfilled, the job should be able to use the last
checkpoint as starting point. It's my understanding that this is how the
Kubernetes operator's "last-state" upgrade mode works [1].

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

Regards,
Alexis.

On Wed, 7 Dec 2022 at 15:54, Matthias Pohl <
matthias.p...@aiven.io> wrote:

> > - job_name/submittedJobGraphX
> submittedJobGraph* is the persisted JobGraph that would be picked up in
> case of a failover. Deleting this file would result in Flink's failure
> recovery not working properly anymore if the job is still executed but
> needs to be restarted because the actual job definition is gone.
>
> > completedCheckpointXYZ
> This is the persisted CompletedCheckpoint with a reference to the actual
> Checkpoint directory. Deleting this file would be problematic if the state
> recovery relies in some way on this specific checkpoint. The HA data relies
> on this file to be present. Failover only fails if there's no newer
> checkpoint or the HA data still refers to this checkpoint in some way.
>
> > - job_name/blob/job_uuid/blob_...
> Artifacts of the BlobServer containing runtime artifacts of the jobs (e.g.
> logs, libraries, ...)
>
> In general, you don't want to clean HA artifacts if the job hasn't reached
> a terminal state, yet, as it harms Flink's ability to recover the job.
> Additionally, these files are connected with the HA backend, i.e. the file
> path is stored in the HA backend. Removing the artifacts will likely result
> in metadata becoming invalid.
>
> What do you mean with "testing updates *without* savepoints"? Are you
> referring to replacing the job's business logic without stopping the job?
>
> On Wed, Dec 7, 2022 at 3:17 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Matthias,
>>
>> Then the explanation is likely that the job has not reached a terminal
>> state. I was testing updates *without* savepoints (but with HA), so I guess
>> that never triggers automatic cleanup.
>>
>> Since, in my case, the job will theoretically never reach a terminal
>> state with this configuration, would it cause issues if I clean the
>> artifacts manually?
>>
>> *And for completeness, I also see an artifact called
>> completedCheckpointXYZ which is updated over time, and I imagine that
>> should never be removed.
>>
>> Regards,
>> Alexis.
>>
>> On Wed, 7 Dec 2022 at 13:03, Matthias Pohl <
>> matthias.p...@aiven.io> wrote:
>>
>>> Flink should already take care of cleaning the artifacts you mentioned.
>>> Flink 1.15+ even includes retries if something went wrong. There are still
>>> a few bugs that need to be fixed (e.g. FLINK-27355 [1]). Checkpoint HA data
>>> is not properly cleaned up, yet, which is covered by FLIP-270 [2].
>>>
>>> It would be interesting to know why these artifacts haven't been deleted
>>> assuming that the corresponding job is actually in a final state (e.g.
>>> FAILED, CANCELLED, FINISHED), i.e. there is a JobResultStoreEntry file for
>>> that specific job available in the folder Gyula was referring to in the
>>> linked documentation. At least for the JobGraph files, it's likely that you
>>> have additional metadata still stored in your HA backend (that refers to
>>> the files). That would be something you might want to clean up as well, if
>>> you want to do a proper cleanup. But still, it would be good to understand
>>> why these files are not cleaned up by Flink.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-27355
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>>>
>>> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> One concrete question, under the HA folder I also see these sample
>>>> entries:
>>>>
>>>> - job_name/blob/job_uuid/blob_...
>>>> - job_name/submittedJobGraphX
>>>> - job_name/submittedJobGraphY
>>>>
>>>> Is it safe to clean these up when the jo

Re: Cleanup for high-availability.storageDir

2022-12-07 Thread Alexis Sarda-Espinosa
Hi Matthias,

Then the explanation is likely that the job has not reached a terminal
state. I was testing updates *without* savepoints (but with HA), so I guess
that never triggers automatic cleanup.

Since, in my case, the job will theoretically never reach a terminal state
with this configuration, would it cause issues if I clean the artifacts
manually?

*And for completeness, I also see an artifact called completedCheckpointXYZ
which is updated over time, and I imagine that should never be removed.

Regards,
Alexis.

On Wed, 7 Dec 2022 at 13:03, Matthias Pohl <
matthias.p...@aiven.io> wrote:

> Flink should already take care of cleaning the artifacts you mentioned.
> Flink 1.15+ even includes retries if something went wrong. There are still
> a few bugs that need to be fixed (e.g. FLINK-27355 [1]). Checkpoint HA data
> is not properly cleaned up, yet, which is covered by FLIP-270 [2].
>
> It would be interesting to know why these artifacts haven't been deleted
> assuming that the corresponding job is actually in a final state (e.g.
> FAILED, CANCELLED, FINISHED), i.e. there is a JobResultStoreEntry file for
> that specific job available in the folder Gyula was referring to in the
> linked documentation. At least for the JobGraph files, it's likely that you
> have additional metadata still stored in your HA backend (that refers to
> the files). That would be something you might want to clean up as well, if
> you want to do a proper cleanup. But still, it would be good to understand
> why these files are not cleaned up by Flink.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-27355
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>
> On Tue, Dec 6, 2022 at 5:42 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> One concrete question, under the HA folder I also see these sample
>> entries:
>>
>> - job_name/blob/job_uuid/blob_...
>> - job_name/submittedJobGraphX
>> - job_name/submittedJobGraphY
>>
>> Is it safe to clean these up when the job is in a healthy state?
>>
>> Regards,
>> Alexis.
>>
>> On Mon, 5 Dec 2022 at 20:09, Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> that certainly helps, but to set up automatic cleanup (in my case, of
>>> azure blob storage), the ideal option would be to be able to set a simple
>>> policy that deletes blobs that haven't been updated in some time, but that
>>> would assume that anything that's actually relevant for the latest state is
>>> "touched" by the JM on every checkpoint, and since I also see blobs
>>> referencing "submitted job graphs", I imagine that might not be a safe
>>> assumption.
>>>
>>> I understand the life cycle of those blobs isn't directly managed by the
>>> operator, but in that regard it could make things more cumbersome.
>>>
>>> Ideally, Flink itself would guarantee this sort of allowable TTL for HA
>>> files, but I'm sure that's not trivial.
>>>
>>> Regards,
>>> Alexis.
>>>
>>> On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:
>>>
>>>> Hi!
>>>>
>>>> There are some files that are not cleaned up over time in the HA dir
>>>> that need to be cleaned up by the user:
>>>>
>>>>
>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>>>
>>>>
>>>> Hope this helps
>>>> Gyula
>>>>
>>>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>>>> sarda.espin...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I see the number of entries in the directory configured for HA
>>>>> increases over time, particularly in the context of job upgrades in a
>>>>> Kubernetes environment managed by the operator. Would it be safe to assume
>>>>> that any files that haven't been updated in a while can be deleted?
>>>>> Assuming the checkpointing interval is much smaller than the period used to
>>>>> determine if files are too old.
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>


Re: Cleanup for high-availability.storageDir

2022-12-06 Thread Alexis Sarda-Espinosa
One concrete question, under the HA folder I also see these sample entries:

- job_name/blob/job_uuid/blob_...
- job_name/submittedJobGraphX
- job_name/submittedJobGraphY

Is it safe to clean these up when the job is in a healthy state?

Regards,
Alexis.

On Mon, 5 Dec 2022 at 20:09, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Gyula,
>
> that certainly helps, but to set up automatic cleanup (in my case, of
> azure blob storage), the ideal option would be to be able to set a simple
> policy that deletes blobs that haven't been updated in some time, but that
> would assume that anything that's actually relevant for the latest state is
> "touched" by the JM on every checkpoint, and since I also see blobs
> referencing "submitted job graphs", I imagine that might not be a safe
> assumption.
>
> I understand the life cycle of those blobs isn't directly managed by the
> operator, but in that regard it could make things more cumbersome.
>
> Ideally, Flink itself would guarantee this sort of allowable TTL for HA
> files, but I'm sure that's not trivial.
>
> Regards,
> Alexis.
>
> On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:
>
>> Hi!
>>
>> There are some files that are not cleaned up over time in the HA dir that
>> need to be cleaned up by the user:
>>
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>>
>>
>> Hope this helps
>> Gyula
>>
>> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I see the number of entries in the directory configured for HA increases
>>> over time, particularly in the context of job upgrades in a Kubernetes
>>> environment managed by the operator. Would it be safe to assume that any
>>> files that haven't been updated in a while can be deleted? Assuming the
>>> checkpointing interval is much smaller than the period used to determine if
>>> files are too old.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>


Re: Cleanup for high-availability.storageDir

2022-12-05 Thread Alexis Sarda-Espinosa
Hi Gyula,

that certainly helps, but to set up automatic cleanup (in my case, of Azure
blob storage), the ideal option would be to set a simple policy that deletes
blobs that haven't been updated in some time. That would assume, however,
that anything that's actually relevant for the latest state is "touched" by
the JM on every checkpoint, and since I also see blobs referencing
"submitted job graphs", I imagine that might not be a safe assumption.

I understand the life cycle of those blobs isn't directly managed by the
operator, but in that regard it could make things more cumbersome.

Ideally, Flink itself would guarantee this sort of allowable TTL for HA
files, but I'm sure that's not trivial.

Regards,
Alexis.

On Mon, 5 Dec 2022, 19:19 Gyula Fóra,  wrote:

> Hi!
>
> There are some files that are not cleaned up over time in the HA dir that
> need to be cleaned up by the user:
>
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
>
>
> Hope this helps
> Gyula
>
> On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I see the number of entries in the directory configured for HA increases
>> over time, particularly in the context of job upgrades in a Kubernetes
>> environment managed by the operator. Would it be safe to assume that any
>> files that haven't been updated in a while can be deleted? Assuming the
>> checkpointing interval is much smaller than the period used to determine if
>> files are too old.
>>
>> Regards,
>> Alexis.
>>
>>


Cleanup for high-availability.storageDir

2022-12-05 Thread Alexis Sarda-Espinosa
Hello,

I see the number of entries in the directory configured for HA increases
over time, particularly in the context of job upgrades in a Kubernetes
environment managed by the operator. Would it be safe to assume that any
files that haven't been updated in a while can be deleted? Assuming the
checkpointing interval is much smaller than the period used to determine if
files are too old.

Regards,
Alexis.


Clarification on checkpoint cleanup with RETAIN_ON_CANCELLATION

2022-12-05 Thread Alexis Sarda-Espinosa
Hello,

I have a doubt about a very particular scenario with this configuration:

- Flink HA enabled (Kubernetes).
- ExternalizedCheckpointCleanup set to RETAIN_ON_CANCELLATION.
- Savepoint restore mode left as default NO_CLAIM.

During an upgrade, a stop-job-with-savepoint is triggered, and then that
savepoint is used to start the upgraded job. Based on what I see, since HA
is enabled, the job ID doesn't change. Additionally, I understand the first
checkpoint after restoration will be a full one so that there's no
dependency on the used savepoint. However, since the job ID didn't change,
the new checkpoint still shares a path with "older" checkpoints, e.g.
/.../job_id/chk-1234.

In this case, does this mean everything under /.../job_id/ *except*
shared/, taskowned/, and any chk-*/ folder whose id is smaller than 1234
could be deleted? I imagine even some files under shared/ could be deleted
as well, although that might be harder to identify.

And a related question for a slightly different scenario, if I
use ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION and trigger a
stop-job-with-savepoint, does that trigger checkpoint deletion?

Regards,
Alexis.


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Alexis Sarda-Espinosa
Just to be clear, I don't think the operator must have special logic to
find out if a savepoint was used as the base for an incremental checkpoint;
however, the operator logic might want to completely disable savepoint
cleanup for a deployment if the user enabled CLAIM mode for it. At least
that sounds like the safer option to me.

Regards,
Alexis.

On Tue, 29 Nov 2022, 10:31 Gyula Fóra,  wrote:

> The operator might call dispose on an old savepoint, that’s true, but I am
> not sure if the dispose api call would actually corrupt it.
>
> Gyula
>
> On Tue, 29 Nov 2022 at 09:28, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Hangxiang,
>>
>> but, if I understand correctly, setting restore mode to CLAIM means that
>> the job might create a new incremental checkpoint based on the savepoint,
>> right? And if the operator then decides to clean up the savepoint, the
>> checkpoint would be corrupted, no?
>>
>> Regards,
>> Alexis.
>>
>> On Mon, 28 Nov 2022 at 05:17, Hangxiang Yu <
>> master...@gmail.com> wrote:
>>
>>> Hi, Alexis.
>>> IIUC, There is no conflict between savepoint history and restore mode.
>>> Restore mode cares about whether/how we manage the savepoint of old job.
>>> Savepoint management in operator only cares about savepoint history of
>>> new job.
>>> In other words, savepoint cleanup should not clean the savepoint from
>>> the old job which should only be controlled by restore mode.
>>> So I think you could also set restore mode according to your needs.
>>>
>>>
>>> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Is there a recommended configuration for the restore mode of jobs
>>>> managed by the operator?
>>>>
>>>> Since the documentation states that the operator keeps a savepoint
>>>> history to perform cleanup, I imagine restore mode should always be
>>>> NO_CLAIM, but I just want to confirm.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Alexis Sarda-Espinosa
Hi Hangxiang,

but, if I understand correctly, setting restore mode to CLAIM means that
the job might create a new incremental checkpoint based on the savepoint,
right? And if the operator then decides to clean up the savepoint, the
checkpoint would be corrupted, no?

Regards,
Alexis.

On Mon, 28 Nov 2022 at 05:17, Hangxiang Yu wrote:

> Hi, Alexis.
> IIUC, there is no conflict between savepoint history and restore mode.
> Restore mode cares about whether/how we manage the savepoint of the old job.
> Savepoint management in the operator only cares about the savepoint history
> of the new job.
> In other words, savepoint cleanup should not clean the savepoint from the
> old job, which should only be controlled by the restore mode.
> So I think you could also set the restore mode according to your needs.
>
>
> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a recommended configuration for the restore mode of jobs managed
>> by the operator?
>>
>> Since the documentation states that the operator keeps a savepoint
>> history to perform cleanup, I imagine restore mode should always be
>> NO_CLAIM, but I just want to confirm.
>>
>> Regards,
>> Alexis.
>>
>
>
> --
> Best,
> Hangxiang.
>


Kubernetes operator and jobs with last-state upgrades

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

I am doing some tests with the operator and, if I'm not mistaken, using
last-state upgrade means that, when something is changed in the CR, no
savepoint is taken and the pods are simply terminated. Is that a
requirement from Flink HA? I would have thought last-state would still use
savepoints for upgrade if the current status is stable.

Regards,
Alexis.


Re: Owner reference with the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Ah I see, cool, thanks.

Regards,
Alexis.

On Wed, 16 Nov 2022 at 15:50, Gyula Fóra wrote:

> This has been changed in the current snapshot release:
> https://issues.apache.org/jira/browse/FLINK-28979
>
> It will be part of the 1.3.0 version.
>
> On Wed, Nov 16, 2022 at 3:32 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a particular reason the operator doesn't set owner references
>> for the Deployments it creates as a result of a FlinkDeployment CR? This
>> makes tracking in the Argo CD UI impossible. (To be clear, I mean a
>> reference from the Deployment to the FlinkDeployment).
>>
>> Regards,
>> Alexis.
>>
>>


Savepoint restore mode for the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

Is there a recommended configuration for the restore mode of jobs managed
by the operator?

Since the documentation states that the operator keeps a savepoint history
to perform cleanup, I imagine restore mode should always be NO_CLAIM, but I
just want to confirm.

Regards,
Alexis.


Owner reference with the Kubernetes operator

2022-11-16 Thread Alexis Sarda-Espinosa
Hello,

Is there a particular reason the operator doesn't set owner references for
the Deployments it creates as a result of a FlinkDeployment CR? This makes
tracking in the Argo CD UI impossible. (To be clear, I mean a reference
from the Deployment to the FlinkDeployment).

Regards,
Alexis.


Broadcast state and job restarts

2022-10-27 Thread Alexis Sarda-Espinosa
Hello,

The documentation for broadcast state specifies that it is always kept in
memory. My assumptions based on this statement are:

1. If a job restarts in the same Flink cluster (i.e. using a restart
strategy), the tasks' attempt number increases and the broadcast state is
restored since it's not lost from memory.
2. If the whole Flink cluster is restarted with a savepoint, broadcast
state will not be restored and I need to write my application with this in
mind.

Are these correct?

Regards,
Alexis.


Broadcast state restoration for BroadcastProcessFunction

2022-10-14 Thread Alexis Sarda-Espinosa
Hello,

I wrote a test for a broadcast function to check how it handles broadcast
state during retries [1] (the gist only shows a subset of the test in
Kotlin, but it's hopefully understandable). The test will not pass unless
my function also implements CheckpointedFunction, although the
implementations of that interface's methods can be empty - the state is
empty in this case, even though its descriptor is registered with the harness.
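
Concretely, the shape that makes the harness test pass for me is roughly the
following (a Scala sketch - the actual test/gist is in Kotlin and all types
here are placeholders):

```
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

// Placeholder types; the point is only the extra CheckpointedFunction mix-in.
class MyBroadcastFunction
    extends BroadcastProcessFunction[String, String, String]
    with CheckpointedFunction {

  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // read-only access to the broadcast state here
  }

  override def processBroadcastElement(
      value: String,
      ctx: BroadcastProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    // writes to the broadcast state here
  }

  // Empty bodies are enough for the harness to snapshot/restore the
  // broadcast state whose descriptor was registered with it.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {}
  override def initializeState(context: FunctionInitializationContext): Unit = {}
}
```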

Is this requirement specific to the test harness API?
Otherwise BaseBroadcastProcessFunction should implement
CheckpointedFunction, maybe with empty default methods, no?

[1] https://gist.github.com/asardaes/b804b7ed04ace176881189c3d1cf842a

Regards,
Alexis.


Re: Partial broadcast/keyed connected streams

2022-10-11 Thread Alexis Sarda-Espinosa
Oh wow, I had read that documentation so many times and I was sure that API
also expected the broadcasted side to have a key like the other side, but
that's not the case - that is already what I was thinking of. Thanks.

Regards,
Alexis.

On Wed, 12 Oct 2022, 03:42 仙路尽头谁为峰,  wrote:

> Hi Alexis:
>
>    The broadcast state pattern should be set up by calling connect() on
> the non-broadcast stream, with the *broadcast stream* as the argument.
>
>    And if the main stream is a KeyedStream, then the processElement
> function will have access to keyed state just like in a normal keyed stream.
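>
> For illustration, the wiring could look roughly like this (a sketch; the
> types, names and descriptor are placeholders):
>
> ```
> import org.apache.flink.api.common.state.MapStateDescriptor
> import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val mainStream: DataStream[String] = env.fromElements("a", "b")
> val controlStream: DataStream[String] = env.fromElements("ctrl")
>
> val controlDescriptor = new MapStateDescriptor(
>   "active-controls", classOf[String], classOf[String])
>
> val result = mainStream
>   .keyBy((e: String) => e)                              // keyed, non-broadcast side
>   .connect(controlStream.broadcast(controlDescriptor))  // broadcast side
>   .process(new KeyedBroadcastProcessFunction[String, String, String, String] {
>     override def processElement(
>         value: String,
>         ctx: KeyedBroadcastProcessFunction[String, String, String, String]#ReadOnlyContext,
>         out: Collector[String]): Unit = {
>       // keyed state is available here as usual; broadcast state is read-only
>       out.collect(value)
>     }
>     override def processBroadcastElement(
>         value: String,
>         ctx: KeyedBroadcastProcessFunction[String, String, String, String]#Context,
>         out: Collector[String]): Unit = {
>       // runs on every parallel instance; broadcast state is writable here
>       ctx.getBroadcastState(controlDescriptor).put(value, value)
>     }
>   })
> ```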
>
>
>
> Best Regards!
>
> Sent from Mail <https://go.microsoft.com/fwlink/?LinkId=550986> for Windows
>
>
>
> *From: *Alexis Sarda-Espinosa 
> *Sent: *October 12, 2022 4:11
> *To: *user 
> *Subject: *Partial broadcast/keyed connected streams
>
>
>
> Hi everyone,
>
>
>
> I am currently thinking about a use case for a streaming job and, while
> I'm fairly certain it cannot be done with the APIs that Flink currently
> provides, I figured I'd put it out there in case other users think
> something like this would be useful to a wider audience.
>
>
>
> The current broadcasting mechanisms offered by Flink mention use cases
> where "control events" are needed. In my case I would also have control
> events, and I would need to broadcast them to *all parallel instances* of
> any downstream operators that consume the events. However, some of those
> operators have to be keyed because they are stateful. From the API's point
> of view, I'd imagine something like
>
>
>
>
> controlStream.connect(mainStream).broadcastFirstKeySecondBy(keySelector).process(PartiallyKeyedCoProcessFunction)
>
>
>
> The function would also have something like processElement1 and
> processElement2, but one of those methods wouldn't have access to
> partitioned state (or could it have access to state for all key groups
> handled by that instance?).
>
>
>
> Since I'm not familiar with all of Flink's internals, I don't know if this
> would be even remotely feasible, but I'd like to know if others have
> opinions on this.
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
>


Partial broadcast/keyed connected streams

2022-10-11 Thread Alexis Sarda-Espinosa
Hi everyone,

I am currently thinking about a use case for a streaming job and, while I'm
fairly certain it cannot be done with the APIs that Flink currently
provides, I figured I'd put it out there in case other users think
something like this would be useful to a wider audience.

The current broadcasting mechanisms offered by Flink mention use cases
where "control events" are needed. In my case I would also have control
events, and I would need to broadcast them to *all parallel instances* of
any downstream operators that consume the events. However, some of those
operators have to be keyed because they are stateful. From the API's point
of view, I'd imagine something like

controlStream.connect(mainStream).broadcastFirstKeySecondBy(keySelector).process(PartiallyKeyedCoProcessFunction)

The function would also have something like processElement1 and
processElement2, but one of those methods wouldn't have access to
partitioned state (or could it have access to state for all key groups
handled by that instance?).

Since I'm not familiar with all of Flink's internals, I don't know if this
would be even remotely feasible, but I'd like to know if others have
opinions on this.

Regards,
Alexis.


Re: Window state size with global window and custom trigger

2022-10-10 Thread Alexis Sarda-Espinosa
Thanks for the confirmation :)

Regards,
Alexis.

On Sun, 9 Oct 2022, 10:37 Hangxiang Yu,  wrote:

> Hi, Alexis.
> I think you are right. It also applies to a global window with a custom
> trigger.
> If you apply a ReduceFunction or AggregateFunction, the window state size
> is usually smaller than with a ProcessWindowFunction because only the
> aggregated value is kept. This also holds for global windows.
> Of course, the state size of a global window also depends on how you
> implement your trigger.
> BTW, we often use TTL to reduce the state size of the global window.
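>
> For example, something along these lines on the state descriptors you
> register yourself inside the window function or custom trigger (values are
> illustrative):
>
> ```
> import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
> import org.apache.flink.api.common.time.Time
>
> // Expire per-key state that has not been written for one day.
> val ttlConfig = StateTtlConfig
>   .newBuilder(Time.days(1))
>   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>   .build()
>
> val aggState = new ValueStateDescriptor("window-agg", classOf[java.lang.Long])
> aggState.enableTimeToLive(ttlConfig)
> ```
>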
> Hope these can help you.
>
>
> On Sat, Oct 8, 2022 at 4:49 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I found an SO thread that clarifies some details of window state size
>> [1]. I would just like to confirm that this also applies when using a
>> global window with a custom trigger.
>>
>> The reason I ask is that the TriggerResult API is meant to cover all
>> supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
>> for a ProcessWindowFunction that holds all input records until it fires.
>> However, I assume there would be no distinction if I use a
>> (Rich)AggregateFunction, regardless of window type (global vs timed), but
>> I'd like to be sure.
>>
>> Regards,
>> Alexis.
>>
>> [1]
>> https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management
>>
>>
>
> --
> Best,
> Hangxiang.
>


Window state size with global window and custom trigger

2022-10-07 Thread Alexis Sarda-Espinosa
Hello,

I found an SO thread that clarifies some details of window state size [1].
I would just like to confirm that this also applies when using a global
window with a custom trigger.

The reason I ask is that the TriggerResult API is meant to cover all
supported scenarios, so FIRE vs FIRE_AND_PURGE is relevant, for example,
for a ProcessWindowFunction that holds all input records until it fires.
However, I assume there would be no distinction if I use a
(Rich)AggregateFunction, regardless of window type (global vs timed), but
I'd like to be sure.

Regards,
Alexis.

[1]
https://stackoverflow.com/questions/55247668/flink-window-state-size-and-state-management

