Re: Flink 1.10 es sink exception

2020-02-16 Thread Leonard Xu
Hi, sunfulin

Using a constant key in a `group by` query is unusual and inefficient. For now,
you can work around this bug by bubbling the constant key up out of the
`group by`.

BTW, godfrey is ready to resolve the issue.
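
A rough, untested sketch of that workaround, assuming the DDL and query posted
earlier in this thread, and assuming that "bubbling up" means selecting the
constant in the outer query instead of grouping by it:

```sql
-- Sketch only: the constant 'ZL_001' is projected in the outer SELECT
-- and no longer appears in the GROUP BY clause.
INSERT INTO ES6_ZHANGLE_OUTPUT
  SELECT 'ZL_001' as aggId, pageId, ts_min as ts,
    count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
    count(case when eventId = 'click' then 1 else null end) as clkCnt
  FROM
  (
    SELECT
      pageId,
      eventId,
      ts2Date(recvTime) as ts_min
    from kafka_zl_etrack_event_stream
    where eventId in ('exposure', 'click')
  ) as t1
  group by pageId, ts_min
```

Because aggId is the same literal for every row, dropping it from the grouping
should not change which groups are produced. Note that the key derived for the
upsert sink would then presumably be (pageId, ts) only, so the Elasticsearch
document ids may differ from those written by the original query.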





Re:Re: Flink 1.10 es sink exception

2020-02-16 Thread sunfulin
Hi,
Wow, many thanks for tracking down and debugging this problem. I can see the
constant key issue now. I appreciate your kind help : )












Re: Flink 1.10 es sink exception

2020-02-15 Thread Leonard Xu

Hi, sunfulin
I reproduced your case. This should be a bug in extracting the unique key from
the plan, and I created an issue [1] to track it.


CC:  jark


[1]https://issues.apache.org/jira/browse/FLINK-16070 







Re:Re: Flink 1.10 es sink exception

2020-02-14 Thread sunfulin
Hi, Jark
Thanks for your reply. INSERT with a column list is indeed not allowed with the
old planner enabled in Flink 1.10; it throws an exception such as
"Partial insert is not supported".
Never mind that issue. Focusing on the UpsertMode exception, my ES DDL is as
follows:


CREATE TABLE ES6_ZHANGLE_OUTPUT (
  aggId varchar,
  pageId varchar,
  ts varchar,
  expoCnt bigint,
  clkCnt bigint
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://168.61.113.171:9092;http://168.61.113.171:9093',
  'connector.index' = 'flink_zhangle_pageview',
  'connector.document-type' = '_doc',
  'update-mode' = 'upsert',
  'connector.key-delimiter' = '$',
  'connector.key-null-literal' = 'n/a',
  'connector.bulk-flush.interval' = '1000',
  'format.type' = 'json'
)




And the SQL logic is as follows:


INSERT INTO ES6_ZHANGLE_OUTPUT
  SELECT aggId, pageId, ts_min as ts,
    count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
    count(case when eventId = 'click' then 1 else null end) as clkCnt
  FROM
  (
    SELECT
      'ZL_001' as aggId,
      pageId,
      eventId,
      recvTime,
      ts2Date(recvTime) as ts_min
    from kafka_zl_etrack_event_stream
    where eventId in ('exposure', 'click')
  ) as t1
  group by aggId, pageId, ts_min


I simply call StreamTableEnvironment.sqlUpdate() with the SQL above and
execute the job. I am not sure what the root cause is.






Re: Flink 1.10 es sink exception

2020-02-14 Thread Jark Wu
Hi sunfulin,

Is this the actual query you submitted? AFAIK, INSERT with a column list is
not allowed for now,
i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt,
clkCnt)`.

Could you attach the full SQL text, including the DDLs of the
ES6_ZHANGLE_OUTPUT and kafka_zl_etrack_event_stream tables?
If you have a minimal program that can reproduce this problem, that would
be great.

Best,
Jark

On Fri, 14 Feb 2020 at 22:53, Robert Metzger  wrote:

>
>
> -- Forwarded message -
> From: sunfulin 
> Date: Fri, Feb 14, 2020 at 2:59 AM
> Subject: Re:Flink 1.10 es sink exception
> To: user@flink.apache.org 
>
>
> Can anyone share some advice on the cause of this exception? After I
> switched to the old planner, the same SQL runs fine.


Flink 1.10 es sink exception

2020-02-13 Thread sunfulin
Hi, guys
When running the Flink SQL below, I get an exception like
"org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that 
Table has a full primary keys if it is updated". I am using the latest Flink
1.10 release with the blink planner enabled. The same logic runs well on
Flink 1.8.2 with the old planner. Is there a problem with my SQL usage, or
could this be a bug?




INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
  SELECT aggId, pageId, ts_min as ts,
    count(case when eventId = 'exposure' then 1 else null end) as expoCnt,
    count(case when eventId = 'click' then 1 else null end) as clickCnt
  FROM
  (
    SELECT
      'ZL_001' as aggId,
      pageId,
      eventId,
      recvTime,
      ts2Date(recvTime) as ts_min
    from kafka_zl_etrack_event_stream
    where eventId in ('exposure', 'click')
  ) as t1
  group by aggId, pageId, ts_min