Re: Non-temporal watermarks

2023-02-02 Thread James Sandys-Lumsdaine
I can describe a use that has been successful for me. We have a Flink workflow 
that calculates reports over many days. It is currently set up to recompute the 
last 10 days or so, recovering this "deep history" from our databases, and then 
switch over to the live flow to process all subsequent update events. I wrote 
this before the days of the HybridSource, so it is literally a JDBC data source 
that queries state for the last 10 days, and that stream is merged with a "live" 
stream from a DB poller or Kafka stream.

In answer to your question, during recovery I have all state for the old 
business days sent with a timestamp of that business date, e.g. new 
DateTime(2023, 1, 15, 0, 0, 0, UTC).getMillis() for any data associated with 
the 15th Jan 2023. Once the data source has emitted all the state for that 
date, it then emits a watermark with exactly the same timestamp, as it is 
communicating downstream that all the data has been sent for that date. It then 
moves on to the next date and emits that state.
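
A rough sketch of that recovery source (ReportEvent, loadDay() and the use of 
java.time instead of our real code are just for illustration):

import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// ReportEvent and the JDBC access are placeholders; the point is only the
// timestamp/watermark choreography: one business date at a time, then a watermark.
public class RecoveringJdbcSource extends RichSourceFunction<ReportEvent> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<ReportEvent> ctx) throws Exception {
        LocalDate day = LocalDate.now(ZoneOffset.UTC).minusDays(10);
        LocalDate today = LocalDate.now(ZoneOffset.UTC);
        while (running && day.isBefore(today)) {
            long dayTs = day.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
            for (ReportEvent event : loadDay(day)) {
                ctx.collectWithTimestamp(event, dayTs);   // timestamp = the business date
            }
            ctx.emitWatermark(new Watermark(dayTs));      // "all data for this date has been sent"
            day = day.plusDays(1);
        }
    }

    @Override
    public void cancel() { running = false; }

    // Placeholder for the JDBC query that loads one business date's state.
    private List<ReportEvent> loadDay(LocalDate day) { return Collections.emptyList(); }
}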

When my system starts up it records the current datetime and treats all data 
retrieved before that timestamp as being recovered state, and all data 
received from the live pollers/Kafka as being after that cut-off point. The live 
sources emit objects timestamped with the current time and periodically emit a 
watermark to make forward progress. I'm simplifying here but you get the point.

This pattern is useful for me because my keyed process functions are able to 
register timers to process all the data for an historic date at once - they don't 
need to fire on each message received or try to compute with missing data, but 
instead run once all the data has been received for a date from all the 
sources. (The timer is only triggered when the watermark is reached, and that 
requires all sources to have reached at least that point in the recovery.) Once 
we have reached the startup datetime watermark the system seamlessly flips into 
live processing mode. The watermarks still trigger my timers but now we are 
processing the last ~1 minute of batched data.
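
The keyed function side looks roughly like this (a sketch with invented 
ReportEvent/Report types and a String key; the real functions are more involved):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Each element registers an event-time timer at its own (business-date) timestamp.
// The timer only fires once the watermark for that date has arrived from every
// source, so onTimer sees the complete data set for the date (or, in live mode,
// the last small batch).
public class DailyReportFunction extends KeyedProcessFunction<String, ReportEvent, Report> {
    private transient ListState<ReportEvent> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(new ListStateDescriptor<>("buffer", ReportEvent.class));
    }

    @Override
    public void processElement(ReportEvent event, Context ctx, Collector<Report> out) throws Exception {
        buffer.add(event);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());   // assumes every element has a timestamp
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Report> out) throws Exception {
        out.collect(computeReport(ctx.getCurrentKey(), buffer.get()));   // all data for the date is present
        buffer.clear();
    }

    // Placeholder for the actual per-date report calculation.
    private Report computeReport(String key, Iterable<ReportEvent> events) { return new Report(); }
}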

So logically the meaning of a timestamp and watermark in my system always 
represents a forward moving moment in time - it is just that it means an 
historic date for data during recovery from the databases and then a current 
timestamp when the system is processing live data.

Hope that gives you some ideas and help.

James.

From: Gen Luo 
Sent: 02 February 2023 09:52
To: Jan Lukavský 
Cc: user@flink.apache.org 
Subject: Re: Non-temporal watermarks

Hi,

This is an interesting topic. I suppose the watermark is defined based on event 
time since it's mainly used, or designed, for event-time processing. Flink 
provides the event-time processing mechanism because it's widely needed: every 
event has its event time, and we usually need to group or order by it. On the 
other hand, this also means that we can process events from different sources 
together, as event time is naturally on the same scale.

However, just as you say, technically speaking the event timestamp can be 
replaced with any other meaningful number (or even any comparable), and the 
(event time) watermark should change accordingly. If we guarantee that this 
field and its watermark are on the same scale across all sources, we can process 
the data/events from the sources together with it just like with event time. As 
event-time processing and the event-time timer service don't rely on the actual 
time point or duration, I suppose this can be implemented by defining it as the 
event time, as long as it contains only positive numbers.
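
For example, something like the following rough sketch (SequencedEvent and 
getSequence() are placeholder names, and 'source' is an assumed 
DataStream<SequencedEvent>) maps such a number onto the event-time machinery:

// Treat a monotonically increasing, non-negative sequence number as the event
// timestamp and derive watermarks from it with the normal event-time machinery.
WatermarkStrategy<SequencedEvent> sequenceAsEventTime =
    WatermarkStrategy
        .<SequencedEvent>forMonotonousTimestamps()                      // watermark follows the max sequence seen
        .withTimestampAssigner((event, recordTimestamp) -> event.getSequence());

DataStream<SequencedEvent> withSequenceWatermarks =
    source.assignTimestampsAndWatermarks(sequenceAsEventTime);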


On Thu, Feb 2, 2023 at 5:18 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

I will not speak about details related to Flink specifically, the
concept of watermarks is more abstract, so I'll leave implementation
details aside.

Speaking generally, yes, there is a set of requirements that must be met
in order to be able to generate a system that uses watermarks.

The primary question is: what are watermarks used for? The answer is - we
need watermarks to be able to define a partially stable order of
_events_. An event is an immutable piece of data that can be _observed_
(i.e. processed) with various consumer-dependent delays (two consumers
can see the same event at different processing times) and carries a
specific (local) timestamp. Generally an event tells us that something,
somewhere happened at a given local timestamp.

Watermarks create markers in the processing time of each observer, so that
the observer is able to tell if two events (e.g. event "close
time-window T1" and "new data with timestamp T2 arrived") can be ordered
(that is, being able to tell which one - globally! - precedes the other).

Having said that - there is a general algebra for "timestamps" - and
therefore watermarks. A timestamp can be any object that defines the
following operations:

  - a less-than relation <, i.e. t1 < t2: bool, this relation needs to
be 

Setting a timer within broadcast applyToKeyedState() (feature request)

2022-07-07 Thread James Sandys-Lumsdaine
Hello,

I know we can’t set a timer in the processBroadcastElement() of the 
KeyedBroadcastProcessFunction as there is no key. 

However, there is a context.applyToKeyedState() method which allows us to 
iterate over the keyed state, with each invocation scoped to a single key. So is 
it possible to add access to the TimerService to the Context parameter passed 
into that delegate? 

Since the code running in the applyToKeyedState() method is scoped to a key we 
should be able to set up timers for that key too. 
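
In the meantime, the workaround I can see (a sketch below with invented type 
names) is to record in keyed state, via applyToKeyedState(), that a timer is 
wanted and then actually register it the next time processElement() runs for 
that key, where the TimerService is available - but that only works if another 
element arrives for the key:

public class MyBroadcastFunction extends KeyedBroadcastProcessFunction<String, Event, Rule, Output> {

    // Reused both for applyToKeyedState() and for getRuntimeContext().getState().
    private final ValueStateDescriptor<Long> pendingTimerDesc =
        new ValueStateDescriptor<>("pendingTimer", Long.class);

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Output> out) throws Exception {
        // No TimerService here, so just note the desired timer time against every key.
        ctx.applyToKeyedState(pendingTimerDesc, (key, state) -> state.update(rule.getFireAt()));
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Output> out) throws Exception {
        ValueState<Long> pending = getRuntimeContext().getState(pendingTimerDesc);
        Long fireAt = pending.value();
        if (fireAt != null) {
            ctx.timerService().registerEventTimeTimer(fireAt);   // the timer can be registered here
            pending.clear();
        }
        // ... normal element handling ...
    }
}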

Thanks,

James. 

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-18 Thread James Sandys-Lumsdaine
Hello Matthias,

Thanks for your reply. Yes indeed you are correct. My /tmp path is private so 
you have confirmed what I thought was happening.

I have some follow up questions:
- why do taskmanagers create the chk-x directory but only the jobmanager can 
delete it? Shouldn’t the jobmanager be the only component creating and deleting 
these directories? That would seem more consistent to me but maybe there is a 
reason.
- I see many files under each chk-x folder. Can anyone confirm whether each file 
is wholly owned by a single taskmanager, i.e. is each file only written by one 
TM? Otherwise there could be file locking and contention.
- we are now looking to add in NFS mounts for our containers so all the 
jobmanagers and taskmanagers share the same path. Can anyone confirm whether NFS 
is a ‘reliable’ storage mechanism, as we have heard many stories about how 
problematic it can be? We are not yet able to use HDFS or S3.
- if Flink cannot write to NFS, my understanding is that although the checkpoint 
will fail, the Flink process will carry on and try again at the next checkpoint. 
It will not cause my program to fail, correct? (See the sketch after this list.)
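
On that last point, I believe the relevant knob is 
CheckpointConfig#setTolerableCheckpointFailureNumber - something like the sketch 
below - though I would appreciate confirmation of the default behaviour:

// My understanding (unverified): the number of consecutive checkpoint failures the
// job tolerates before failing is configurable, and the default is 0, so a flaky
// NFS write could fail the job unless this is raised.
env.enableCheckpointing(30_000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);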

Many thanks again,

James.

Sent from my iPhone

On 17 May 2022, at 15:17, Schwalbe Matthias  wrote:


Hi James,

From reading the thread … I assume, your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by jobmanager and all taskmanagers in order to work
- as your jobmanager cannot access the taskmanagers' checkpoint files, it also 
cannot clean up those files

Hope that helps

Regards

Thias

From: James Sandys-Lumsdaine 
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) is reading and writing to its own /tmp, 
i.e. there is no shared access as there would be with NFS or HDFS.
So when the checkpointing happens the directories are created and populated but 
only the JM's old checkpoint directories are cleaned up. The TMs' old "chk-x" 
directories under /tmp/Flink/State remain and are not cleared up.

From your email I can't tell whether you think I am writing to a "shared" path 
or not.

I started looking at the in-memory checkpoint storage but its max size is an 
int, so it can't hold my 5GB of state. I need the checkpointing to trigger my 
sinks to persist (GenericWriteAheadSink), so it seems I have to create a proper 
shared file path that all my containers can access (sketched below).
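
For example, once a shared mount exists, I imagine something like this (the 
mount path is illustrative):

// Same state backend, but checkpoint storage pointed at a path that the jobmanager
// and every taskmanager container can reach.
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///mnt/shared/flink/checkpoints");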

James.

From: Hangxiang Yu <master...@gmail.com>
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine <jas...@hotmail.com>; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not have got what the problem is.
All checkpoints will be stored at the address you set.
IIUC, TMs will write some checkpoint info to their local dir, then upload it to 
the address, and then delete the local copy.
JM will write some checkpoint metadata to the address and also does the eventual 
deletion of the checkpoints.
Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
Some further Googling turned up a StackOverflow posting saying it is the 
jobmanager that does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could clear 
up their own state when instructed to by the jobmanager.

I cannot yet use an NFS mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now that I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate it.

James.
____
From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,



I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.



When I run from my own IDE, I see only the latest "chk-x" directory under 
the job 

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) is reading and writing to its own /tmp, 
i.e. there is no shared access as there would be with NFS or HDFS.

So when the checkpointing happens the directories are created and populated but 
only the JM's old checkpoint directories are cleaned up. The TMs' old "chk-x" 
directories under /tmp/Flink/State remain and are not cleared up.

From your email I can't tell whether you think I am writing to a "shared" path 
or not.

I started looking at the in-memory checkpoint storage but its max size is an 
int, so it can't hold my 5GB of state. I need the checkpointing to trigger my 
sinks to persist (GenericWriteAheadSink), so it seems I have to create a proper 
shared file path that all my containers can access.

James.

From: Hangxiang Yu 
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine ; user@flink.apache.org 

Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not have got what the problem is.
All checkpoints will be stored at the address you set.
IIUC, TMs will write some checkpoint info to their local dir, then upload it to 
the address, and then delete the local copy.
JM will write some checkpoint metadata to the address and also does the eventual 
deletion of the checkpoints.

Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
Some further Googling turned up a StackOverflow posting saying it is the 
jobmanager that does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could clear 
up their own state when instructed to by the jobmanager.

I cannot yet use an NFS mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now that I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate it.

James.
____________
From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retains every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Some further Googling turned up a StackOverflow posting saying it is the 
jobmanager that does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could clear 
up their own state when instructed to by the jobmanager.

I cannot yet use an NFS mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now that I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate it.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org 
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retains every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Checkpoint directories not cleared as TaskManagers run

2022-05-16 Thread James Sandys-Lumsdaine
Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retains every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Re: Slowness using GenericWriteAheadSink

2022-03-23 Thread James Sandys-Lumsdaine
Is anyone able to comment on the below? My worry is this class isn't well 
supported, so I may need to find an alternative way to bulk copy data into SQL 
Server, e.g. use a simple file sink and then have some process bulk copy the files.

From: Sandys-Lumsdaine, James 
Sent: 11 March 2022 13:59
To: user@flink.apache.org 
Subject: Slowness using GenericWriteAheadSink


Hello,



We are using the GenericWriteAheadSink to buffer up values to then send to a 
SQL Server database with a fast bulk copy upload. However, when I watch my 
process running it seems to spend a huge amount of time iterating over the 
Iterable provided to the sendValues() method. It takes such a long time I’ve had 
to increase the checkpoint timeout because it causes the whole workflow to suspend.



I am using Flink 1.14.0 and have attached a simple, self-contained example. If 
I was to guess then there is a very large deserialization overhead from the 
checkpointed data even though I’m currently using a HashMapStateBackend. I have 
profiled the application and it definitely seems to spend most of its time 
there. The object involved is just a plain POJO.



A second “issue” is that I am forced to clone the objects provided by the 
iterator – when I dug into the code I could see a 
ReusingMutableToRegularIteratorWrapper class being used and the objects passed 
in were being reused between 2 instances. I don’t know the reasoning behind this 
(except to prevent extra garbage?) but it would be nice if I could specify a 
“non-reusing” one, otherwise there is a deserialization AND a clone for every 
object in the list.



Any pointers or advice on a better way to send large amounts of data to a SQL 
Server sink would be appreciated.



James.



Slowness using GenericWriteAheadSink

2022-03-14 Thread James Sandys-Lumsdaine
Hello,

We are using the GenericWriteAheadSink to buffer up values to then send to a 
SQL Server database with a fast bulk copy upload. However, when I watch my 
process running it seems to spend a huge amount of time iterating over the 
Iterable provided to the sendValues() method. It takes such a long time I’ve had 
to increase the checkpoint timeout because it causes the whole workflow to suspend.

I am using Flink 1.14.0 and have attached a simple, self-contained example. If 
I was to guess then there is a very large deserialization overhead from the 
checkpointed data even though I’m currently using a HashMapStateBackend. I have 
profiled the application and it definitely seems to spend most of its time 
there. The object involved is just a plain POJO.

A second “issue” is that I am forced to clone the objects provided by the 
iterator – when I dug into the code I could see a 
ReusingMutableToRegularIteratorWrapper class being used and the objects passed 
in were being reused between 2 instances. I don’t know the reasoning behind this 
(except to prevent extra garbage?) but it would be nice if I could specify a 
“non-reusing” one, otherwise there is a deserialization AND a clone for every 
object in the list.
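
For now the cleanest defensive copy I can think of is via the serializer the 
sink was constructed with - a sketch only (the 'serializer' field, the MyObj 
type and the bulk copy helper are names I've made up):

@Override
protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
    List<MyObj> batch = new ArrayList<>();
    for (MyObj reused : objects) {
        batch.add(serializer.copy(reused));   // TypeSerializer#copy detaches each element from the reused instance
    }
    bulkCopyIntoSqlServer(batch);             // placeholder for the actual SQL Server bulk copy
    return true;
}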

Any pointers or advice on a better way to send large amounts of data to a SQL 
Server sink would be appreciated.

James.


SlowBulkCopySinkMain.java
Description: SlowBulkCopySinkMain.java


Basic questions about resuming stateful Flink jobs

2022-02-16 Thread James Sandys-Lumsdaine
Hi all,

I have a 1.14 Flink streaming workflow with many stateful functions that has an 
FsStateBackend and checkpointing enabled, although I haven't set a location for 
the checkpoint state.

I've really struggled to understand how I can stop my Flink job and restart it 
and ensure it carries on exactly where it left off by using the state or 
checkpoints or savepoints. This is not clearly explained in the book or the web 
documentation.

Since I have no control over my Flink job id I assume I cannot force Flink to 
pick up the state recorded under the jobId directory for the FsStateBackend. 
Therefore I think Flink should read back in the last checkpointed data, but I 
don't understand how to force my program to read this in. Do I use retained 
checkpoints or not? How can I force my program to either use the last 
checkpointed state (e.g. when running from my IDE, starting and stopping the 
program) or to start completely fresh without reading in any state?

The web documentation talks about bin/flink but I am running from my IDE so I 
want my Java code to control this process using the Flink API in Java.
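
For reference, this is the kind of thing I am guessing at (the config key and 
whether a local/IDE environment honours it are exactly what I am unsure about):

// My guess at how this is meant to work (unverified): retain checkpoints on
// cancellation, and on restart point the environment at the retained checkpoint
// (or savepoint) directory via configuration. The path below is illustrative.
Configuration conf = new Configuration();
conf.setString("execution.savepoint.path", "file:/tmp/flink-checkpoints/<job-id>/chk-42");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.enableCheckpointing(10_000);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);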

Can anyone give me some basic pointers as I'm obviously missing something 
fundamental on how to allow my program to be stopped and started without losing 
all the state.

Many thanks,

James.



Re: Unit test harness for Sources

2022-02-15 Thread James Sandys-Lumsdaine
Thanks for the reply. If I upgrade my legacy Sources to use the new split-based 
Sources, is there a better unit test harness for that?

Thanks,

James.

Sent from my iPhone

On 15 Feb 2022, at 13:24, Chesnay Schepler  wrote:


I don't think there is anything of the sort for the legacy sources. I would 
suggest following the example at 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/testing/#testing-flink-jobs
 and using a job that only contains the source (+ something to either extract 
the results or verify them within the job).

On 14/02/2022 18:06, James Sandys-Lumsdaine wrote:
Hi all,

I've been using the test harness classes to unit test my stateful one- and 
two-input stream functions. But I also have some stateful legacy Source classes 
I would like to unit test, and I can't find any documentation or examples for 
that - is this possible?

Thanks,

James.



Unit test harness for Sources

2022-02-14 Thread James Sandys-Lumsdaine
Hi all,

I've been using the test harness classes to unit test my stateful one- and 
two-input stream functions. But I also have some stateful legacy Source classes 
I would like to unit test, and I can't find any documentation or examples for 
that - is this possible?

Thanks,

James.


Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-12-06 Thread James Sandys-Lumsdaine
Hello again,

We recently upgraded from Flink 1.12.3 to 1.14.0 and we were hoping it would 
solve our issue with checkpointing with finished data sources. We need the 
checkpointing to work to trigger Flink's GenericWriteAheadSink class.

Firstly, the constant mentioned in FLIP-147 that enables the feature isn't 
available as far as we can see (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH). It's 
not in ConfigConstants or CheckpointConfig, for example. So instead we enabled 
it with the following:

conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.enableCheckpointing(30 * 1000);
...

We can see the constant is available in 1.15 from searching Google, but not in 
the version we were expecting (1.14.0).

Previously we had to put long Thread.sleep(x) calls in to keep the sources alive 
while checkpoints were taken. When we enabled this feature using the explicit 
string and removed these hacks, we started seeing these errors:


INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph Source: 
Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785) switched from 
RUNNING to FINISHED.



[some lines removed for brevity]



INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator 
Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d failed due to 
{}

org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a 
checkpoint request for unknown task e015c4f0910fb27e15fec063616ab785. Failure 
reason: Task local checkpoint failure.

 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966)
 ~[flink-runtime-1.14.0.jar:1.14.0]

 at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) ~[na:na]

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.8.0_91]

 at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
 ~[na:na]

 at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[na:na]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
 ~[na:na]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 ~[na:na]

 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[na:na]

 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[na:na]

 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[na:na]

 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:na]

 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[scala-library-2.11.12.jar:na]

 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[na:na]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:na]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[scala-library-2.11.12.jar:na]

 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[scala-library-2.11.12.jar:na]

 at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]

 at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]

 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]

 at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]

 at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]

 at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]

 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
~[na:1.8.0_91]

 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
~[na:1.8.0_91]

 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
~[na:1.8.0_91]

 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 
~[na:1.8.0_91]

FYI, if we don't enable this feature we see a different error consistent with 
the older version of Flink:

INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator   Failed to 
trigger checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks 
of the job have already finished and checkpointing with finished tasks is not 
enabled. Failure reason: Not all required tasks are currently running.

Can anyone advise if this feature is indeed available and working in 1.14.0 and 
how to enable it correctly?

Thanks,

James.


From: Austin Cawley-Edwards 
Sent: 04 November 2021 18:46
To: James Sandys-Lumsdaine

GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-03 Thread James Sandys-Lumsdaine
Hello,

I have a Flink workflow where I need to upload the output data into a legacy 
SQL Server database and so I have read the section in the Flink book about data 
sinks and utilizing the GenericWriteAheadSink base class. I am currently using 
Flink 1.12.3 although we plan to upgrade to 1.14 shortly.

Firstly, given I will be generating a large amount of data I feel it best to 
use the GenericWriteAheadSink base class so I can bulk copy all the data into 
my SQL Server database rather than attempt a row by row insertion which would 
be too slow. Hopefully this is a good use case for this class or is there now a 
better approach?

Secondly, one thing I noticed is my JDBC source emits ~50,000 rows but the 
program actually exits before a final checkpoint is taken, so I miss many of 
the final rows - I have to put in a Thread.sleep(5000) before allowing the JDBC 
source to exit. This might be related to FLINK-21215 as I see the following 
error:
org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source 
(1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the 
sendValues() method.

I have included the test code below which just logs the "insertions" for now 
(and doesn't do real db access) but demonstrates the problem:

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500);

    MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    jdbcStreamIn.transform("SqlServerSink",
        TypeInformation.of(MyObj.class),
        new SqlServerBulkCopySink(
            new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
            TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
            UUID.randomUUID().toString()));

    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
    public SqlServerBulkCopySink(CheckpointCommitter committer, TypeSerializer<MyObj> serializer, String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        logger.info("Sending {},{}---", checkpointId, timestamp);
        for (MyObj myObj : objects)
            logger.info("  {},{}: {}", checkpointId, timestamp, myObj); // this will eventually be a bulk copy insert into the SQL Server database
        return true;
    }
}



Am I right in thinking the latest versions of Flink will not suffer from this 
problem or am I hitting something else? To be clear, I am expecting a 
checkpoint to be invoked by Flink to cover all the data I want to insert into 
my DB - how else would I do the final bulk copy if my sendValues() is not 
called?
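
In the meantime, rather than the fixed sleep, a workaround I am considering (an 
untested sketch of how MyJDBCSource could be adapted - note the CheckpointListener 
interface's package differs between Flink versions) is to only return from run() 
once a checkpoint has completed after the last row was emitted:

public class MyJDBCSource extends RichSourceFunction<MyObj> implements CheckpointListener {

    private final CountDownLatch checkpointAfterLastRow = new CountDownLatch(1);
    private volatile boolean allRowsEmitted = false;

    @Override
    public void run(SourceContext<MyObj> ctx) throws Exception {
        // ... run the JDBC query and emit every row via ctx.collect(...) ...
        allRowsEmitted = true;
        checkpointAfterLastRow.await();   // block (outside the checkpoint lock) until a checkpoint completes
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Strictly this should also check the checkpoint was triggered after the
        // last row; this sketch only shows the idea.
        if (allRowsEmitted) {
            checkpointAfterLastRow.countDown();
        }
    }

    @Override
    public void cancel() {
        checkpointAfterLastRow.countDown();
    }
}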


I have more questions about my data sink but I will wait to hear your answers.


Many thanks in advance,


James.



Re: Empty Kafka topics and watermarks

2021-10-11 Thread James Sandys-Lumsdaine
Ah thanks for the feedback. I can work around for now but will upgrade as soon 
as I can to the latest version.

Thanks very much,

James.

From: Piotr Nowojski 
Sent: 08 October 2021 13:17
To: James Sandys-Lumsdaine 
Cc: user@flink.apache.org 
Subject: Re: Empty Kafka topics and watermarks

Hi James,

I believe you have encountered a bug that we have already fixed [1]. The small 
problem is that in order to fix this bug, we had to change some 
`@PublicEvolving` interfaces and thus we were not able to backport this fix to 
earlier minor releases. As such, this is only fixed starting from 1.14.x.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

On Fri, 8 Oct 2021 at 11:55, James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
Hi everyone,

I'm putting together a Flink workflow that needs to merge historic data from a 
custom JDBC source with a Kafka flow (for the realtime data). I have 
successfully written the custom JDBC source that emits a watermark for the last 
event time after all the DB data has been emitted but now I face a problem when 
joining with data from the Kafka stream.

I register a timer in my KeyedCoProcessFunction joining the DB stream with live 
Kafka stream so I can emit all the "batch" data from the DB in one go when 
completely read up to the watermark but the timer never fires as the Kafka 
stream is empty and therefore doesn't emit a watermark. My Kafka stream will be 
allowed to be empty since all the data will have been retrieved from the DB 
call, so I only expect new events to appear over Kafka. Note that if I replace 
the Kafka input with a simple env.fromCollection(...) empty list then the timer 
triggers fine as Flink seems to detect it doesn't need to wait for any input 
from stream 2. So it seems to be something related to the Kafka stream status 
that is preventing the watermark from advancing in the KeyedCoProcessFunction.

I have tried configuring the Kafka stream timestamp and watermark strategies so 
that the source is marked as idle after 10 seconds, but still it seems the 
watermark in the join operator combining these 2 streams is not advancing. (See 
example code below).

Maybe this is my bad understanding but I thought if an input stream into a 
KeyedCoProcessFunction is idle then it wouldn't be considered by the operator 
for forwarding the watermark i.e. it would forward the non-idle input stream's 
watermark and not do a min(stream1WM, stream2WM). With the below example I 
never see the onTimer fire and the only effect the withIdleness() strategy has 
is to stop the print statements in onPeriodicEmit() happening after 5 seconds 
(env periodic emit is set to the default 200ms so I see 25 rows before it 
stops).

The only way I can get my KeyedCoProcessFunction timer to fire is to force an 
emit of the watermark I want in the onPeriodicEmit() after x numbers of 
attempts to advance an initial watermark i.e. if onPeriodicEmit() is called 100 
times and the "latestWatermark" is still Long.MIN_VALUE then I emit the 
watermark I want so the join can progress. This seems like a nasty hack to me 
but perhaps something like this is actually necessary?

I am currently using Flink 1.12.3 and a Confluent Kafka client 6.1.1. Any 
pointers would be appreciated.

Thanks in advance,

James.


FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer = new FlinkKafkaConsumer<>("poc.positions",
    ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, SchemaRegistryURL),
    kafkaConsumerProperties);

positionsFlinkKafkaConsumer.setStartFromEarliest();

positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
    new WatermarkStrategy<Position>() {
        @Override
        public TimestampAssigner<Position> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, recordTimestamp) -> {
                return event.getPhysicalFrom();
            };
        }

        @Override
        public WatermarkGenerator<Position> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Position>() {
                public long latestWatermark = Long.MIN_VALUE;

                @Override
                public void onEvent(Position event, long timestamp, WatermarkOutput output) {
                    long eventWatermark = event.getPhysicalFrom();
                    if (eventWatermark > latestWatermark)
                        latestWatermark = eventWatermark;
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    System.out.printf("Emitting watermark %d\n", latestWatermark);

Empty Kafka topics and watermarks

2021-10-08 Thread James Sandys-Lumsdaine
Hi everyone,

I'm putting together a Flink workflow that needs to merge historic data from a 
custom JDBC source with a Kafka flow (for the realtime data). I have 
successfully written the custom JDBC source that emits a watermark for the last 
event time after all the DB data has been emitted but now I face a problem when 
joining with data from the Kafka stream.

I register a timer in my KeyedCoProcessFunction joining the DB stream with live 
Kafka stream so I can emit all the "batch" data from the DB in one go when 
completely read up to the watermark but the timer never fires as the Kafka 
stream is empty and therefore doesn't emit a watermark. My Kafka stream will be 
allowed to be empty since all the data will have been retrieved from the DB 
call, so I only expect new events to appear over Kafka. Note that if I replace 
the Kafka input with a simple env.fromCollection(...) empty list then the timer 
triggers fine as Flink seems to detect it doesn't need to wait for any input 
from stream 2. So it seems to be something related to the Kafka stream status 
that is preventing the watermark from advancing in the KeyedCoProcessFunction.

I have tried configuring the Kafka stream timestamp and watermark strategies so 
that the source is marked as idle after 10 seconds, but still it seems the 
watermark in the join operator combining these 2 streams is not advancing. (See 
example code below).

Maybe this is my bad understanding but I thought if an input stream into a 
KeyedCoProcessFunction is idle then it wouldn't be considered by the operator 
for forwarding the watermark i.e. it would forward the non-idle input stream's 
watermark and not do a min(stream1WM, stream2WM). With the below example I 
never see the onTimer fire and the only effect the withIdleness() strategy has 
is to stop the print statements in onPeriodicEmit() happening after 5 seconds 
(env periodic emit is set to the default 200ms so I see 25 rows before it 
stops).

The only way I can get my KeyedCoProcessFunction timer to fire is to force an 
emit of the watermark I want in the onPeriodicEmit() after x numbers of 
attempts to advance an initial watermark i.e. if onPeriodicEmit() is called 100 
times and the "latestWatermark" is still Long.MIN_VALUE then I emit the 
watermark I want so the join can progress. This seems like a nasty hack to me 
but perhaps something like this is actually necessary?

I am currently using Flink 1.12.3 and a Confluent Kafka client 6.1.1. Any 
pointers would be appreciated.

Thanks in advance,

James.


FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer = new FlinkKafkaConsumer<>("poc.positions",
    ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, SchemaRegistryURL),
    kafkaConsumerProperties);

positionsFlinkKafkaConsumer.setStartFromEarliest();

positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
    new WatermarkStrategy<Position>() {
        @Override
        public TimestampAssigner<Position> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, recordTimestamp) -> {
                return event.getPhysicalFrom();
            };
        }

        @Override
        public WatermarkGenerator<Position> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Position>() {
                public long latestWatermark = Long.MIN_VALUE;

                @Override
                public void onEvent(Position event, long timestamp, WatermarkOutput output) {
                    long eventWatermark = event.getPhysicalFrom();
                    if (eventWatermark > latestWatermark)
                        latestWatermark = eventWatermark;
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    System.out.printf("Emitting watermark %d\n", latestWatermark);
                    output.emitWatermark(new Watermark(latestWatermark));
                }
            };
        }
    }.withIdleness(Duration.ofSeconds(5)));

DataStream<Position> positionKafkaInputStream = env.addSource(positionsFlinkKafkaConsumer, "Kafka-Source");

DataStream<Position> otherPositionStream = env.fromCollection(
    Lists.newArrayList(new Position(..., timestamp.getMillis())), TypeInformation.of(Position.class));

otherPositionStream.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<Position>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((e, t) -> e.getPhysicalFrom()));

KeyedStream<Position, String> keyedPositionKafkaInputStream = positionKafkaInputStream.keyBy(p -> p.getMarketName());

KeyedStream<Position, String> keyedOtherPositionStream = otherPositionStream.keyBy(p -> 

Broadcast data to all keyed streams

2021-09-06 Thread James Sandys-Lumsdaine
Hello,

I have a Flink workflow which is partitioned on a key common to all the stream 
objects and a key that is best suited to the high volume of data I am 
processing. I now want to add in a new stream of prices that I want to make 
available to all partitioned streams - however, this new stream of prices does 
not have this common keyBy value.

I have tried writing a piece of code using the broadcast() method (no args) to 
get this new price stream sent to all the parallel instances of an operator. 
The code looks like this:

KeyedStream keyedRefDataStream = ...;

DataStream prices = ...;
DataStream broadcastPrices = prices.broadcast();

keyedRefDataStream
    .connect(broadcastPrices)
    .process(new RefDataPriceJoiner()); // implements KeyedCoProcessFunction

I then get an error saying the broadcastPrices stream must be keyed - but I 
can't key it on the same key as the refData stream because it lacks this field.

I could reshuffle all my data by re-keying the ref data on a different field 
but this will cause a huge amount of data to be sent over the network compared 
with me being able to broadcast this much smaller amount of data to my keyed 
streams. Note I am assuming this isn't a "broadcast state" example - I assume 
the broadcast() method allows me to send data to all partitions.
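
In case it clarifies the question, the broadcast-state shape I believe Flink is 
pushing me towards looks roughly like this (Price and the descriptor name are 
made up, and RefDataPriceJoiner would have to extend KeyedBroadcastProcessFunction 
instead):

// Broadcast the prices with a MapStateDescriptor and join inside a
// KeyedBroadcastProcessFunction rather than a KeyedCoProcessFunction.
MapStateDescriptor<String, Price> priceStateDescriptor =
    new MapStateDescriptor<>("prices", String.class, Price.class);

BroadcastStream<Price> broadcastPrices = prices.broadcast(priceStateDescriptor);

keyedRefDataStream
    .connect(broadcastPrices)
    .process(new RefDataPriceJoiner());   // extends KeyedBroadcastProcessFunction in this variant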

Is any of this possible? Any pointers for me would be very helpful as I can't 
find an answer on the web or in the documentation.

Many thanks,

James.


Questions on reading JDBC data with Flink Streaming API

2021-08-10 Thread James Sandys-Lumsdaine
Hello,



I'm starting a new Flink application to allow my company to perform lots of 
reporting. We have an existing legacy system with most of the data we need held 
in SQL Server databases. We will need to consume data from these databases 
initially before starting to consume more data from newly deployed Kafka 
streams.



I've spent a lot of time reading the Flink book and web pages but I have some 
simple questions and assumptions I hope you can help with so I can progress.



Firstly, I want to use the DataStream API so we can consume both historic data 
and realtime data. I do not think I want to use the DataSet API, but I also 
don't see the point in using the SQL/Table APIs as I would prefer to write my 
functions in Java classes. I need to maintain my own state and it seems 
DataStream keyed functions are the way to go.



Now that I am trying to actually write code against our production databases I 
need to be able to read in "streams" of data with SQL queries - there does not 
appear to be a JDBC source connector, so I think I have to make the JDBC call 
myself and then possibly create a DataSource using env.fromElements(). 
Obviously this is a "bounded" data set, but how else am I meant to get historic 
data loaded in? In the future I want to include a Kafka stream as well which 
will only have a few weeks' worth of data, so I imagine I will sometimes need to 
merge data from a SQL Server/Snowflake database with a live stream from a Kafka 
stream. What is the best practice for this, as I don't see examples discussing 
it?
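
To make the question concrete, for the historic load I am imagining a hand-rolled 
source that runs the query and emits each row - a sketch only, with the 
connection details, SQL and Trade POJO invented:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// A bounded legacy source: run one JDBC query, emit each row, then finish.
public class TradeJdbcSource extends RichSourceFunction<Trade> {
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
            "jdbc:sqlserver://myhost;databaseName=reporting", "user", "password");
    }

    @Override
    public void run(SourceContext<Trade> ctx) throws Exception {
        try (PreparedStatement stmt = connection.prepareStatement("SELECT id, price, ts FROM trades");
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                ctx.collect(new Trade(rs.getLong("id"), rs.getDouble("price"), rs.getTimestamp("ts")));
            }
        }
        // Returning ends the source: the stream is bounded, which is what I want for the historic load.
    }

    @Override
    public void cancel() { }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}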



With retrieving data from a JDBC source, I have also seen some examples using a 
StreamingTableEnvironment - am I meant to use this somehow instead to query 
data from a JDBC connection into my DataStream functions etc? Again, I want to 
write my functions in Java not some Flink SQL. Is it best practice to use a 
StreamingTableEnvironment to query JDBC data if I'm only using the DataStream 
API?



Thanks in advance - I'm sure I will have plenty more high-level questions like 
this.