Thank you for your reply.
This is my SQL. It does a self-join, calculates each customer's share of the
totals, and then takes the top 10 customers.
"mytable" has only 60,000 records, but after the join the "records sent"
count is already 2,869,940 and is still increasing.
select * from (
    select
        t1.id,
        t1.month,
        t1.customer,
        t1.amount,
        t1.counts,
        t1.counts/t2.counts as countRate,
        t1.amount/t2.amount as amountRate,
        -- partitioned by t1.id: t1 exposes no corpId column
        -- (assumption: corpId is the same key as id)
        row_number() over(partition by t1.id, t1.month
                          order by t1.amount desc, t1.customer) as rn
    from
    (
        -- totals per id, month and customer
        SELECT
            id,
            month,
            customer,
            sum(amount) AS amount,
            sum(counts) AS counts
        FROM
            mytable
        GROUP BY id, month, customer
    ) t1
    inner join
    (
        -- totals per id and month, used as the denominator
        SELECT
            id,
            month,
            sum(amount) AS amount,
            sum(counts) AS counts
        FROM
            mytable
        GROUP BY id, month
    ) t2
    on t1.id = t2.id
    and t1.month = t2.month
) t
where rn <= 10
;
On Wed, Sep 4, 2019 at 7:48 PM Wesley Peng wrote:
> Hi
>
> on 2019/9/4 19:30, liu ze wrote:
> > I use the row_number() over() function to do top-N. The input is only
> > 60,000 records, but the state has grown to 12 GB.
> > The job eventually fails with an OOM. Is there any way to optimize it?
>
> ref:
>
> https://stackoverflow.com/questions/50812837/flink-taskmanager-out-of-memory-and-memory-configuration
>
> The total amount of required physical and heap memory is quite difficult
> to compute since it strongly depends on your user code, your job's
> topology and which state backend you use.
>
> As a rule of thumb, if you experience OOM and are still using the
> FileSystemStateBackend or the MemoryStateBackend, then you should switch
> to RocksDBStateBackend, because it can gracefully spill to disk if the
> state grows too big.
>
> If you are still experiencing OOM exceptions as you have described, then
> you should check your user code whether it keeps references to state
> objects or generates in some other way large objects which cannot be
> garbage collected. If this is the case, then you should try to refactor
> your code to rely on Flink's state abstraction, because with RocksDB it
> can go out of core.
>
> RocksDB itself needs native memory which adds to Flink's memory
> footprint. This depends on the block cache size, indexes, bloom filters
> and memtables. You can find out more about these things and how to
> configure them here.
>
> Last but not least, you should not activate
> taskmanager.memory.preallocate when running streaming jobs, because
> streaming jobs currently don't use managed memory. Thus, by activating
> preallocation, you would allocate memory for Flink's managed memory
> which reduces the available heap space.
>
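
For reference, the two configuration hints in the quoted reply (switch to the RocksDB state backend, and leave preallocation off) could look roughly like this in flink-conf.yaml. This is only a sketch, assuming the configuration keys of the Flink 1.9 era; the checkpoint path is a hypothetical placeholder, and incremental checkpoints are an optional extra that the reply does not mention.

```yaml
# Keep state in RocksDB so it can spill to local disk instead of the JVM heap.
state.backend: rocksdb

# Hypothetical checkpoint location; replace with a path that exists in your cluster.
state.checkpoints.dir: hdfs:///flink/checkpoints

# Optional (not from the quoted reply): only upload changed RocksDB files per checkpoint.
state.backend.incremental: true

# Leave managed-memory preallocation off for streaming jobs (false is also the default).
taskmanager.memory.preallocate: false
```

Note that RocksDB uses native memory outside the JVM heap, so the container or process memory limit still needs headroom beyond the configured heap size.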