Flink 1.18 support for Flink Stateful Functions

2024-04-12 Thread Deshpande, Omkar via user
Hello,

Is there a plan to add support for Flink 1.18 in Flink Stateful Functions?
Also, Stateful Functions releases generally seem to be slow and to lag behind the
Flink release cycle. Is the Stateful Functions project going to be actively maintained?

Thanks,
Omkar


PyFlink with Nessie and Iceberg in S3: jars

2024-04-12 Thread Robert Prat
Hi there,

For several days I have been trying to find the right configuration for my
pipeline, which roughly consists of the following scheme:
RabbitMQ -> PyFlink -> Nessie/Iceberg/S3.

For everything I am going to describe, I have tried both locally and with the
official Flink Docker images.

I have tried several different Flink versions, but for simplicity let's say I
am using apache-flink==1.18.0. So far I have been able to use the jar in
org/apache/iceberg/iceberg-flink-runtime-1.18 to connect to RabbitMQ and
obtain the data from some streams, so I'd say the source side is working.

After that, I have been trying to find a way to send the data in those streams
to Iceberg on S3 through the Nessie catalog, which is the one I have working. I
have been using this pipeline with both Spark and Trino for some time now, so I
know it works. Now what I am "simply" trying to do is use my already-set-up
Nessie catalog through Flink.

I have tried to connect both directly through sql-client.sh in the bin
directory of the pyflink installation and through Python, as follows:
table_env.execute_sql(f"""
    CREATE CATALOG nessie WITH (
        'type'='iceberg',
        'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',
        'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
        'uri'='http://mynessieip:mynessieport/api/v1',
        'ref'='main',
        'warehouse'='s3a://mys3warehouse',
        's3-access-key-id'='{USER_AWS}',
        's3-secret-access-key'='{KEY_AWS}',
        's3-path-style-access'='true',
        'client-region'='eu-west-1')""")
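
Once the catalog is created, this is roughly the kind of follow-up I want to
run (a sketch; the database and table names are made up for illustration):

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# ... CREATE CATALOG nessie as above ...
table_env.execute_sql("USE CATALOG nessie")
table_env.execute_sql("CREATE DATABASE IF NOT EXISTS db")
table_env.execute_sql(
    "CREATE TABLE IF NOT EXISTS db.events (id STRING, payload STRING)")
# wait() blocks until the insert job finishes
table_env.execute_sql(
    "INSERT INTO db.events VALUES ('1', 'hello')").wait()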

The jars I have included in my pyflink/lib dir are listed below (one of the
many combinations I've tried, with no result); I also tried adding them with
env.add_jars or --jarfile, as in the sketch after the list:

  *   hadoop-aws-3.4.0.jar
  *   iceberg-flink-runtime-1.18-1.5.0.jar
  *   hadoop-client-3.4.0.jar
  *   hadoop-common-3.4.0.jar
  *   hadoop-hdfs-3.4.0.jar
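
For reference, this is how I add them from Python when not relying on the
pyflink/lib dir (a sketch; the file paths are illustrative for my setup):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# add_jars expects file:// URLs, not bare filesystem paths
env.add_jars(
    "file:///opt/flink/lib/iceberg-flink-runtime-1.18-1.5.0.jar",
    "file:///opt/flink/lib/hadoop-aws-3.4.0.jar",
    "file:///opt/flink/lib/hadoop-client-3.4.0.jar",
    "file:///opt/flink/lib/hadoop-common-3.4.0.jar",
    "file:///opt/flink/lib/hadoop-hdfs-3.4.0.jar",
)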

Right now I am getting the following error message:

 py4j.protocol.Py4JJavaError: An error occurred while calling o56.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/HdfsConfiguration (...)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration...


But I have gotten several different errors across all the jar combinations I
have tried. So my question is: does anybody know whether my problem is
jar-related, or whether I am doing something else wrong? I would be immensely
grateful if someone could guide me through the right steps to implement this
pipeline.

Thanks:)




Re: Understanding event time wrt watermarking strategy in Flink

2024-04-12 Thread Sachin Mittal
Hi Yunfeng,
I have a question about the tolerance for out-of-order bounded watermarking.

What I understand is that when consuming from a source with the out-of-order
bound set as B, say it gets a record with timestamp T. After that, it will
drop all subsequent records that arrive with a timestamp less than T - B.

Please let me know if I understood this correctly.

If this is correct, then how does allowed lateness work when performing
event-time windowing? Say allowed lateness is set as A: does this mean that
the value of A should be less than that of B, because records with a
timestamp less than T - B would already have been dropped at the source?

If this is not the case, then how does lateness work with out-of-order
boundedness?
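
To make the question concrete, here is roughly the kind of setup I mean (a
sketch; Event stands for my event class, and the values B = 60s and A = 30s
are illustrative):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Out-of-order bound B = 60s at the source
WatermarkStrategy<Event> strategy =
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(60))
        .withTimestampAssigner((event, ts) -> event.timestamp);

// Allowed lateness A = 30s on the window: is A only meaningful if A < B?
data.keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(30))
    .reduce(new MyReducer());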

Thanks
Sachin


On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou wrote:

> Hi Sachin,
>
> 1. When your Flink job performs an operation like map or flatMap, the
> output records are automatically assigned the same timestamp as the
> input record. You don't need to manually assign the timestamp at each
> step, so the windowing result in your example should be as you
> expected.
>
> 2. The frequency of watermarks can be configured via
> pipeline.auto-watermark-interval in flink-conf.yaml, or via
> ExecutionConfig#setAutoWatermarkInterval in the Java API. In your
> example, the event time associated with the watermark is still T; the
> job will just tolerate any records whose timestamps fall in the range
> [T-B, T].
>
> Best,
> Yunfeng
>
> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal  wrote:
> >
> > Hello folks,
> > I have few questions:
> >
> > Say I have a source like this:
> >
> > final DataStream data =
> >     env.fromSource(
> >         source,
> >         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >             .withTimestampAssigner((event, timestamp) -> event.timestamp));
> >
> >
> > My pipeline after this is as follows:
> >
> > data.flatMap(new MyFlattendData())
> >     .keyBy(new MyKeySelector())
> >     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >     .reduce(new MyReducer());
> >
> >
> > First question: would the timestamp I assign at the source get carried
> > through all the steps below, down to my window?
> > For example, say I have timestamped data from the source as:
> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
> >
> >  would this get flattened to say:
> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >
> > then keyed to say:
> > => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ...]
> >
> > windows:
> > 1st => [ flatdata1, flatdata2 ]
> > 2nd => [ flatdata4, ... ]
> >
> > Would the windows created before the reduce function is applied be like
> > I have illustrated, or, to get this behaviour, do I need to output a
> > record at each step with the timestamp assigned to that record?
> >
> > Basically, is the timestamp assigned when reading from the source pushed
> > down (retained) through all the steps below when doing an event-time
> > window operation?
> >
> >
> > Next question is on my watermark strategy: how do I set the period of
> > the watermarking?
> > Basically, from the docs, an out-of-order bound B means that once an
> > event with timestamp T has been encountered, no events older than T - B
> > will follow any more once the watermarking is done.
> >
> > However, how frequently is watermarking done? And if, at the time of
> > watermarking, the last encountered event had timestamp T, does this mean
> > the watermark timestamp would be T - B?
> >
> > How can we control the watermarking period?
> >
> > Thanks
> > Sachin


Re: Understanding event time wrt watermarking strategy in Flink

2024-04-12 Thread Yunfeng Zhou
Hi Sachin,

1. When your Flink job performs an operation like map or flatMap, the
output records are automatically assigned the same timestamp as the
input record. You don't need to manually assign the timestamp at each
step, so the windowing result in your example should be as you
expected.

2. The frequency of watermarks can be configured via
pipeline.auto-watermark-interval in flink-conf.yaml, or via
ExecutionConfig#setAutoWatermarkInterval in the Java API. In your
example, the event time associated with the watermark is still T; the
job will just tolerate any records whose timestamps fall in the range
[T-B, T].
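
For example (the 200 ms value is just an illustration):

// Java API: generate a watermark every 200 ms of processing time
env.getConfig().setAutoWatermarkInterval(200L);

or, equivalently, in flink-conf.yaml:

pipeline.auto-watermark-interval: 200 ms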

Best,
Yunfeng

On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal  wrote:
>
> Hello folks,
> I have few questions:
>
> Say I have a source like this:
>
> final DataStream data =
>     env.fromSource(
>         source,
>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
>             .withTimestampAssigner((event, timestamp) -> event.timestamp));
>
>
> My pipeline after this is as follows:
>
> data.flatMap(new MyFlattendData())
>     .keyBy(new MyKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>     .reduce(new MyReducer());
>
>
> First question: would the timestamp I assign at the source get carried
> through all the steps below, down to my window?
> For example, say I have timestamped data from the source as:
> => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
>
>  would this get flattened to say:
> => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
>
> then keyed to say:
> => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ...]
>
> windows:
> 1st => [ flatdata1, flatdata2 ]
> 2nd => [ flatdata4, ... ]
>
> Would the windows created before the reduce function is applied be like I
> have illustrated, or, to get this behaviour, do I need to output a record at
> each step with the timestamp assigned to that record?
>
> Basically, is the timestamp assigned when reading from the source pushed
> down (retained) through all the steps below when doing an event-time window
> operation?
>
>
> Next question is on my watermark strategy: how do I set the period of the
> watermarking?
> Basically, from the docs, an out-of-order bound B means that once an event
> with timestamp T has been encountered, no events older than T - B will
> follow any more once the watermarking is done.
>
> However, how frequently is watermarking done? And if, at the time of
> watermarking, the last encountered event had timestamp T, does this mean the
> watermark timestamp would be T - B?
>
> How can we control the watermarking period?
>
> Thanks
> Sachin