… well yes and no:
* If the second table is a small table used for enrichment, you can also
mark it as a broadcast table, but I don’t know how to do that in the Table API
* If the second table has significant data and significant updates, then you
need to configure watermarking/event time semantics on the second table as well
* The logic is this:
  * Your join operator only generates output windows once the event time
    passes the end of the time window
  * The event time/watermark time of your join operator is the minimum
    watermark time of all inputs
  * Because your second table does not emit watermarks, its watermark time
    remains at Long.MIN_VALUE, hence the operator time also stays there
* Another way to make progress, in case your second table does not update
watermarks/data often enough, is to mark the source with an idle watermark
generator, in which case it is treated as ‘timeless’ and does not prevent time
progress in your join operator
  * Again, not sure how to configure this
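
The watermark logic in the list above can be sketched in plain Java. This is a toy model and not Flink code: the class `WatermarkSketch`, its `Input` type and both methods are hypothetical names made up for illustration, and the firing rule is simplified to "watermark has reached the window end". For the idle-source case, Flink's configuration documentation lists an option `table.exec.source.idle-timeout` that looks relevant for the Table API; treat that as a pointer to verify rather than a tested setting.

```java
/** Toy model (plain Java, not Flink code) of how a two-input operator combines watermarks. */
class WatermarkSketch {

    /** One input: its latest watermark in epoch millis and whether it is marked idle. */
    static final class Input {
        final long watermark;
        final boolean idle;

        Input(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over all non-idle inputs; Long.MIN_VALUE if no input is active. */
    static long operatorWatermark(Input... inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Input in : inputs) {
            if (in.idle) {
                continue; // idle inputs do not hold back the operator's event time
            }
            anyActive = true;
            min = Math.min(min, in.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    /** Simplification: a window may emit once the operator watermark has reached its end. */
    static boolean windowCanFire(long windowEnd, long operatorWatermark) {
        return operatorWatermark >= windowEnd;
    }

    public static void main(String[] args) {
        Input first = new Input(1_000L, false);                      // table with watermarks
        Input secondNoWatermark = new Input(Long.MIN_VALUE, false);  // never emits a watermark
        Input secondIdle = new Input(Long.MIN_VALUE, true);          // same input, marked idle
        long windowEnd = 500L;

        // Second input pins the operator time at Long.MIN_VALUE: prints false
        System.out.println(windowCanFire(windowEnd, operatorWatermark(first, secondNoWatermark)));
        // Idle input is skipped, so the first input's watermark counts: prints true
        System.out.println(windowCanFire(windowEnd, operatorWatermark(first, secondIdle)));
    }
}
```

In this model the window stays closed for as long as one active input has never emitted a watermark, which matches the "no output" symptom described in the thread.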
Kind regards again
Thias
From: Eugenio Marotti <[email protected]>
Sent: Thursday, September 21, 2023 2:35 PM
To: Schwalbe Matthias <[email protected]>
Cc: [email protected]
Subject: Re: Window aggregation on two joined table
Hi Matthias,
No, the second table doesn’t have an event time or a watermark specified. For
the window to work, do I need a watermark on the second table as well?
Thanks
Eugenio
On 21 Sep 2023, at 13:45, Schwalbe Matthias <[email protected]> wrote:
Hi Eugenio,
I might be mistaken, but did you specify the event time for the second table
like you did for the first table (watermark(….))?
I am not so acquainted with the Table API (I mostly work with the DataStream
API directly), but I assume this join and windowing should be by event time.
What do you think?
Kind regards
Thias
From: Eugenio Marotti <[email protected]>
Sent: Thursday, September 21, 2023 8:56 AM
To: [email protected]
Subject: Window aggregation on two joined table
Hi,
I’m trying to execute a window aggregation on two joined tables from two Kafka
topics (upsert fashion), but I get no output. Here’s the code I’m using:
This is the first table from Kafka, with an event time watermark on the
‘data_inizio’ attribute:
final TableDescriptor phasesDurationsTableDescriptor =
    TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
            .column("id_fascicolo", DataTypes.BIGINT().notNull())
            .column("nrg", DataTypes.STRING())
            .column("giudice", DataTypes.STRING())
            .column("oggetto", DataTypes.STRING())
            .column("codice_oggetto", DataTypes.STRING())
            .column("ufficio", DataTypes.STRING())
            .column("sezione", DataTypes.STRING())
            .column("fase_completata", DataTypes.BOOLEAN())
            .column("fase", DataTypes.STRING().notNull())
            .column("durata", DataTypes.BIGINT())
            .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
            .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
            .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
            .primaryKey("id_fascicolo", "fase")
            .build())
        .option(KafkaConnectorOptions.TOPIC,
            List.of("sicid.processor.phases-durations"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
        .build();
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
Here’s the second table:
final TableDescriptor averageJudgeByPhaseReportTableDescriptor =
    TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
            .column("giudice", DataTypes.STRING().notNull())
            .column("fase", DataTypes.STRING().notNull())
            .column("media_mobile", DataTypes.BIGINT())
            .primaryKey("giudice", "fase")
            .build())
        .option(KafkaConnectorOptions.TOPIC,
            List.of("sicid.processor.average-judge-by-phase-report"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
        .option(KafkaConnectorOptions.PROPS_GROUP_ID,
            "average-judge-by-phase-report")
        .build();
tEnv.createTable("AverageJudgeByPhaseReport_Kafka",
    averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable =
    tEnv.from("AverageJudgeByPhaseReport_Kafka");
Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
    .select(
        $("giudice").as("giudice_media"),
        $("fase").as("fase_media"),
        $("media_mobile")
    );
And here’s the code I’m experimenting with:
phasesDurationsTable
    .join(renamedAverageJudgeByPhaseReportTable)
    .where($("giudice").isEqual($("giudice_media")))
    .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
    .groupBy(
        $("giudice"),
        $("w")
    )
    .select(
        $("giudice")
    )
    .execute().print();
Am I doing something wrong?
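
As a side note on the 30-day tumbling window used above, here is a minimal sketch of how such a window's boundaries could be computed. It assumes windows are aligned to the Unix epoch with no offset, which I believe is the default but have not verified for this job. `TumbleSketch` and its methods are hypothetical names, and the timestamp in `main` is only an example value for `data_inizio`.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy calculation of tumbling-window boundaries (plain Java, not Flink code). */
class TumbleSketch {

    /** Window size matching Tumble.over(lit(30).days()), in milliseconds. */
    static final long SIZE_MILLIS = Duration.ofDays(30).toMillis();

    /** Start of the epoch-aligned window containing the timestamp (assumes no offset). */
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, SIZE_MILLIS);
    }

    /** Exclusive end of the window containing the timestamp. */
    static long windowEnd(long timestampMillis) {
        return windowStart(timestampMillis) + SIZE_MILLIS;
    }

    public static void main(String[] args) {
        // Example value only; stands in for a record's data_inizio.
        long ts = Instant.parse("2023-09-21T06:56:00Z").toEpochMilli();
        System.out.println("window start: " + Instant.ofEpochMilli(windowStart(ts)));
        System.out.println("window end:   " + Instant.ofEpochMilli(windowEnd(ts)));
    }
}
```

If this model holds, a result for a given record only appears once the watermark passes the end of its window, which can be up to 30 days after the record's event time. A much smaller window size may therefore make the job easier to test.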
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng
verboten.
This message is intended only for the named recipient and may contain
confidential or privileged information. As the confidentiality of email
communication cannot be guaranteed, we do not accept any responsibility for the
confidentiality and the intactness of this message. If you have received it in
error, please advise the sender by return e-mail and delete this message and
any attachments. Any unauthorised use or dissemination of this information is
strictly prohibited.