Hi Aneesha,

The interval join operator emits an unmatched row padded with NULL only
once the watermark confirms that no matching data can still arrive for it.
There is also an optimization for reducing state access, which can further
delay the output of these rows.
In your case, that adds up to almost 30 + 30 = 60 min. Did you wait that
long before checking the output?
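
If it helps, here is a rough sketch of one way to check this, by pushing
the watermark forward on both inputs. The 'probe' key, the probe email
address and the 14:00 timestamp are made up for illustration, and I am
assuming each topic has a single partition, so that one record is enough
to advance that source's watermark. The join operator follows the smaller
of its two input watermarks, so a record on only one stream is not enough
to trigger any output.

```sql
-- Hypothetical probe rows: key, email and timestamp are invented.
-- Their ts is well past 11:00 + 30 min + 30 min, so once both rows are
-- read, the watermark on each input should be far enough along for the
-- earlier unmatched visits to be emitted with NULL order columns.
INSERT INTO abandoned_visits
VALUES ('probe', 'probe@example.com',
        TIMESTAMP '2021-03-18 14:00:00.000',
        CAST(ARRAY[0] AS ARRAY<BIGINT>));

INSERT INTO orders
VALUES ('probe', 'probe@example.com',
        TIMESTAMP '2021-03-18 14:00:00.000',
        CAST(ARRAY[0] AS ARRAY<BIGINT>));
```

The two probe rows match each other, so they should show up as one
ordinary joined row; the interesting part is whether the older visits
appear with NULL order data after that.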

Aneesha Kaushal <aneesha.kaus...@reflektion.com> wrote on Thu, Mar 18, 2021 at 11:29 PM:

> Hi,
> I am doing a simple POC using Flink SQL and I am facing some issues with
> Interval Join.
> *Use Case*: I have two Kafka streams and, using a Flink SQL interval
> join, I want to remove rows from *stream 1* (abandoned_user_visits) that
> are present in *stream 2* (orders) within some time interval.
> *Data:*
> 1) *Abandoned user visits.* Sample data:
> {"key1": "123", "email": "ema...@example.com", "abandoned_pids":
> [674378611, 1754171520], "ts": "2021-03-18 11:00:00.208"}
> {"key1": "234", "email": "ema...@example.com", "abandoned_pids":
> [1942367711], "ts": "2021-03-18 11:45:00.208"}
> {"key1": "123", "email": "ema...@example.com", "abandoned_pids":
> [1754171520], "ts": "2021-03-18 12:00:00.208"}
> {"key1": "234", "email": "ema...@example.com", "abandoned_pids":
> [1942367711], "ts": "2021-03-18 12:45:00.208"}
> 2) *User order stream*
> {"key1": "234", "email": "ema...@example.com", "pids": [1754171520],
> "ts": "2021-03-18 11:55:00.208"}
> {"key1": "123", "email": "ema...@example.com", "pids": [674378611,
> 1754171520], "ts": "2021-03-18 12:10:00.208"}
> When I push the above records to Kafka and select from the view below,
> I get a result that is actually an *INNER* join (not an OUTER join).
> I even tried posting just one record to stream 1 and no record to stream
> 2, expecting that record to be emitted. But nothing was emitted.
> What was interesting is that when I use processing time instead of event
> time, I get the results as expected.
> *Tables and Views used: *
> CREATE TABLE abandoned_visits (
>         key1 STRING
>       , email STRING
>       , ts TIMESTAMP(3)
>       , abandoned_pids ARRAY<BIGINT>
>       , WATERMARK FOR ts AS ts
> )
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'abandoned-visits',
>   'properties.bootstrap.servers' = '...',
>   'format' = 'json'
> );
> CREATE TABLE orders (
>         key1 STRING
>       , email STRING
>       , ts TIMESTAMP(3)
>       , pids ARRAY<BIGINT>
>       , WATERMARK FOR ts AS ts
> )
> WITH (
>     'connector' = 'kafka',
>     'topic'     = 'orders',
>     'properties.bootstrap.servers' = '...',
>     'format'    = 'json'
> );
> CREATE VIEW abandoned_visits_with_no_orders AS
>   SELECT
>       av.key1
>     , av.email
>     , av.abandoned_pids
>     , FLOOR(av.ts TO MINUTE)    AS visit_timestamp
>     , FLOOR(o.ts TO MINUTE)     AS order_timestamp
>     , o.email                   AS order_email
>   FROM abandoned_visits av
>   FULL OUTER JOIN orders o
>   ON  av.key1 = o.key1
>   AND av.email = o.email
>   AND o.ts BETWEEN av.ts - INTERVAL '30' MINUTE AND av.ts + INTERVAL '30' MINUTE
> --  WHERE
> --    o.email IS NULL    // Commented this out so as to get something in result
> ;
> *Result: *
> select * from abandoned_visits_with_no_orders;
> This gives a result the same as an inner join. It doesn't have rows with
> NULL order data.
> I would appreciate any help.
> Thanks,
> Aneesha


Benchao Li
