Re: Time Temporal Join

2021-03-25 Thread Satyam Shekhar
Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based" since I need to perform a tumble
grouping operation on top of the join.

I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS,
but that didn't help either.

Note that we have a custom connector to an internal storage engine. The
connector implements the ScanTableSource interface with the
SupportsWatermarkPushDown ability. Would the watermark strategy in the
table schema matter in that case? To simplify further, I changed the query
to the following -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R from F JOIN D FOR SYSTEM_TIME AS OF
F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see
from the connector are the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0  -->  ctx.collectWithTimestamp(row_, rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000  --->  ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row
as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F, which should join with a row already emitted
from D, does not produce any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000
NO REPLY

My understanding of temporal joins is that the latest row from D should be
picked for joining rows from F. Is my expectation that the (2, 2, 2s) row in
F joins with the (2, 2, 1s) row in D wrong?
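
A minimal Python sketch (not Flink code) of the semantics in question may help
narrow this down. It rests on one assumption that this thread does not confirm:
that the join holds back each F row until the smaller of the two input
watermarks has reached that row's time, and then joins it with the latest D
version whose time is not later than the F row's time. The class and method
names are hypothetical.

```python
from bisect import bisect_right
from collections import defaultdict


class TemporalJoinSketch:
    """Toy model of F JOIN D FOR SYSTEM_TIME AS OF F.R (inner join).

    Assumption: an F row is emitted only once min(F watermark, D watermark)
    has reached its rowtime.
    """

    def __init__(self):
        self.versions = defaultdict(list)  # key -> list of (time, row), sorted by time
        self.pending = []                  # buffered F rows as (time, key, row)
        self.wm = {"F": float("-inf"), "D": float("-inf")}

    def on_right(self, key, row, time):
        """Record a new version of a D row."""
        self.versions[key].append((time, row))
        self.versions[key].sort(key=lambda v: v[0])

    def on_left(self, key, row, time):
        """Buffer an F row until the watermarks allow it to be joined."""
        self.pending.append((time, key, row))

    def on_watermark(self, side, wm):
        """Advance one input's watermark and return the rows joined as a result."""
        self.wm[side] = max(self.wm[side], wm)
        ready_until = min(self.wm.values())
        out, still_pending = [], []
        for time, key, row in sorted(self.pending, key=lambda p: p[0]):
            if time > ready_until:
                still_pending.append((time, key, row))
                continue
            times = [t for t, _ in self.versions[key]]
            idx = bisect_right(times, time)  # count of versions with t <= time
            if idx:  # no valid version: the inner join produces nothing
                out.append((row, self.versions[key][idx - 1][1]))
        self.pending = still_pending
        return out


# Replay of the second log above (D rows at 1s).
join = TemporalJoinSketch()
for k in range(4):
    join.on_right(k, (k, k, 1000), 1000)
join.on_left(0, (0, 0, 0), 0)
join.on_left(1, (1, 1, 1000), 1000)
join.on_watermark("F", 1000)
print(join.on_watermark("D", 1000))  # [((1, 1, 1000), (1, 1, 1000))]
join.on_left(2, (2, 2, 2000), 2000)
print(join.on_watermark("F", 2000))  # []
print(join.on_watermark("D", 2000))  # [((2, 2, 2000), (2, 2, 1000))]
```

Under that assumption, the (2, 2, 2s) row would stay buffered until D also
emits a watermark of at least 2000, which would be consistent with the NO
REPLY above. The F row at time 0 finds no D version valid at that time, which
would explain why only one row came out.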

Regards,
Satyam



Re: Time Temporal Join

2021-03-16 Thread Timo Walther

Hi Satyam,

first of all, your initial join query can also work; you just need to
make sure that no time attribute is in the SELECT clause. As the
exception indicates, you need to cast all time attributes to TIMESTAMP.
The reason is a major design issue, also explained in the answer linked
below: a time attribute must not be in the output of a regular join.

https://stackoverflow.com/a/64500296/806430

However, since you would like to perform the join "time-based", either
an interval join or a temporal join might solve your use case.


In your case I guess the watermark strategy of D is the problem. Are you 
sure the result is:


> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
> Emit D watermark=0

and not:

> Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
> Emit D watermark=0
> Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
> Emit D row=+I(2,"2",1970-01-01T00:00)@time=0

Or maybe the watermark is even dropped. Could you try to use a watermark 
strategy with


`R` - INTERVAL '0.001' SECONDS
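
The difference between the two emit orders can be sketched in Python, using
the usual event-time convention that a watermark W means no more rows with
timestamp <= W are expected. The function name and the event encoding are
hypothetical, and the sketch makes no claim about what the temporal join
operator actually does with late rows; it only shows which rows would count
as late.

```python
def late_rows(events, delay_ms):
    """Return the rows that arrive with timestamp <= the current watermark.

    `events` is a list of ("row", ts_ms) entries and ("wm",) markers. At a
    ("wm",) marker the source emits a watermark of
    (max timestamp seen so far) - delay_ms, mimicking a strategy of the
    form `R` - INTERVAL '<delay>' SECOND.
    """
    watermark = float("-inf")
    max_ts = float("-inf")
    late = []
    for event in events:
        if event[0] == "wm":
            watermark = max(watermark, max_ts - delay_ms)
        else:
            ts = event[1]
            if ts <= watermark:
                late.append(event)
            max_ts = max(max_ts, ts)
    return late


rows_then_wm = [("row", 0), ("row", 0), ("row", 0), ("wm",)]
wm_in_between = [("row", 0), ("wm",), ("row", 0), ("row", 0)]

print(late_rows(rows_then_wm, delay_ms=0))   # []
print(late_rows(wm_in_between, delay_ms=0))  # [('row', 0), ('row', 0)]
print(late_rows(wm_in_between, delay_ms=1))  # []
```

With a zero delay, a watermark emitted between rows that share the same
timestamp makes the remaining rows late; a delay of one millisecond avoids
that.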

I hope this helps.

Regards,
Timo



On 16.03.21 04:37, Satyam Shekhar wrote:

Hello folks,

I would love to hear your feedback on this.

Regards,
Satyam

On Wed, Mar 10, 2021 at 6:53 PM Satyam Shekhar wrote:


Hello folks,

I am looking to enrich rows from an unbounded streaming table by
joining it with a bounded static table while preserving rowtime for
the streaming table. For example, let's consider two tables F
and D, where F is unbounded and D is bounded. The schema for the two
tables is the following -

F:
  |-- C0: BIGINT
  |-- C1: STRING
  |-- R: TIMESTAMP(3) **rowtime**
  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
  |-- C0: BIGINT
  |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
     from F join D ON F.C1 = D.C1
     group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join.
As a workaround you can cast the time attributes of input tables to
TIMESTAMP before."

My understanding from reading the docs is that Time Temporal Join is
meant to solve this problem. So I modeled table D as follows -

D:
  |-- C0: BIGINT
  |-- C1: STRING NOT NULL
  |-- R: TIMESTAMP(3)
  |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
  |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

Column D.R is always set to 0, and I modified the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
     from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
     group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the
following data in D initially -
Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -
Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect that the two rows in F will join with matching rows (on C1) in
D and produce some output. But I do not see anything in the output.
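
That expectation can be written down as a small Python sketch. It is plain
Python, not Flink: it ignores watermarks entirely and only states which rows
should match and how they would be windowed. `expected_result` and
`version_as_of` are hypothetical names, and the row times use the @time values
from the Emit lines above.

```python
from collections import defaultdict

WINDOW_MS = 1000  # tumble(F.R, interval '1' second)

# Rows as (C0, C1, rowtime in ms), taken from the Emit lines above.
D = [(0, "0", 0), (1, "1", 0), (2, "2", 0)]
F = [(0, "0", 0), (1, "1", 1000)]


def expected_result(f_rows, d_rows):
    """sum(F.C0) grouped by D.C1 and the 1-second tumbling window of F.R."""

    def version_as_of(key, time):
        # Latest D version for this key that is valid at `time` (D.R <= F.R).
        candidates = [d for d in d_rows if d[1] == key and d[2] <= time]
        return max(candidates, key=lambda d: d[2]) if candidates else None

    sums = defaultdict(int)
    for c0, c1, r in f_rows:
        d = version_as_of(c1, r)
        if d is None:
            continue  # inner join: no valid version, no output
        window_start = (r // WINDOW_MS) * WINDOW_MS
        sums[(d[1], window_start)] += c0
    return dict(sums)


print(expected_result(F, D))  # {('0', 0): 0, ('1', 1000): 1}
```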

So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the
result?

Thanks,
Satyam





Re: Time Temporal Join

2021-03-15 Thread Satyam Shekhar
Hello folks,

I would love to hear your feedback on this.

Regards,
Satyam
