Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-05 Thread
+1 for making the Blink planner the default planner; the Blink planner has
become much more stable since 1.10.

Dian Fu wrote on Monday, January 6, 2020 at 11:51 AM:

> +1 to setting the Blink planner as the default planner for the SQL Client,
> considering that so many features added in 1.10 are only available in the
> Blink planner.
>
> On Jan 6, 2020, at 11:04 AM, Rui Li wrote:
>
> +1. I think it improves the user experience.
>
> On Mon, Jan 6, 2020 at 10:18 AM Zhenghua Gao  wrote:
>
>> +1 for making the Blink planner the default planner for the SQL Client,
>> since we have made huge improvements in 1.10.
>>
>> Best Regards,
>> Zhenghua Gao
>>
>>
>> On Sun, Jan 5, 2020 at 2:42 PM Benchao Li  wrote:
>>
>>> +1
>>>
>>> We have used the Blink planner in our production environment since the
>>> 1.9.0 release, and it has been really impressive.
>>>
 Hequn Cheng wrote on Sunday, January 5, 2020 at 1:58 PM:
>>>
 +1 to making the Blink planner the default planner for the SQL Client, so
 we can give the Blink planner a bit more exposure.

 Best, Hequn

 On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  wrote:

> Hi Benoît,
>
> Thanks for the reminder. I will look into the issue, and hopefully we
> can target it for 1.9.2 and 1.10.
>
> Cheers,
> Jark
>
> On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> >  If anyone finds that the Blink planner has any significant defects or
>> > regressions compared to the old planner, please let us know.
>>
>> Overall, the Blink-exclusive features (TopN, deduplicate, LAST_VALUE,
>> plan reuse, etc.) are a must! But not all production use cases of the
>> legacy planner are covered yet:
>> an edge case of Temporal Table Functions does not allow computed
>> Tables (as opposed to TableSources) to be used on the query side in
>> Blink (https://issues.apache.org/jira/browse/FLINK-14200).
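A hedged sketch of the pattern Benoît describes, with made-up names (`tEnv` is
assumed to be a StreamTableEnvironment with a `raw_rates` table already
registered); per FLINK-14200, this is the shape that fails on the Blink planner:

    // Sketch only: the temporal table function is created from a *computed*
    // Table (the result of a query) rather than directly from a TableSource.
    Table rates = tEnv.sqlQuery(
            "SELECT currency, rate, update_time FROM raw_rates WHERE rate > 0");
    TemporalTableFunction ratesFn =
            rates.createTemporalTableFunction("update_time", "currency");
    // Register it so SQL can join against Rates(o.order_time) on the query side.
    tEnv.registerFunction("Rates", ratesFn);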
>>
>> Cheers
>> Ben
>>
>>
>> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>>
>>> +1. I have already made Blink the default planner of the Flink
>>> interpreter in Zeppelin.
>>>
>>>
>>> Jingsong Li wrote on Friday, January 3, 2020 at 4:37 PM:
>>>
 Hi Jark,

 +1 for making the Blink planner the default in the SQL CLI.
 I believe the new planner can be put into practice in production.
 We've worked hard on it for nearly a year, while the old planner hasn't
 moved forward.

 I'd also like to cc user@flink.apache.org:
 if anyone finds that the Blink planner has any significant defects or
 regressions compared to the old planner, please let us know. We will
 be very grateful.

 Best,
 Jingsong Lee

 On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu wrote:

> +1 for this.
> We brought many SQL/API features and enhanced stability in the 1.10
> release, and almost all of that happened in the Blink planner.
> The SQL CLI is the most convenient entrypoint for me; I believe many users
> will have a better experience if we set the Blink planner as the default
> planner.
>
> Best,
> Leonard
>
> > On Jan 3, 2020, at 15:16, Terry Wang wrote:
> >
> > Since what the Blink planner can do is a superset of the Flink planner,
> > a big +1 from my side for changing the default planner to the Blink planner.
> >
> > Best,
> > Terry Wang
> >
> >
> >
> >> On Jan 3, 2020, at 15:00, Jark Wu wrote:
> >>
> >> Hi everyone,
> >>
> >> In the 1.10 release, Flink SQL supports many awesome features and
> >> improvements, including:
> >> - support for watermark statements and computed columns in DDL (see the
> >>   sketch below)
> >> - full support for all Hive data types
> >> - batch SQL performance improvements (TPC-DS runs 7x faster than Hive MR)
> >> - support for INSERT OVERWRITE and INSERT PARTITION
> >>
> >> However, all of these features and improvements are only available in
> >> the Blink planner, not in the old planner.
> >> There are also some other features that are limited to the Blink
> >> planner, e.g. dimension table join [1], TopN [2], deduplicate [3],
> >> streaming aggregate optimizations [4], and so on.
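For illustration, a minimal sketch of the first item's syntax, with made-up
table and field names and the connector properties elided (`tEnv` is a
Table API TableEnvironment):

    // Sketch of 1.10 DDL: `cost` is a computed column, and the WATERMARK
    // statement declares `ts` as an event-time attribute. Names and the
    // WITH clause are placeholders, not from the thread.
    tEnv.sqlUpdate(
            "CREATE TABLE user_events (" +
            "  user_id STRING," +
            "  amount DOUBLE," +
            "  cost AS amount * 0.9," +
            "  ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH (...)");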
> >>
> >> But the old planner is still the default planner in the Table API &
> >> SQL. It is frustrating for users to have to switch to the Blink planner
> >> manually every time they start a SQL CLI. And it is surprising to hit an
> >> unsupported-feature exception when trying out the new features without
> >> switching the planner.
> >>
> >> The SQL CLI is a very important entrypoint for trying out new features
> >> and prototyping for users.
> >> In order to give the new planner more exposure, I would like to suggest
> >> setting the default planner for the SQL Client to the Blink planner in
> >> the 1.10 release.
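For reference, a minimal sketch of the manual opt-in described above, as it
looks from the Table API on 1.9/1.10 (SQL CLI users set the equivalent via the
`planner` entry of their YAML environment file, if I recall correctly); the
class name is made up:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BlinkPlannerSketch {
        public static void main(String[] args) {
            // Explicitly choose the Blink planner in streaming mode; this is
            // the per-job opt-in the proposal would make the default.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);
            // ... register tables and run queries as usual ...
        }
    }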

Re: Flink SQL Count Distinct performance optimization

2020-01-07 Thread
hi sunfulin,
you can try the blink planner (available since 1.9+), which optimizes distinct
aggregations. you can also try enabling
*table.optimizer.distinct-agg.split.enabled* if the data is skewed.

best,
godfreyhe
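A minimal sketch of applying both suggestions together (assuming Flink 1.9+ and
a Table API job; the class name is made up). The split option automates the
manual two-level MOD(hashCode(deviceId), 1024) split that the query below does
by hand:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DistinctAggSplitSketch {
        public static void main(String[] args) {
            // Use the blink planner, which ships the distinct-aggregation
            // optimizations mentioned above.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Rewrite COUNT(DISTINCT ...) into a two-level (partial/final)
            // aggregation so hot keys are spread across parallel tasks.
            tEnv.getConfig().getConfiguration()
                    .setString("table.optimizer.distinct-agg.split.enabled", "true");
        }
    }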

sunfulin wrote on Wednesday, January 8, 2020 at 3:39 PM:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> In one scenario I'm trying to count(distinct deviceID) over about 100GB of
> data in realtime and sink the aggregated results to an Elasticsearch index.
> I hit a severe performance issue when running my Flink job, and would like
> to get some help from the community.
>
>
> Flink version: 1.8.2.
> Running on YARN with 4 YARN slots per task manager. My Flink task
> parallelism is set to 10, which is equal to the number of my Kafka source
> partitions. After running the job, I can observe high backpressure in the
> Flink dashboard. Any suggestions and any kind of help are highly appreciated.
>
>
> The running SQL is the following:
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT (aggId, pageId, ts, expoCnt, clkCnt)
> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
> FROM (
>   SELECT
>     aggId,
>     pageId,
>     statkey,
>     COUNT(DISTINCT deviceId) AS cnt
>   FROM (
>     SELECT
>       'ZL_005' AS aggId,
>       'ZL_UV_PER_MINUTE' AS pageId,
>       deviceId,
>       ts2Date(recvTime) AS statkey
>     FROM kafka_zl_etrack_event_stream
>   )
>   GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
> ) AS t1
> GROUP BY aggId, pageId, statkey
>
> Best


Re: Compound Time interval in SQL queries

2019-11-21 Thread (晓令)
please try this approach: interval + interval

like this:
SELECT count(1) AS event_count,
  TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)

thanks, 
godfrey


--
From: Arujit Pradhan
Sent: Thursday, November 21, 2019 16:23
To: user
Subject: Compound Time interval in SQL queries

Hi all,

Is there a way to define a compound time interval (one consisting of both HOUR
and MINUTE parts) for windows in a Flink SQL query?

For example, we want to do something like this:
SELECT count(1) AS event_count ,
 TUMBLE_END(rowtime,
 INTERVAL '7' HOUR
 AND '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE ( rowtime, INTERVAL '7' HOUR '30' MINUTE )

We cannot even convert this to minutes, as we get this error:
Interval field value exceeds precision of MINUTE(2) field

We went through the Calcite documentation and could not find any workaround
for this.
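A side note on the error itself: a bare MINUTE qualifier defaults to a
leading-field precision of 2 digits, so a literal like INTERVAL '450' MINUTE
(7h30m expressed in minutes) overflows it. A hedged sketch of the standard-SQL
workaround, assuming the planner in use accepts an explicit precision
(untested here; `tEnv` is a TableEnvironment with `data_stream` registered):

    // Sketch only: give the interval's leading field an explicit precision
    // of 3 digits so a single-unit literal can hold 450 minutes. Whether
    // MINUTE(3) is accepted depends on the Flink/Calcite version.
    Table result = tEnv.sqlQuery(
            "SELECT count(1) AS event_count, " +
            "  TUMBLE_END(rowtime, INTERVAL '450' MINUTE(3)) AS window_timestamp " +
            "FROM data_stream " +
            "GROUP BY TUMBLE(rowtime, INTERVAL '450' MINUTE(3))");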

Thanks and regards,
arujit
 

Re: Compound Time interval in SQL queries

2019-11-21 Thread (晓令)
hi arujit,
Which Flink version are you using?


thanks, 
godfrey



--
From: Arujit Pradhan
Sent: Thursday, November 21, 2019 17:21
To: 贺小令(晓令); user
Subject: Re: Compound Time interval in SQL queries

Hi godfrey,

Thanks for your reply. But now I am getting this error:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.table.api.TableException: Only constant window descriptors are supported.
    at com.gojek.daggers.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:30)
Caused by: org.apache.flink.table.api.TableException: Only constant window descriptors are supported.
    at org.apache.flink.table.api.TableException$.apply(exceptions.scala:57)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsLong$1(DataStreamLogicalWindowAggregateRule.scala:72)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:88)
    at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)

Any idea why this might be happening?

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:37 PM 贺小令(晓令)  wrote:

please try this approach: interval + interval

like this:
SELECT count(1) AS event_count,
  TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS window_timestamp
FROM `data_stream`
GROUP BY TUMBLE(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)

thanks,
godfrey


Re: Compound Time interval in SQL queries

2019-11-21 Thread (晓令)
hi arujit,

the blink planner with flink-1.9 supports this query.

the reason for the error is that, at the moment, neither planner supports
complex expressions like INTERVAL '7' HOUR + INTERVAL '30' MINUTE at the point
where it transforms the window into a LogicalWindowAggregate node.

why does the blink planner support this query?
the optimization order of the two planners is different: the Flink planner
(a.k.a. the old planner) transforms the window into a LogicalWindowAggregate
node first and then simplifies constant expressions (like INTERVAL '7' HOUR +
INTERVAL '30' MINUTE, which can be simplified to 2700:INTERVAL HOUR TO MINUTE),
while the blink planner's approach is just the opposite: it simplifies
expressions first and then transforms the window.

so, you could try the blink planner.

thanks,
godfrey
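Putting godfrey's explanation into a minimal sketch (assuming Flink 1.9 with
the blink planner on the classpath, and that `data_stream` with an event-time
attribute `rowtime` is already registered; registration is elided):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class CompoundIntervalSketch {
        public static void main(String[] args) {
            // Select the Blink planner, which simplifies the compound
            // interval to a constant before the window is translated,
            // so the query is accepted.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            Table result = tEnv.sqlQuery(
                    "SELECT count(1) AS event_count, " +
                    "  TUMBLE_END(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE) AS window_timestamp " +
                    "FROM data_stream " +
                    "GROUP BY TUMBLE(rowtime, INTERVAL '7' HOUR + INTERVAL '30' MINUTE)");
        }
    }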




--
From: Arujit Pradhan
Sent: Thursday, November 21, 2019 17:31
To: 贺小令(晓令)
Subject: Re: Compound Time interval in SQL queries

Hi godfrey,

We are using flink-1.6.2. But even when working with flink-1.9, I am still
getting this error:

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.table.api.TableException: Only constant window intervals with millisecond resolution are supported.

Thanks and regards,
arujit

On Thu, Nov 21, 2019 at 2:53 PM 贺小令(晓令)  wrote:

hi arujit,
Which Flink version are you using?


thanks, 
godfrey

