Flink 1.18 support for Flink Stateful Functions
Hello,

Is there a plan to add support for Flink 1.18 in Flink Stateful Functions? Also, Stateful Functions releases generally seem to lag behind the Flink release cycle. Is the Stateful Functions project going to be actively maintained?

Thanks,
Omkar
PyFlink with Nessie and Iceberg in S3: JARs
Hi there,

For several days I have been trying to find the right configuration for my pipeline, which roughly follows this schema: RabbitMQ -> PyFlink -> Nessie/Iceberg/S3. I have tried this both locally and with the official Flink Docker images, and with several different Flink versions, but for simplicity let's say I am using apache-flink==1.18.0.

So far I have been able to use the JAR in org/apache/iceberg/iceberg-flink-runtime-1.18 to connect to RabbitMQ and read the data from some streams, so I'd say the source side is working. After that I have been trying to find a way to send the data in those streams to Iceberg on S3 through the Nessie catalog, which is the one I have working. I have been using this pipeline with both Spark and Trino for some time now, so I know it works. Now what I am "simply" trying to do is use my already set up Nessie catalog through Flink.

I have tried to connect both directly through sql-client.sh in the bin directory of the PyFlink installation and through Python:

    table_env.execute_sql(f"""
        CREATE CATALOG nessie WITH (
            'type'='iceberg',
            'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',
            'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
            'uri'='http://mynessieip:mynessieport/api/v1',
            'ref'='main',
            'warehouse'='s3a://mys3warehouse',
            's3-access-key-id'='{USER_AWS}',
            's3-secret-access-key'='{KEY_AWS}',
            's3-path-style-access'='true',
            'client-region'='eu-west-1')""")

The JARs I have included in my pyflink/lib dir (one of the many combinations I have tried with no result; I also tried adding them with env.add_jars or --jarfile) are:

* hadoop-aws-3.4.0.jar
* iceberg-flink-runtime-1.18-1.5.0.jar
* hadoop-client-3.4.0.jar
* hadoop-common-3.4.0.jar
* hadoop-hdfs-3.4.0.jar

Right now I am getting the following error message:

    py4j.protocol.Py4JJavaError: An error occurred while calling o56.executeSql.
    : java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/HdfsConfiguration
    (...)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.HdfsConfiguration...

But I have gotten several different errors across all the different JAR combinations I have tried. So my question is: does anybody know whether my problem is JAR related, or whether I am doing something else wrong? I would be immensely grateful if someone could guide me through the right steps to implement this pipeline.

Thanks :)
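As a side note on how the pieces above can be wired together programmatically: PyFlink accepts dependency JARs through the documented `pipeline.jars` option, whose value is a semicolon-separated list of `file://` URLs. The following is a minimal plain-Python sketch (no Flink required to run it) of assembling that option and the catalog DDL from the message in one place; the jar paths and credential values are hypothetical placeholders, not a verified working JAR set.

```python
# Sketch: assemble the 'pipeline.jars' value and the Nessie catalog DDL
# from the message above. Paths and credentials are hypothetical.

def jars_config(jar_paths):
    """Build the value for the 'pipeline.jars' option: PyFlink expects
    semicolon-separated file:// URLs."""
    return ";".join(f"file://{p}" for p in jar_paths)

def nessie_catalog_ddl(uri, warehouse, access_key, secret_key):
    """Render the CREATE CATALOG statement used in the message, for an
    Iceberg catalog backed by Nessie with S3FileIO."""
    return f"""CREATE CATALOG nessie WITH (
  'type'='iceberg',
  'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',
  'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
  'uri'='{uri}',
  'ref'='main',
  'warehouse'='{warehouse}',
  's3-access-key-id'='{access_key}',
  's3-secret-access-key'='{secret_key}',
  's3-path-style-access'='true',
  'client-region'='eu-west-1')"""

jars = jars_config([
    "/opt/flink/lib/iceberg-flink-runtime-1.18-1.5.0.jar",  # hypothetical path
    "/opt/flink/lib/hadoop-aws-3.4.0.jar",                  # hypothetical path
])
ddl = nessie_catalog_ddl("http://mynessieip:mynessieport/api/v1",
                         "s3a://mys3warehouse", "AKIA...", "...")
# With pyflink installed, these would be used roughly as:
#   table_env.get_config().set("pipeline.jars", jars)
#   table_env.execute_sql(ddl)
```

This keeps the classpath and the catalog definition in one reviewable place, which makes it easier to vary one JAR combination at a time while debugging NoClassDefFoundError issues.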
Re: Understanding event time wrt watermarking strategy in flink
Hi Yunfeng,

I have a question about the tolerance of the bounded out-of-orderness watermark strategy. What I understand is that when consuming from a source with the out-of-orderness bound set to B, say it gets a record with timestamp T. After that, it will drop all subsequent records that arrive with a timestamp less than T - B. Please let me know if I understood this correctly.

If this is correct, then how does allowed lateness work when performing event time windowing? Say allowed lateness is set to A. Does this mean that the value of A should be less than that of B, because records with a timestamp less than T - B would have already been dropped at the source? If this is not the case, then how does lateness work with out-of-order boundedness?

Thanks
Sachin

On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou wrote:
> Hi Sachin,
>
> 1. When your Flink job performs an operation like map or flatmap, the
> output records would be automatically assigned with the same timestamp
> as the input record. You don't need to manually assign the timestamp
> in each step. So the windowing result in your example should be as you
> have expected.
>
> 2. The frequency of watermarks can be configured by
> pipeline.auto-watermark-interval in flink-conf.yaml, or
> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> the event time related to the Watermark is still T, just that the job
> will tolerate any records whose timestamp is in range [T-B, T].
>
> Best,
> Yunfeng
>
> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal wrote:
> >
> > Hello folks,
> > I have few questions:
> >
> > Say I have a source like this:
> >
> > final DataStream data =
> >     env.fromSource(
> >         source,
> >         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >             .withTimestampAssigner((event, timestamp) -> event.timestamp));
> >
> > My pipeline after this is as followed:
> >
> > data.flatMap(new MyFlattendData())
> >     .keyBy(new MyKeySelector())
> >     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >     .reduce(new MyReducer());
> >
> > First question I have is that the timestamp I assign from the source,
> > would it get carried to all steps below to my window ?
> > Example say I have timestamped data from source as:
> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ... ]
> >
> > would this get flattened to say:
> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >
> > then keyed to say:
> > => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ...]
> >
> > windows:
> > 1st => [ flatdata1, flatdata2 ]
> > 2nd => [ flatdata4, ... ]
> >
> > Would the windows created before the reduce function be applied be like
> > I have illustrated or to have it this way, do I need to output a record at
> > each step with the timestamp assigned for that record ?
> >
> > Basically is the timestamp assigned when reading from the source pushed
> > (retained) down to all the steps below when doing event time window
> > operation ?
> >
> > Next question is in my watermark strategy: how do I set the period of
> > the watermarking.
> > Basically from An out-of-order bound B means that once an event with
> > timestamp T was encountered, no events older than T - B will follow any
> > more when the watermarking is done.
> >
> > However, how frequently is watermarking done and when say watermarking,
> > the last encountered event was with timestamp T, does this mean watermark
> > timestamp would be T - B ?
> >
> > How can we control the watermarking period ?
> >
> > Thanks
> > Sachin
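The semantics being asked about can be checked with a small plain-Python simulation (no Flink needed). It follows the description in this thread: with an out-of-orderness bound B, the watermark trails the maximum timestamp seen so far by B (ignoring the extra millisecond Flink's implementation subtracts), records are not dropped at the source, and a record for a window [start, end) is only discarded once the watermark has passed end + allowed lateness A. So A and B are independent knobs, and A does not need to be smaller than B. The function names here are illustrative, not Flink API.

```python
# Simulation of bounded out-of-orderness watermarks plus allowed lateness.
# A record is dropped only when its event-time window is already finalized,
# i.e. watermark >= window_end + allowed lateness; nothing is dropped at
# the source itself.

def run(events, bound_b, lateness_a, window_size):
    """events: timestamps in arrival order. Returns the timestamps a
    windowed operator would still accept into their windows."""
    max_ts = float("-inf")
    kept = []
    for ts in events:
        max_ts = max(max_ts, ts)
        watermark = max_ts - bound_b  # watermark trails the max timestamp by B
        window_end = (ts // window_size + 1) * window_size
        if watermark < window_end + lateness_a:
            kept.append(ts)
    return kept

# Arrival order: 10, 12, 59, 61, then a very late record with timestamp 5.
# With B=60 the watermark after seeing 61 is only 1, so even the record
# with timestamp 5 still lands in its (not yet closed) window.
print(run([10, 12, 59, 61, 5], bound_b=60, lateness_a=0, window_size=60))
# → [10, 12, 59, 61, 5]
```

Running it with a larger jump, e.g. `run([10, 200, 15], 60, 0, 60)`, shows the record with timestamp 15 being dropped because the watermark (140) has passed its window end (60); raising `lateness_a` above 80 keeps it again.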
Re: Understanding event time wrt watermarking strategy in flink
Hi Sachin,

1. When your Flink job performs an operation like map or flatmap, the output records would be automatically assigned with the same timestamp as the input record. You don't need to manually assign the timestamp in each step. So the windowing result in your example should be as you have expected.

2. The frequency of watermarks can be configured by pipeline.auto-watermark-interval in flink-conf.yaml, or ExecutionConfig#setAutoWatermarkInterval in Java API. In your example, the event time related to the Watermark is still T, just that the job will tolerate any records whose timestamp is in range [T-B, T].

Best,
Yunfeng

On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal wrote:
>
> Hello folks,
> I have few questions:
>
> Say I have a source like this:
>
> final DataStream data =
>     env.fromSource(
>         source,
>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
>             .withTimestampAssigner((event, timestamp) -> event.timestamp));
>
> My pipeline after this is as followed:
>
> data.flatMap(new MyFlattendData())
>     .keyBy(new MyKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>     .reduce(new MyReducer());
>
> First question I have is that the timestamp I assign from the source, would
> it get carried to all steps below to my window ?
> Example say I have timestamped data from source as:
> => [ (10, data1), (12, data2), (59, data3), (61, data4), ... ]
>
> would this get flattened to say:
> => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
>
> then keyed to say:
> => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ...]
>
> windows:
> 1st => [ flatdata1, flatdata2 ]
> 2nd => [ flatdata4, ... ]
>
> Would the windows created before the reduce function be applied be like I
> have illustrated or to have it this way, do I need to output a record at each
> step with the timestamp assigned for that record ?
>
> Basically is the timestamp assigned when reading from the source pushed
> (retained) down to all the steps below when doing event time window operation ?
>
> Next question is in my watermark strategy: how do I set the period of the
> watermarking.
> Basically from An out-of-order bound B means that once an event with
> timestamp T was encountered, no events older than T - B will follow any more
> when the watermarking is done.
>
> However, how frequently is watermarking done and when say watermarking, the
> last encountered event was with timestamp T, does this mean watermark
> timestamp would be T - B ?
>
> How can we control the watermarking period ?
>
> Thanks
> Sachin
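Point 1 above can be illustrated with a small plain-Python simulation (no Flink required): the timestamp assigned at the source rides along with each record through flatMap-style steps, and a 60-second tumbling event-time window then groups records purely by timestamp. The record names follow the example in the quoted message; the filtering of data3 simply mirrors that example's flattened output and is not part of Flink's behavior.

```python
# Illustration: timestamps are retained through transformations, so
# tumbling event-time windows of size 60 group records by ts // 60.
from collections import defaultdict

source = [(10, "data1"), (12, "data2"), (59, "data3"), (61, "data4")]

def flat_map(records):
    # The transformation changes the value but keeps the timestamp;
    # data3 is filtered out here only to mirror the example's output.
    return [(ts, v.replace("data", "flatdata"))
            for ts, v in records if v != "data3"]

def tumbling_windows(records, size):
    windows = defaultdict(list)
    for ts, v in records:
        start = (ts // size) * size  # window covers [start, start + size)
        windows[start].append(v)
    return dict(windows)

flattened = flat_map(source)
print(tumbling_windows(flattened, 60))
# → {0: ['flatdata1', 'flatdata2'], 60: ['flatdata4']}
```

This matches the windows sketched in the question: the first window holds flatdata1 and flatdata2, the second holds flatdata4, without any manual timestamp re-assignment between steps.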