Hi Eugenio,

I might be mistaken, but did you declare event time (a watermark) for the second
table, like you did for the first one (watermark(….))?
I am not very familiar with the Table API (I mostly work directly with the
DataStream API), but I assume this join and windowing should be based on event
time.
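To illustrate why a missing watermark on one input can suppress all output, here
is a simplified model in plain Java (not Flink code; the class and method names
are hypothetical). It assumes two things that match how Flink treats event time:
a tumbling window [start, end) emits only once the watermark reaches end - 1,
and an operator with several inputs advances its watermark only to the minimum
of its input watermarks, which stays at Long.MIN_VALUE for an input that never
sends one.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model (not Flink code) of event-time tumbling windows fed by two inputs.
 * Class and method names are hypothetical.
 */
public class EventTimeWindowSketch {

    // Start of the tumbling window containing tsMillis, for windows aligned to the epoch.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // An operator with several inputs only advances to the minimum of its input watermarks.
    static long combinedWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // A window [start, start + size) fires once the watermark reaches its last timestamp.
    static boolean fires(long start, long sizeMillis, long watermark) {
        return watermark >= start + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofDays(30).toMillis();
        long delay = Duration.ofSeconds(1).toMillis(); // like "data_inizio - INTERVAL '1' SECOND"

        long event = Instant.parse("2023-09-01T10:00:00Z").toEpochMilli();
        long start = windowStart(event, size);

        // The first input later sees a row 40 days newer, so its watermark passes the window end.
        long firstInput = event + Duration.ofDays(40).toMillis() - delay;
        System.out.println("first input alone fires: " + fires(start, size, firstInput));

        // An input that never emits a watermark stays at Long.MIN_VALUE and holds the window open.
        long secondInput = Long.MIN_VALUE;
        System.out.println("combined fires: " + fires(start, size, combinedWatermark(firstInput, secondInput)));
    }
}
```

Running main prints "first input alone fires: true" and then "combined fires:
false": a 30-day window that the first table's watermark alone would close stays
open as long as the other input never advances its watermark.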

What do you think?

Kind regards

Thias


From: Eugenio Marotti <ing.eugenio.maro...@gmail.com>
Sent: Thursday, September 21, 2023 8:56 AM
To: user@flink.apache.org
Subject: Window aggregation on two joined table

Hi,

I’m trying to execute a window aggregation on two joined tables built from two
Kafka topics (in upsert fashion), but I get no output. Here’s the code I’m using:

This is the first table from Kafka, with an event-time watermark on the
‘data_inizio’ attribute:


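import java.util.List;

import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;

// First table: upsert-kafka source keyed by (id_fascicolo, fase)
final TableDescriptor phasesDurationsTableDescriptor = TableDescriptor.forConnector("upsert-kafka")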
       .schema(Schema.newBuilder()
             .column("id_fascicolo", DataTypes.BIGINT().notNull())
             .column("nrg", DataTypes.STRING())
             .column("giudice", DataTypes.STRING())
             .column("oggetto", DataTypes.STRING())
             .column("codice_oggetto", DataTypes.STRING())
             .column("ufficio", DataTypes.STRING())
             .column("sezione", DataTypes.STRING())
             .column("fase_completata", DataTypes.BOOLEAN())
             .column("fase", DataTypes.STRING().notNull())
             .column("durata", DataTypes.BIGINT())
             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
             // data_inizio is the event-time attribute; the watermark trails it by 1 second
             .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
             .primaryKey("id_fascicolo", "fase")
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.phases-durations"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
       .build();
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
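The watermark declaration above can be modeled roughly as follows. This is a
simplified sketch in plain Java, not Flink's implementation, and the class and
method names are hypothetical. It assumes the watermark is updated after every
row, whereas Flink actually emits watermarks periodically (see
pipeline.auto-watermark-interval), so which rows count as late can differ in
practice.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/**
 * Simplified model (not Flink's implementation) of a watermark declared as
 * "data_inizio - INTERVAL '1' SECOND". Class and method names are hypothetical.
 */
public class WatermarkSketch {

    private final long delayMillis;
    private long current = Long.MIN_VALUE; // no watermark seen yet

    WatermarkSketch(Duration delay) {
        this.delayMillis = delay.toMillis();
    }

    // Processes one row; returns true if the row is late (not after the current watermark).
    boolean onRow(Instant dataInizio) {
        long ts = dataInizio.toEpochMilli();
        boolean late = ts <= current;
        // Each row proposes ts - delay; the watermark only ever moves forward.
        current = Math.max(current, ts - delayMillis);
        return late;
    }

    long currentWatermark() {
        return current;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(Duration.ofSeconds(1));
        List<Instant> rows = List.of(
                Instant.parse("2023-09-01T10:00:05Z"),
                Instant.parse("2023-09-01T10:00:02Z"), // out of order by more than 1 second
                Instant.parse("2023-09-01T10:00:10Z"));
        for (Instant row : rows) {
            boolean late = wm.onRow(row);
            System.out.println(row + " late=" + late
                    + " watermark=" + Instant.ofEpochMilli(wm.currentWatermark()));
        }
    }
}
```

With these three sample rows, the second row (10:00:02) arrives after the
watermark has already reached 10:00:04, so it is reported as late, and the
watermark never moves backwards.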

Here’s the second table:

// Second table: upsert-kafka source keyed by (giudice, fase); no watermark declared
final TableDescriptor averageJudgeByPhaseReportTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
       .schema(Schema.newBuilder()
             .column("giudice", DataTypes.STRING().notNull())
             .column("fase", DataTypes.STRING().notNull())
             .column("media_mobile", DataTypes.BIGINT())
             .primaryKey("giudice", "fase")
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.average-judge-by-phase-report"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
       .option(KafkaConnectorOptions.PROPS_GROUP_ID, "average-judge-by-phase-report")
       .build();
tEnv.createTable("AverageJudgeByPhaseReport_Kafka", averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable = tEnv.from("AverageJudgeByPhaseReport_Kafka");

// Rename the key columns so they do not clash with the first table's columns in the join
Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
       .select(
             $("giudice").as("giudice_media"),
             $("fase").as("fase_media"),
             $("media_mobile")
       );



And here’s the code I’m experimenting with:

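import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Tumble;

// Regular join on giudice, then a 30-day tumbling window on data_inizio
phasesDurationsTable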
       .join(renamedAverageJudgeByPhaseReportTable)
       .where($("giudice").isEqual($("giudice_media")))
       .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
       .groupBy(
             $("giudice"),
             $("w")
       )
       .select(
             $("giudice")
       )
       .execute().print();



Am I doing something wrong?