Re: Flink Table API watermark after a select operation on a table

2023-06-25 Thread feng xiangyu
Hi Eugenio,

According to the docs[1], there are two ways to define a watermark on a table:
1. Defining it in DDL
2. During DataStream-to-Table conversion

In your case, I think you could use a CREATE TABLE DDL statement to create a
new table from filteredPhasesDurationsTable with a watermark.
See more in the CREATE statements docs[2].
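
For example, a minimal sketch of that approach (the table name, the shortened
column list, the topic and the connector options below are only placeholders,
not taken from your job; I picked upsert-kafka only because your
phasesDurationsTable is an upserting changelog, and in practice the DDL would
list all columns of filteredPhasesDurationsTable):

tEnv.executeSql(
        "CREATE TABLE filtered_phases_durations ("
                + "  id_fascicolo BIGINT,"
                + "  fase STRING,"
                + "  data_fine TIMESTAMP_LTZ(3),"
                + "  WATERMARK FOR data_fine AS data_fine - INTERVAL '1' SECOND,"
                + "  PRIMARY KEY (id_fascicolo, fase) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'upsert-kafka',"          // placeholder connector
                + "  'topic' = 'filtered-phases-durations',"  // placeholder topic
                + "  'properties.bootstrap.servers' = 'localhost:9092',"
                + "  'key.format' = 'json',"
                + "  'value.format' = 'json'"
                + ")");

// Write the filtered rows out, then read them back with the watermark applied.
filteredPhasesDurationsTable.executeInsert("filtered_phases_durations");
Table tableWithWatermark = tEnv.from("filtered_phases_durations");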


[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/time_attributes/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/create/

Best,
Xiangyu

Eugenio Marotti wrote on Mon, Jun 26, 2023 at 13:03:

> Hi,
>
> thank you for the suggestion. The problem is that data_fine can be null in
> my case, so I wanted to first filter by $("fase_completata").isTrue() (when
> fase_completata is true, data_fine is not null) and then define the
> watermark. Is it possible to define it after the last select?
>
> Best,
> Eugenio
>
> On 26 Jun 2023, at 05:31, feng xiangyu wrote:
>
> Hi, Eugenio
>
> AFAIK, you could define a watermark on data_fine by adding a watermark
> attribute in phasesDurationsSchema.
> For example:
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>         .column("id_fascicolo", DataTypes.BIGINT().notNull())
>         .column("nrg", DataTypes.STRING())
>         .column("giudice", DataTypes.STRING())
>         .column("oggetto", DataTypes.STRING())
>         .column("codice_oggetto", DataTypes.STRING())
>         .column("ufficio", DataTypes.STRING())
>         .column("sezione", DataTypes.STRING())
>         .column("fase", DataTypes.STRING().notNull())
>         .column("fase_completata", DataTypes.BOOLEAN())
>         .column("durata", DataTypes.BIGINT())
>         .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>         .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>         .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
>         .primaryKey("id_fascicolo", "fase")
>         .build();
>
> See more in Table API and DataStream API integration docs[1].
>
> Best,
> Xiangyu
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/
>
> Eugenio Marotti wrote on Sun, Jun 25, 2023 at 15:35:
>
>> Hi everyone,
>>
>> I'm using Flink to process some streaming data. First of all, I have
>> two tables receiving events from Kafka. These tables are joined, and the
>> resulting table is converted to a DataStream, where it is processed by a
>> custom KeyedProcessFunction. The output is then converted back to a table
>> and sent to OpenSearch. Here's the code I'm using:
>>
>>
>> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>>         .schema(Schema.newBuilder()
>>                 .column("id", DataTypes.BIGINT())
>>                 .column("nrg", DataTypes.STRING())
>>                 .column("ufficio", DataTypes.STRING())
>>                 .column("sezione", DataTypes.STRING())
>>                 .column("giudice", DataTypes.STRING())
>>                 .column("oggetto", DataTypes.STRING())
>>                 .column("codice_oggetto", DataTypes.STRING())
>>                 .build())
>>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>         .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
>>                 KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>         .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>         .build();
>> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
>> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>>
>>
>> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>>         .schema(Schema.newBuilder()
>>                 .column("id", DataTypes.BIGINT())
>>                 .column("data", DataTypes.BIGINT())
>>                 .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>>                 .column("evento", DataTypes.STRING())
>>                 .column("id_fascicolo", DataTypes.BIGINT())
>>                 .build())
>>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>         .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
>>                 KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>         .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>         .build();
>> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
>> Table eventsTable = tEnv.from("EventsTable_Kafka");
>>
>>
>> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>>         .where($("id").isEqual($("id_fascicolo")))
>>         .select(
>>                 $("id").as("id_fascicolo"),
>>                 $("id_evento"),
>>                 $("giudice"),
>>                 $("nrg"),
>>                 $("codice_oggetto"),
>>                 $("oggetto"),
>>                 $("ufficio"),
>>                 $("sezione"),
>>                 $("data_evento"),
>>                 $("evento"));

Re: Flink Table API watermark after a select operation on a table

2023-06-25 Thread feng xiangyu
Hi, Eugenio

AFAIK, you could define a watermark on data_fine by adding a watermark
attribute in phasesDurationsSchema.
For example:

final Schema phasesDurationsSchema = Schema.newBuilder()
        .column("id_fascicolo", DataTypes.BIGINT().notNull())
        .column("nrg", DataTypes.STRING())
        .column("giudice", DataTypes.STRING())
        .column("oggetto", DataTypes.STRING())
        .column("codice_oggetto", DataTypes.STRING())
        .column("ufficio", DataTypes.STRING())
        .column("sezione", DataTypes.STRING())
        .column("fase", DataTypes.STRING().notNull())
        .column("fase_completata", DataTypes.BOOLEAN())
        .column("durata", DataTypes.BIGINT())
        .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
        .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
        .primaryKey("id_fascicolo", "fase")
        .build();
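
This schema, including the watermark, can then be passed in when converting the
stream back to a table, exactly as in your code:

Table phasesDurationsTable = tEnv.fromChangelogStream(
        phasesDurationsDataStream, phasesDurationsSchema, ChangelogMode.upsert());

With the watermark declared in the schema, data_fine becomes an event-time
(rowtime) attribute of phasesDurationsTable.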

See more in Table API and DataStream API integration docs[1].

Best,
Xiangyu

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/
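
By the way, PhaseDurationCounterProcessFunction itself is not shown in the
thread. Just to make the discussion concrete, a keyed function of that kind
might be shaped roughly like the sketch below; every state name and piece of
logic in it is an assumption on my side, not your actual implementation:

import java.time.Duration;
import java.time.Instant;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Illustrative skeleton only: tracks when a phase starts per key and emits a
// row with the phase duration once a later event closes it.
public class PhaseDurationCounterProcessFunction
        extends KeyedProcessFunction<Long, Row, Row> {

    private transient ValueState<Instant> phaseStart;

    @Override
    public void open(Configuration parameters) {
        phaseStart = getRuntimeContext().getState(
                new ValueStateDescriptor<>("phaseStart", Instant.class));
    }

    @Override
    public void processElement(Row event, Context ctx, Collector<Row> out)
            throws Exception {
        Instant eventTime = event.getFieldAs("data_evento");
        Instant start = phaseStart.value();
        if (start == null) {
            // First event of a phase: remember its start and emit an "open"
            // phase row with no end date or duration yet.
            phaseStart.update(eventTime);
            out.collect(Row.of(ctx.getCurrentKey(), event.getFieldAs("nrg"),
                    event.getFieldAs("giudice"), event.getFieldAs("oggetto"),
                    event.getFieldAs("codice_oggetto"), event.getFieldAs("ufficio"),
                    event.getFieldAs("sezione"), event.getFieldAs("evento"),
                    false, null, eventTime, null));
        } else {
            // A later event closes the phase: emit the completed row with its
            // duration in milliseconds and clear the state.
            phaseStart.clear();
            out.collect(Row.of(ctx.getCurrentKey(), event.getFieldAs("nrg"),
                    event.getFieldAs("giudice"), event.getFieldAs("oggetto"),
                    event.getFieldAs("codice_oggetto"), event.getFieldAs("ufficio"),
                    event.getFieldAs("sezione"), event.getFieldAs("evento"),
                    true, Duration.between(start, eventTime).toMillis(),
                    start, eventTime));
        }
    }
}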

Eugenio Marotti wrote on Sun, Jun 25, 2023 at 15:35:

> Hi everyone,
>
> I'm using Flink to process some streaming data. First of all, I have
> two tables receiving events from Kafka. These tables are joined, and the
> resulting table is converted to a DataStream, where it is processed by a
> custom KeyedProcessFunction. The output is then converted back to a table
> and sent to OpenSearch. Here's the code I'm using:
>
>
> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>         .schema(Schema.newBuilder()
>                 .column("id", DataTypes.BIGINT())
>                 .column("nrg", DataTypes.STRING())
>                 .column("ufficio", DataTypes.STRING())
>                 .column("sezione", DataTypes.STRING())
>                 .column("giudice", DataTypes.STRING())
>                 .column("oggetto", DataTypes.STRING())
>                 .column("codice_oggetto", DataTypes.STRING())
>                 .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
>                 KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>         .build();
> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>
>
> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>         .schema(Schema.newBuilder()
>                 .column("id", DataTypes.BIGINT())
>                 .column("data", DataTypes.BIGINT())
>                 .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>                 .column("evento", DataTypes.STRING())
>                 .column("id_fascicolo", DataTypes.BIGINT())
>                 .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
>                 KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>         .build();
> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
> Table eventsTable = tEnv.from("EventsTable_Kafka");
>
>
> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>         .where($("id").isEqual($("id_fascicolo")))
>         .select(
>                 $("id").as("id_fascicolo"),
>                 $("id_evento"),
>                 $("giudice"),
>                 $("nrg"),
>                 $("codice_oggetto"),
>                 $("oggetto"),
>                 $("ufficio"),
>                 $("sezione"),
>                 $("data_evento"),
>                 $("evento"));
>
>
> DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
>         .keyBy(r -> r.getFieldAs("id_fascicolo"))
>         .process(new PhaseDurationCounterProcessFunction())
>         .returns(new RowTypeInfo(
>                 new TypeInformation<?>[] {
>                         BasicTypeInfo.LONG_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.BOOLEAN_TYPE_INFO,
>                         BasicTypeInfo.LONG_TYPE_INFO,
>                         BasicTypeInfo.INSTANT_TYPE_INFO,
>                         BasicTypeInfo.INSTANT_TYPE_INFO
>                 },
>                 new String[] { "id_fascicolo", "nrg", "giudice", "oggetto",
>                         "codice_oggetto", "ufficio", "sezione", "fase",
>                         "fase_completata", "durata", "data_inizio", "data_fine" }));
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>         .column("id_fascicolo", DataTypes.BIGINT().notNull())
>         .column("nrg", DataTypes.STRING())
>         .column("giudice", DataTypes.STRING())
>         .column("oggetto", DataTypes.STRING())
>         .column("codice_oggetto", DataTypes.STRING())
>         .column("ufficio", DataTypes.STRING())
>         .column("sezione", DataTypes.STRING())
>         .column("fase", DataTypes.STRING().notNull())
>         .column("fase_completata", DataTypes.BOOLEAN())
>         .column("durata", DataTypes.BIGINT())
>         .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>         .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>         .primaryKey("id_fascicolo", "fase")
>         .build();
>
> Table phasesDurationsTable = tEnv.fromChangelogStream(
>         phasesDurationsDataStream, phasesDurationsSchema, ChangelogMode.upsert());
>
> final TableDescriptor phasesDurat

Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread feng xiangyu
Congrats Yu! Looking forward to contributing to Paimon!

Best Regards,
Xiangyu

yuxia wrote on Mon, Mar 27, 2023 at 21:01:

> Congratulations!
>
> Best regards,
> Yuxia
>
>
> From: "Andrew Otto"
> To: "Matthias Pohl"
> Cc: "Jing Ge", "Leonard Xu", "Yu Li", "dev", "User" <user@flink.apache.org>,
> "user-zh"
> Sent: Monday, March 27, 2023, 8:57:50 PM
> Subject: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache
> Paimon(incubating)
>
> Exciting!
>
> If this ends up working well, Wikimedia Foundation would love to try it
> out!
>
> On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user <user@flink.apache.org> wrote:
>
> Congratulations and good luck with pushing the project forward.
>
> On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user <user@flink.apache.org> wrote:
>
> Congrats!
> Best regards,
> Jing
>
> On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu <xbjt...@gmail.com> wrote:
>
> Congratulations!
>
> Best,
> Leonard
>
> On Mar 27, 2023, at 5:23 PM, Yu Li <car...@gmail.com> wrote:
>
> Dear Flinkers,
>
> As you may have noticed, we are pleased to announce that Flink Table Store
> has joined the Apache Incubator as a separate project called Apache
> Paimon(incubating) [1] [2] [3]. The new project still aims at building a
> streaming data lake platform for high-speed data ingestion, change data
> tracking and efficient real-time analytics, with the vision of supporting a
> larger ecosystem and establishing a vibrant and neutral open source
> community.
>
> We would like to thank everyone for their great support and efforts for
> the Flink Table Store project, and warmly welcome everyone to join the
> development and activities of the new project. Apache Flink will continue
> to be one of the first-class citizens supported by Paimon, and we believe
> that the Flink and Paimon communities will maintain close cooperation.
>
> Dear Flinkers,
>
> As you may have noticed, we are pleased to announce that Flink Table Store has
> officially joined the Apache Incubator as an independent project [1] [2] [3].
> The new project is named Apache Paimon (incubating) and remains dedicated to
> building a next-generation streaming lakehouse platform that supports
> high-speed data ingestion, streaming data subscription and efficient real-time
> analytics. In addition, the new project will support a richer ecosystem and
> build a vibrant and neutral open source community.
>
> Here we would like to thank everyone for the great support of and investment
> in the Flink Table Store project, and we warmly welcome everyone to join the
> development and community activities of the new project. Apache Flink will
> continue to be one of the main compute engines supported by Paimon, and we
> believe that the Flink and Paimon communities will continue to maintain close
> cooperation.
>
> Best Regards,
> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>
> [1] https://paimon.apache.org/
> [2] https://github.com/apache/incubator-paimon
> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal