Turns out the issue was that the windows were not triggered for the joins.
For one of the tasks the watermark was advanced, so the fsFiles and the
tableFiles windows were triggered, but the global watermark (the minimum of
the watermarks across all parallel subtasks; parallelism was 4) was not
advanced. Since the global watermark was not advanced, the join window did
not fire.
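The explanation rests on how Flink combines watermarks: an operator that reads from several parallel input channels advances its event-time clock only to the minimum of the latest watermarks received on all channels. The toy simulation below is not Flink code. The class `WatermarkMin` is hypothetical, and the rule "a window [start, end) fires once the combined watermark reaches end - 1" is a simplification. It shows why one advanced subtask out of four is not enough to fire a window.

```java
import java.util.Arrays;

/**
 * Toy model (not Flink code): an operator with several parallel inputs
 * uses the minimum of the latest watermark seen on each input channel.
 */
public class WatermarkMin {

    // Latest watermark received per input channel (one per upstream subtask).
    private final long[] channelWatermarks;

    public WatermarkMin(int parallelism) {
        channelWatermarks = new long[parallelism];
        // No watermark received yet on any channel.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream subtask; watermarks never move backwards. */
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's own event-time clock: the minimum over all channels. */
    public long combinedWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    /** Simplified firing rule: window [start, end) is complete once the clock reaches end - 1. */
    public boolean windowFires(long windowEnd) {
        return combinedWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkMin join = new WatermarkMin(4); // parallelism 4, as in the thread
        long windowEnd = 1_705_405_511_000L;     // 2024-01-16T11:45:11Z (UTC)

        join.onWatermark(2, windowEnd);          // only one subtask advances
        System.out.println(join.windowFires(windowEnd)); // false: three channels are still at MIN_VALUE

        for (int ch = 0; ch < 4; ch++) {
            join.onWatermark(ch, windowEnd);     // now every subtask has advanced
        }
        System.out.println(join.windowFires(windowEnd)); // true
    }
}
```

In the real job, the same effect appears whenever some source subtasks receive no data and therefore never emit a newer watermark.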

Sorry about the false alarm.

Thanks,
Peter

On Tue, 16 Jan 2024 at 13:27, Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Team,
>
> I am working on Iceberg in-process compaction, and I am trying to use a SQL
> window join to compare 2 streams like this:
>
>     Table fsFiles = tEnv.sqlQuery(
>         "SELECT runId, location, window_start, window_end " +
>         "FROM TABLE(" +
>         "    TUMBLE(" +
>         "        TABLE " + fileSystemFilesTable + "," +
>         "        DESCRIPTOR(ts), " +
>         "        INTERVAL '1' SECONDS))");
>
>     Table tableFiles = tEnv.sqlQuery(
>         "SELECT runId, location, window_start, window_end " +
>         "FROM TABLE(" +
>         "    TUMBLE(" +
>         "        TABLE " + tableFilesTable + "," +
>         "        DESCRIPTOR(ts), " +
>         "        INTERVAL '1' SECONDS))");
>
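As a rough model of the columns each TUMBLE call adds to a row, window_start is ts rounded down to a multiple of the window size and window_end is window_start plus the size. This is a sketch that ignores time zones, and the class and method names are hypothetical. Using the runId from the printed rows as a stand-in for ts (an assumption; ts itself is not printed), the computed bounds match the 11:45:10 to 11:45:11 window in the logs when read as UTC.

```java
import java.time.Instant;

/**
 * Simplified model of the window columns that
 * TUMBLE(..., DESCRIPTOR(ts), INTERVAL '1' SECONDS) assigns.
 * Timestamps are epoch milliseconds, interpreted as UTC.
 */
public class TumbleWindows {

    /** Start of the tumbling window that contains tsMillis. */
    public static long windowStart(long tsMillis, long sizeMillis) {
        // floorMod keeps the rounding correct for negative timestamps too.
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    /** Exclusive end of the same window. */
    public static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long ts = 1_705_405_510_802L; // runId from the printed rows, used as a stand-in for ts
        long size = 1_000L;           // INTERVAL '1' SECONDS
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size))); // 2024-01-16T11:45:10Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, size)));   // 2024-01-16T11:45:11Z
    }
}
```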
> When I print out these streams with the following code, I see the values
> in the logs:
>
>     tEnv.toDataStream(fsFiles).print("FS");
>     tEnv.toDataStream(tableFiles).print("TS");
>
>
> The result is:
>
>     FS:2> +I[1705405510802,
>     file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_with_pk/metadata/00000-b717c629-bb71-48df-a30b-615aeb320aec.metadata.json,
>     2024-01-16T11:45:10, 2024-01-16T11:45:11]
>     [..]
>     TS:2> +I[1705405510802,
>     file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_unpartitioned/metadata/snap-532818363465442978-1-dc47e70d-82eb-490a-a21d-c032b88c3303.avro,
>     2024-01-16T11:45:10, 2024-01-16T11:45:11]
>     [..]
>
>
> So this is as I expected: the 2 streams periodically emit the incoming
> data files with their runIds and window timestamps.
>
> Now, I try to run an ANTI JOIN on these streams:
>
>     Table missingFiles = tEnv.sqlQuery(
>         "SELECT ts, location\n" +
>         "FROM (\n" +
>         "    SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
>         ") L WHERE L.location NOT IN (\n" +
>         "    SELECT location FROM (\n" +
>         "        SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
>         "    ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end)");
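Setting streaming aside for a moment, the anti join is meant to keep every fsFiles row whose location has no tableFiles row in the same window. The in-memory sketch below spells out that set logic. It uses hypothetical `WindowAntiJoin` and `FileRow` types (not Flink API), assumes non-null locations, and needs Java 16+ for records. In the streaming job such rows can only be emitted after the joined window fires, which depends on the watermark.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** In-memory illustration of the per-window anti join (not Flink code). */
public class WindowAntiJoin {

    /** Hypothetical row: window bounds (epoch millis) plus the file location. */
    public record FileRow(long windowStart, long windowEnd, String location) {}

    /**
     * Returns the left rows that have no right row with the same
     * (windowStart, windowEnd, location); record equality compares all three fields.
     */
    public static List<FileRow> missing(List<FileRow> left, List<FileRow> right) {
        Set<FileRow> rightKeys = new HashSet<>(right);
        return left.stream()
                .filter(l -> !rightKeys.contains(l))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FileRow> fs = List.of(
                new FileRow(0, 1000, "a.avro"),
                new FileRow(0, 1000, "b.avro"));
        List<FileRow> table = List.of(new FileRow(0, 1000, "a.avro"));
        // Prints [FileRow[windowStart=0, windowEnd=1000, location=b.avro]]
        System.out.println(missing(fs, table));
    }
}
```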
>
>
> And even though some files are missing based on the logs, I do not see
> any records in the logs for the missingFiles table:
>
>     tEnv.toDataStream(missingFiles).print("MISSING");
>
>
> Just to try a different way of checking this, I tried an outer join to
> see how the join works:
>
>     Table joined = tEnv.sqlQuery(
>         "SELECT fs_files.location AS fs_location, table_files.location AS table_location,\n" +
>         "    COALESCE(fs_files.window_start, table_files.window_start) AS window_start,\n" +
>         "    COALESCE(fs_files.window_end, table_files.window_end) AS window_end\n" +
>         "FROM (SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) fs_files\n" +
>         "LEFT JOIN (SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) table_files\n" +
>         "ON fs_files.location = table_files.location AND\n" +
>         "   fs_files.window_start = table_files.window_start AND\n" +
>         "   fs_files.window_end = table_files.window_end\n");
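For comparison, the LEFT JOIN keeps every fsFiles row and pads the table side with NULL when no tableFiles row matches on location and window bounds. Below is a minimal in-memory sketch of that behavior, using hypothetical `WindowLeftJoin`, `FileRow` and `Joined` types (not Flink API, Java 16+). The rows with a null `tableLocation` are the missing files.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of the per-window LEFT JOIN (not Flink code). */
public class WindowLeftJoin {

    /** Hypothetical input row: window bounds (epoch millis) plus the file location. */
    public record FileRow(long windowStart, long windowEnd, String location) {}

    /** Hypothetical output row mirroring fs_location, table_location, window_start, window_end. */
    public record Joined(String fsLocation, String tableLocation, long windowStart, long windowEnd) {}

    public static List<Joined> leftJoin(List<FileRow> fsFiles, List<FileRow> tableFiles) {
        List<Joined> out = new ArrayList<>();
        for (FileRow l : fsFiles) {
            boolean matched = false;
            for (FileRow r : tableFiles) {
                // SQL equality: a NULL location never matches anything.
                boolean sameLocation = l.location() != null && l.location().equals(r.location());
                if (sameLocation && l.windowStart() == r.windowStart() && l.windowEnd() == r.windowEnd()) {
                    out.add(new Joined(l.location(), r.location(), l.windowStart(), l.windowEnd()));
                    matched = true;
                }
            }
            if (!matched) {
                // Unmatched left row: the right side is padded with NULL. In a LEFT JOIN,
                // COALESCE(fs_files.window_start, ...) therefore always yields the left value.
                out.add(new Joined(l.location(), null, l.windowStart(), l.windowEnd()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<FileRow> fs = List.of(new FileRow(0, 1000, "a.avro"), new FileRow(0, 1000, "b.avro"));
        List<FileRow> table = List.of(new FileRow(0, 1000, "a.avro"));
        // Prints:
        // Joined[fsLocation=a.avro, tableLocation=a.avro, windowStart=0, windowEnd=1000]
        // Joined[fsLocation=b.avro, tableLocation=null, windowStart=0, windowEnd=1000]
        leftJoin(fs, table).forEach(System.out::println);
    }
}
```

With non-null locations, filtering this output for `tableLocation == null` gives the same rows that the NOT IN query is meant to return.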
>
>
> And there is nothing in the logs for the join either.
>
> I think I might be missing something around the windowing, and my join
> windows are not triggered by the more complex queries, but I am stuck at
> the moment, so any help would be appreciated.
>
> Thanks,
> Peter
>
