Re: Flink - sending clicks+impressions to AWS Personalize

2020-12-17 Thread Timo Walther

Hi Dan,

The exception you are getting comes from a limitation in Flink SQL that
people run into very frequently at the moment.


I recently tried to summarize the issue here:

https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296
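A rough sketch of the two cases, as an illustration rather than the exact query from this thread. A join whose condition bounds the two rowtime attributes against each other can be planned as an interval join. A join against a non-windowed aggregate, such as the `GROUP BY log_user_id` subquery, is a regular join, and its inputs must not carry rowtime attributes. The sketch assumes `insertion` and `impression` have a TIMESTAMP(3) rowtime attribute `ts`, and that `recent_impressions` is a hypothetical view holding that aggregated subquery (`log_user_id`, `impression_content_ids`).

```sql
-- Case 1: interval join. Equi-key plus a time bound between both rowtime
-- attributes, so the planner can translate it into a streaming interval join.
SELECT i.log_user_id, i.content_id, imp.impression_id
FROM insertion AS i
JOIN impression AS imp
  ON i.insertion_id = imp.insertion_id
 AND i.ts BETWEEN imp.ts - INTERVAL '12' HOUR AND imp.ts + INTERVAL '1' HOUR;

-- Case 2: regular join against an aggregate (recent_impressions is a
-- hypothetical view). Casting ts to a plain TIMESTAMP(3) inside the join
-- input removes the rowtime attribute, which is the workaround the error
-- message suggests.
SELECT i.log_user_id, i.content_id, i.ts, r.impression_content_ids
FROM (
  SELECT log_user_id, content_id, CAST(ts AS TIMESTAMP(3)) AS ts
  FROM insertion
) AS i
JOIN recent_impressions AS r
  ON i.log_user_id = r.log_user_id;
```

The trade-off of the cast: once `ts` is a plain TIMESTAMP, later joins on it can no longer be planned as interval joins, and a regular join keeps both inputs in state without time-based cleanup unless idle state retention is configured.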

The query is quite complex. It seems that some JOIN is not recognized as
a streaming interval join. Maybe you can split up the big query into
individual subqueries and verify the plan using
`TableEnvironment.explainSql()` to figure out which join causes the
exception.
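The same plan check can also be written as a SQL statement. To our knowledge, `EXPLAIN PLAN FOR` can be submitted through `TableEnvironment.executeSql()` since Flink 1.11. Taking the joins of the big query one pair at a time, the pair that fails during explain with the rowtime error, or whose optimized plan shows a regular `Join` node where an `IntervalJoin` node was expected, is the one to rewrite. Table and column names come from the query in this thread; the node names are what the Blink planner prints as far as we know and may differ between versions.

```sql
-- Explain a single join from the big query in isolation.
-- If the time bound is recognized, the optimized plan should contain an
-- IntervalJoin node; a plain Join node indicates a regular join.
EXPLAIN PLAN FOR
SELECT insertion.insertion_id, impression.impression_id
FROM insertion
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
 AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR
                      AND impression.ts + INTERVAL '1' HOUR;
```

Passing only the SELECT part (without `EXPLAIN PLAN FOR`) to `TableEnvironment.explainSql()` should produce the same plan text as a String.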


Regards,
Timo


On 16.12.20 03:40, Dan Hill wrote:
I want to try using AWS Personalize to get content recommendations.
One of the fields on the input (click) event is a list of recent
impressions.


E.g.
{
   ...
   eventType: 'click',
   eventId: 'click-1',
   itemId: 'item-1',
   impression: ['item-2', 'item-3', 'item-4', 'item-5'],
}

Is there a way to produce this output using Flink SQL?

I tried a version of this but got the following error:
"Rowtime attributes must not be in the input rows of a regular join. As 
a workaround you can cast the time attributes of input tables to 
TIMESTAMP before."


Here is a simplified version of the query.


SELECT
  "user".user_id AS userId,
  "view".session_id AS sessionId,
  click.click_id AS eventId,
  CAST(click.ts AS BIGINT) AS sentAt,
  insertion.content_id AS itemId,
  impression_content_ids AS impression
FROM "user"
RIGHT JOIN "view"
  ON "user".log_user_id = "view".log_user_id
  AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
JOIN insertion
  ON view.view_id = insertion.view_id
  AND view.ts BETWEEN insertion.ts - INTERVAL '1' HOUR AND insertion.ts + INTERVAL '1' HOUR
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
JOIN (
  SELECT log_user_id, CAST(COLLECT(DISTINCT impression_content_id) AS ARRAY) AS impression_content_ids
  FROM (
    SELECT
      insertion.log_user_id AS log_user_id,
      ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY impression.ts DESC) AS row_num,
      insertion.content_id AS impression_content_id
    FROM insertion
    JOIN impression
      ON insertion.insertion_id = impression.insertion_id
      AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
    GROUP BY insertion.log_user_id, impression.ts, insertion.content_id
  ) WHERE row_num <= 25
  GROUP BY log_user_id
) ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
LEFT JOIN click
  ON impression.impression_id = click.impression_id
  AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts + INTERVAL '12' HOUR

Flink - sending clicks+impressions to AWS Personalize

2020-12-15 Thread Dan Hill