Hi sunfulin,
You can try the blink planner (available since 1.9+), which optimizes distinct
aggregation. You can also try enabling
*table.optimizer.distinct-agg.split.enabled* if the data is skewed.
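
For example, here is a minimal sketch of switching to the blink planner and
enabling that option (assuming Flink 1.9+ with the blink planner dependency on
the classpath; illustrative only, not your exact setup):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // create a table environment backed by the blink planner (streaming mode)
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // split COUNT(DISTINCT ...) into a two-level aggregation to mitigate skew
    tEnv.getConfig().getConfiguration()
            .setBoolean("table.optimizer.distinct-agg.split.enabled", true);

With this option on, the planner automatically does what your query does by
hand with GROUP BY ... MOD(hashCode(deviceId), 1024): a partial aggregation
keyed by a bucket of deviceId, followed by a final merge.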

Best,
godfreyhe

sunfulin <sunfulin0...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> In one scenario I'm trying to count(distinct deviceID) over about a 100GB
> data set in realtime and sink the aggregated results to an ElasticSearch
> index. I'm hitting a severe performance issue when running my Flink job and
> would like to get some help from the community.
>
>
> Flink version: 1.8.2
> Running on YARN with 4 slots per task manager. My Flink task
> parallelism is set to 10, which equals the number of my Kafka source
> partitions. After running the job, I can observe high backpressure in the
> Flink dashboard. Any suggestions and help are highly appreciated.
>
>
> The SQL being run is the following:
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT (aggId, pageId, ts, expoCnt, clkCnt)
> SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
> FROM (
>     SELECT
>         aggId,
>         pageId,
>         statkey,
>         COUNT(DISTINCT deviceId) AS cnt
>     FROM (
>         SELECT
>             'ZL_005' AS aggId,
>             'ZL_UV_PER_MINUTE' AS pageId,
>             deviceId,
>             ts2Date(recvTime) AS statkey
>         FROM kafka_zl_etrack_event_stream
>     )
>     GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
> ) AS t1
> GROUP BY aggId, pageId, statkey
>
> Best
