Re: Flink Table API watermark after a select operation on a table
Hi Eugenio,

According to the docs [1], there are two ways to define a watermark on a table:

1. Defining it in DDL
2. During DataStream-to-Table conversion

In your case, I think you could use a CREATE TABLE DDL statement to create a new table from filteredPhasesDurationsTable with a watermark (see the sketch after this message). See the CREATE statement docs [2] for more.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/time_attributes/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/create/

Best,
Xiangyu

Eugenio Marotti wrote on Mon, Jun 26, 2023 at 13:03:

> Hi,
>
> Thank you for the suggestion. The problem is that in my case data_fine can
> be null, so I wanted to first filter by $("fase_completata").isTrue() (when
> fase_completata is true, data_fine is not null) and then define the
> watermark. Is it possible to define it after the last select?
>
> Best,
> Eugenio
>
> On Jun 26, 2023, at 05:31, feng xiangyu wrote:
>
> Hi Eugenio,
>
> AFAIK, you could define a watermark on data_fine by adding a watermark
> attribute to phasesDurationsSchema. For example:
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>     .column("nrg", DataTypes.STRING())
>     .column("giudice", DataTypes.STRING())
>     .column("oggetto", DataTypes.STRING())
>     .column("codice_oggetto", DataTypes.STRING())
>     .column("ufficio", DataTypes.STRING())
>     .column("sezione", DataTypes.STRING())
>     .column("fase", DataTypes.STRING().notNull())
>     .column("fase_completata", DataTypes.BOOLEAN())
>     .column("durata", DataTypes.BIGINT())
>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>     .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
>     .primaryKey("id_fascicolo", "fase")
>     .build();
>
> See more in the Table API and DataStream API integration docs [1].
>
> Best,
> Xiangyu
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/
>
> Eugenio Marotti wrote on Sun, Jun 25, 2023 at 15:35:
>
>> Hi everyone,
>>
>> I'm using Flink to process some streaming data. First, I have two tables
>> receiving events from Kafka. These tables are joined and the resulting
>> table is converted to a DataStream, where it is processed by a custom
>> KeyedProcessFunction. The output is then converted back to a table and
>> sent to Opensearch. Here's the code I'm using:
>>
>> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.BIGINT())
>>         .column("nrg", DataTypes.STRING())
>>         .column("ufficio", DataTypes.STRING())
>>         .column("sezione", DataTypes.STRING())
>>         .column("giudice", DataTypes.STRING())
>>         .column("oggetto", DataTypes.STRING())
>>         .column("codice_oggetto", DataTypes.STRING())
>>         .build())
>>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>     .build();
>> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
>> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>>
>> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.BIGINT())
>>         .column("data", DataTypes.BIGINT())
>>         .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>>         .column("evento", DataTypes.STRING())
>>         .column("id_fascicolo", DataTypes.BIGINT())
>>         .build())
>>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>     .build();
>> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
>>
>> Table legalFileEventsTable = legalFilesTable.join(e
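To make the DDL suggestion above concrete, here is a minimal sketch, not taken from the thread itself: the table name, topic, and connector options are invented for illustration, and the WATERMARK and PRIMARY KEY clauses are the relevant parts. It assumes the filtered rows (non-null data_fine only) are available as filteredPhasesDurationsTable, as mentioned in Xiangyu's reply.

// Hypothetical sketch: declare the watermark in CREATE TABLE DDL.
// Connector options below are placeholders, not the original pipeline's.
tEnv.executeSql(
    "CREATE TABLE PhasesDurationsWithWatermark (" +
    "  id_fascicolo BIGINT NOT NULL," +
    "  fase STRING NOT NULL," +
    "  durata BIGINT," +
    "  data_fine TIMESTAMP_LTZ(3)," +
    "  WATERMARK FOR data_fine AS data_fine - INTERVAL '1' SECOND," +
    "  PRIMARY KEY (id_fascicolo, fase) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'upsert-kafka'," +
    "  'topic' = 'phases-durations'," +                     // hypothetical topic
    "  'properties.bootstrap.servers' = 'localhost:9092'," + // placeholder address
    "  'key.format' = 'json'," +
    "  'value.format' = 'json'" +
    ")");

// The filtered table can then be written into the new watermarked table:
filteredPhasesDurationsTable.executeInsert("PhasesDurationsWithWatermark");

Queries reading from the new table then see data_fine as an event-time attribute, which is what a watermark defined "after the last select" amounts to in the DDL approach.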
Re: Flink Table API watermark after a select operation on a table
Hi Eugenio,

AFAIK, you could define a watermark on data_fine by adding a watermark attribute to phasesDurationsSchema. For example:

final Schema phasesDurationsSchema = Schema.newBuilder()
    .column("id_fascicolo", DataTypes.BIGINT().notNull())
    .column("nrg", DataTypes.STRING())
    .column("giudice", DataTypes.STRING())
    .column("oggetto", DataTypes.STRING())
    .column("codice_oggetto", DataTypes.STRING())
    .column("ufficio", DataTypes.STRING())
    .column("sezione", DataTypes.STRING())
    .column("fase", DataTypes.STRING().notNull())
    .column("fase_completata", DataTypes.BOOLEAN())
    .column("durata", DataTypes.BIGINT())
    .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
    .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
    .primaryKey("id_fascicolo", "fase")
    .build();

See more in the Table API and DataStream API integration docs [1].

Best,
Xiangyu

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/

Eugenio Marotti wrote on Sun, Jun 25, 2023 at 15:35:

> Hi everyone,
>
> I'm using Flink to process some streaming data. First, I have two tables
> receiving events from Kafka. These tables are joined and the resulting
> table is converted to a DataStream, where it is processed by a custom
> KeyedProcessFunction. The output is then converted back to a table and
> sent to Opensearch. Here's the code I'm using:
>
> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>     .schema(Schema.newBuilder()
>         .column("id", DataTypes.BIGINT())
>         .column("nrg", DataTypes.STRING())
>         .column("ufficio", DataTypes.STRING())
>         .column("sezione", DataTypes.STRING())
>         .column("giudice", DataTypes.STRING())
>         .column("oggetto", DataTypes.STRING())
>         .column("codice_oggetto", DataTypes.STRING())
>         .build())
>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>     .build();
> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>
> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>     .schema(Schema.newBuilder()
>         .column("id", DataTypes.BIGINT())
>         .column("data", DataTypes.BIGINT())
>         .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>         .column("evento", DataTypes.STRING())
>         .column("id_fascicolo", DataTypes.BIGINT())
>         .build())
>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>     .build();
> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
>
> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>     .where($("id").isEqual($("id_fascicolo")))
>     .select(
>         $("id").as("id_fascicolo"),
>         $("id_evento"),
>         $("giudice"),
>         $("nrg"),
>         $("codice_oggetto"),
>         $("oggetto"),
>         $("ufficio"),
>         $("sezione"),
>         $("data_evento"),
>         $("evento")
>     );
>
> DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
>     .keyBy(r -> r.getFieldAs("id_fascicolo"))
>     .process(new PhaseDurationCounterProcessFunction())
>     .returns(new RowTypeInfo(
>         new TypeInformation[] {
>             BasicTypeInfo.LONG_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.BOOLEAN_TYPE_INFO,
>             BasicTypeInfo.LONG_TYPE_INFO,
>             BasicTypeInfo.INSTANT_TYPE_INFO,
>             BasicTypeInfo.INSTANT_TYPE_INFO
>         },
>         new String[] { "id_fascicolo", "nrg", "giudice", "oggetto", "codice_oggetto",
>             "ufficio", "sezione", "fase", "fase_completata", "durata", "data_inizio", "data_fine" }
>     ));
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>     .column("nrg", DataTypes.STRING())
>     .column("giudice", DataTypes.STRING())
>     .column("oggetto", DataTypes.STRING())
>     .column("codice_oggetto", DataTypes.STRING())
>     .column("ufficio", DataTypes.STRING())
>     .column("sezione", DataTypes.STRING())
>     .column("fase", DataTypes.STRING().notNull())
>     .column("fase_completata", DataTypes.BOOLEAN())
>     .column("durata", DataTypes.BIGINT())
>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>     .primaryKey("id_fascicolo", "fase")
>     .build();
>
> Table phasesDurationsTable = tEnv.fromChangelogStream(phasesDurationsDataStream,
>     phasesDurationsSchema, ChangelogMode.upsert());
>
> final TableDescriptor phasesDurat
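For the conversion approach, the combination implied above (an assumption, since the thread does not show it end-to-end) would look like this: the watermark-bearing version of phasesDurationsSchema is handed to fromChangelogStream, which turns data_fine into an event-time attribute of the resulting table.

// Sketch combining Xiangyu's schema (with the .watermark(...) line added)
// and Eugenio's existing conversion; ChangelogMode.upsert() matches the
// primary key declared on (id_fascicolo, fase).
Table phasesDurationsTable = tEnv.fromChangelogStream(
        phasesDurationsDataStream,
        phasesDurationsSchema,      // now includes .watermark("data_fine", ...)
        ChangelogMode.upsert());

// printSchema() should list data_fine as "TIMESTAMP_LTZ(3) *ROWTIME*",
// confirming the event-time attribute survived the conversion.
phasesDurationsTable.printSchema();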
Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)
Congrats Yu! Looking forward to contributing to Paimon!

Best Regards,
Xiangyu

yuxia wrote on Mon, Mar 27, 2023 at 21:01:

> Congratulations!
>
> Best regards,
> Yuxia
>
> From: "Andrew Otto"
> To: "Matthias Pohl"
> Cc: "Jing Ge", "Leonard Xu", "Yu Li", "dev", "User" <user@flink.apache.org>, "user-zh"
> Sent: Monday, March 27, 2023, 8:57:50 PM
> Subject: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)
>
> Exciting!
>
> If this ends up working well, Wikimedia Foundation would love to try it out!
>
> On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user <user@flink.apache.org> wrote:
>
> Congratulations and good luck with pushing the project forward.
>
> On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user <user@flink.apache.org> wrote:
>
> Congrats!
> Best regards,
> Jing
>
> On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu <xbjt...@gmail.com> wrote:
>
> Congratulations!
>
> Best,
> Leonard
>
> On Mar 27, 2023, at 5:23 PM, Yu Li <car...@gmail.com> wrote:
>
> Dear Flinkers,
>
> As you may have noticed, we are pleased to announce that Flink Table Store
> has joined the Apache Incubator as a separate project called Apache
> Paimon(incubating) [1] [2] [3]. The new project still aims at building a
> streaming data lake platform for high-speed data ingestion, change data
> tracking and efficient real-time analytics, with the vision of supporting
> a larger ecosystem and establishing a vibrant and neutral open source
> community.
>
> We would like to thank everyone for their great support and efforts for
> the Flink Table Store project, and warmly welcome everyone to join the
> development and activities of the new project. Apache Flink will continue
> to be one of the first-class citizens supported by Paimon, and we believe
> that the Flink and Paimon communities will maintain close cooperation.
>
> Best Regards,
> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
> [1] https://paimon.apache.org/
> [2] https://github.com/apache/incubator-paimon
> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal