Thank you. I’ll try to configure a watermark on the second table.

Eugenio

> On 21 Sep 2023, at 16:30, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> … well, yes and no:
> If the second table is a small table used for enrichment, you can also mark it as a broadcast table, but I don’t know how to do that in the Table API.
> If the second table has significant data and significant updates, then you need to configure watermarking/event-time semantics on the second table as well.
> The logic is this:
> Your join operator only generates output windows once the event time passes the end of the time window.
> The event time/watermark time of your join operator is the minimum watermark time of all its inputs.
> Because your second table does not emit watermarks, its watermark time remains at Long.MIN_VALUE, and hence the operator time stays there too.
> Another way to make progress, in case your second table does not update watermarks/data often enough, is to mark the source with an idle watermark generator, in which case it is rendered as ‘timeless’ and does not prevent time progress in your join operator.
> Again, I’m not sure how to configure this.
>
> Kind regards again
>
> Thias
>
>
> From: Eugenio Marotti <ing.eugenio.maro...@gmail.com>
> Sent: Thursday, September 21, 2023 2:35 PM
> To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
> Cc: user@flink.apache.org
> Subject: Re: Window aggregation on two joined table
>
> Hi Matthias,
>
> No, the second table doesn’t have an event time and a watermark specified. In order for the window to work, do I need a watermark on the second table as well?
>
> Thanks
> Eugenio
>
> On 21 Sep 2023, at 13:45, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> Hi Eugenio,
>
> I might be mistaken, but did you specify the event time for the second table like you did for the first table (watermark(….))?
> I am not so acquainted with the Table API (I do more straight DataStream API work), but I assume this join and windowing should be by event time.
>
> What do you think?
>
> Kind regards
>
> Thias
>
>
> From: Eugenio Marotti <ing.eugenio.maro...@gmail.com>
> Sent: Thursday, September 21, 2023 8:56 AM
> To: user@flink.apache.org
> Subject: Window aggregation on two joined table
>
> Hi,
>
> I’m trying to execute a window aggregation on two joined tables from two Kafka topics (upsert fashion), but I get no output. Here’s the code I’m using.
>
> This is the first table from Kafka, with an event-time watermark on the ‘data_inizio’ attribute:
>
> final TableDescriptor phasesDurationsTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
>     .schema(Schema.newBuilder()
>         .column("id_fascicolo", DataTypes.BIGINT().notNull())
>         .column("nrg", DataTypes.STRING())
>         .column("giudice", DataTypes.STRING())
>         .column("oggetto", DataTypes.STRING())
>         .column("codice_oggetto", DataTypes.STRING())
>         .column("ufficio", DataTypes.STRING())
>         .column("sezione", DataTypes.STRING())
>         .column("fase_completata", DataTypes.BOOLEAN())
>         .column("fase", DataTypes.STRING().notNull())
>         .column("durata", DataTypes.BIGINT())
>         .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>         .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>         .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
>         .primaryKey("id_fascicolo", "fase")
>         .build())
>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.phases-durations"))
>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>     .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>     .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>     .build();
> tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
> Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
>
> Here’s the second table:
>
> final TableDescriptor averageJudgeByPhaseReportTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
>     .schema(Schema.newBuilder()
>         .column("giudice", DataTypes.STRING().notNull())
>         .column("fase", DataTypes.STRING().notNull())
>         .column("media_mobile", DataTypes.BIGINT())
>         .primaryKey("giudice", "fase")
>         .build())
>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.average-judge-by-phase-report"))
>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>     .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>     .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>     .option(KafkaConnectorOptions.PROPS_GROUP_ID, "average-judge-by-phase-report")
>     .build();
> tEnv.createTable("AverageJudgeByPhaseReport_Kafka", averageJudgeByPhaseReportTableDescriptor);
> Table averageJudgeByPhaseReportTable = tEnv.from("AverageJudgeByPhaseReport_Kafka");
>
> Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
>     .select(
>         $("giudice").as("giudice_media"),
>         $("fase").as("fase_media"),
>         $("media_mobile")
>     );
>
> And here’s the code I’m experimenting with:
>
> phasesDurationsTable
>     .join(renamedAverageJudgeByPhaseReportTable)
>     .where($("giudice").isEqual($("giudice_media")))
>     .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
>     .groupBy($("giudice"), $("w"))
>     .select($("giudice"))
>     .execute().print();
>
> Am I doing something wrong?
>
> This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
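The watermark logic Thias describes (the join's event time is the minimum watermark of its inputs, a silent input pins it at Long.MIN_VALUE, and an idle input stops holding it back) can be illustrated with a toy model in plain Java. This is a hypothetical sketch, not Flink's implementation or API: the class `WatermarkMinDemo` and its methods are made up for illustration. It assumes an epoch-aligned 30-day tumbling window (matching `Tumble.over(lit(30).days())`), that a window is emitted once the operator watermark reaches the window end minus 1 ms, and that an idle input is simply left out of the minimum.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only, NOT Flink code: a two-input operator whose event time is the
// minimum watermark over its non-idle inputs, plus a 30-day tumbling window check.
public class WatermarkMinDemo {

    public static final long DAY_MS = 24L * 60 * 60 * 1000;
    public static final long WINDOW_SIZE_MS = 30 * DAY_MS; // assumed: 30-day tumbling window

    private static final class InputState {
        long watermark = Long.MIN_VALUE; // input that never emitted a watermark
        boolean idle = false;
    }

    private final Map<String, InputState> inputs = new LinkedHashMap<>();
    private long current = Long.MIN_VALUE; // operator event time; only moves forward

    public WatermarkMinDemo(String... inputNames) {
        for (String name : inputNames) {
            inputs.put(name, new InputState());
        }
    }

    // An input reports a new watermark; receiving one makes it active again.
    public void onWatermark(String input, long watermark) {
        InputState s = inputs.get(input);
        s.watermark = Math.max(s.watermark, watermark);
        s.idle = false;
        recompute();
    }

    // Hypothetical stand-in for an idle source: it no longer holds back event time.
    public void markIdle(String input) {
        inputs.get(input).idle = true;
        recompute();
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (InputState s : inputs.values()) {
            if (!s.idle) {
                anyActive = true;
                min = Math.min(min, s.watermark);
            }
        }
        // Toy simplification: if every input is idle, keep the previous value.
        if (anyActive) {
            current = Math.max(current, min);
        }
    }

    public long currentWatermark() {
        return current;
    }

    // End (exclusive) of the epoch-aligned window [start, start + size) containing ts.
    public static long windowEnd(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    // Assumed firing rule: emit once event time reaches the window's last millisecond.
    public boolean isFired(long eventTs) {
        return current >= windowEnd(eventTs) - 1;
    }

    public static void main(String[] args) {
        WatermarkMinDemo op = new WatermarkMinDemo("phasesDurations", "averageJudgeByPhase");
        long dataInizio = 5 * DAY_MS; // falls into the window [0, 30 days)
        op.onWatermark("phasesDurations", 40 * DAY_MS); // first input is past the window end
        System.out.println("window fired: " + op.isFired(dataInizio)); // second input still silent
        op.markIdle("averageJudgeByPhase");
        System.out.println("window fired: " + op.isFired(dataInizio));
    }
}
```

Running `main` prints `window fired: false` and then `window fired: true`: the first input alone is far past the window end, but the combined watermark only moves once the silent input either emits watermarks (Eugenio's plan) or is treated as idle. In the Table API, the closest real counterpart to the idle marking is the `table.exec.source.idle-timeout` option, which marks a source as idle after it has produced no records for the configured time (disabled by default).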