Hi Shammon,

I have two comments on PIP:

1. Does MemorySegmentAllocator need to be a public interface? It looks
like an internal implementation for Flink.

2. `WriterBuilder withMemoryPool(MemorySegmentPool pool);`: we should not
add this to the write builder, because the write builder is a serializable
class. Maybe we should add it to TableWrite instead.
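To make point 2 concrete, here is a minimal Java sketch. `WriteBuilderSketch`, `TableWriteSketch` and `MemoryPoolSketch` are hypothetical stand-ins, not Paimon's real classes. The sketch assumes the pool is a runtime object that does not implement `Serializable`. It shows that a builder holding such a pool can no longer be serialized, while a task-local write object can accept the pool freely.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class WriteBuilderSerializationSketch {

    /** Hypothetical runtime memory pool; it does NOT implement Serializable. */
    static final class MemoryPoolSketch {
        final long capacityBytes;

        MemoryPoolSketch(long capacityBytes) {
            this.capacityBytes = capacityBytes;
        }
    }

    /** Hypothetical write builder: serialized on the client and shipped to tasks. */
    static final class WriteBuilderSketch implements Serializable {
        private static final long serialVersionUID = 1L;
        final String tableName;
        MemoryPoolSketch pool; // putting the pool here breaks serialization

        WriteBuilderSketch(String tableName) {
            this.tableName = tableName;
        }
    }

    /** Hypothetical table write: created inside the task and never serialized. */
    static final class TableWriteSketch {
        MemoryPoolSketch pool;

        TableWriteSketch withMemoryPool(MemoryPoolSketch pool) {
            this.pool = pool;
            return this;
        }
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        WriteBuilderSketch builder = new WriteBuilderSketch("paimon_table");
        System.out.println(isSerializable(builder)); // true

        builder.pool = new MemoryPoolSketch(256L << 20);
        System.out.println(isSerializable(builder)); // false

        // Attaching the pool to the task-local write object avoids the problem.
        TableWriteSketch write = new TableWriteSketch().withMemoryPool(new MemoryPoolSketch(256L << 20));
        System.out.println(write.pool.capacityBytes); // 268435456
    }
}
```

The underlying rule is simply to keep runtime resources out of anything that is serialized and shipped to tasks.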

Best,
Jingsong

On Tue, Apr 25, 2023 at 6:55 PM Shammon FY <zjur...@gmail.com> wrote:
>
> Thanks for all the feedback. I have updated the PIP [1]. If there are no
> more comments, I'd like to start a vote on this PIP later. Thanks!
>
>
> [1]
> https://cwiki.apache.org/confluence/display/PAIMON/PIP-1%3A+Improve+Shared+Writer+Buffer+Pool+For+Sink
>
> Best,
> Shammon FY
>
>
> On Tue, Apr 25, 2023 at 5:09 PM Guojun Li <gjli.schna...@gmail.com> wrote:
>
> > Thanks Shammon for your detailed response, it looks good to me.
> >
> > Best,
> > Guojun
> >
> > On Tue, Apr 25, 2023 at 3:28 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > > Thanks for explaining.
> > >
> > > Sounds good to me.
> > >
> > > 256 should be 256 MB.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Apr 25, 2023 at 3:03 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > > Hi Jingsong & Guojun
> > > >
> > > > Users can configure SINK_MANAGED_WRITER_BUFFER_WEIGHT in a SQL job, for
> > > > example:
> > > > `INSERT INTO paimon_table
> > > > /*+ OPTIONS('sink.use-managed-memory-allocator'='true',
> > > > 'sink.managed.writer-buffer-weight'='256') */ SELECT ... FROM ...;`
> > > >
> > > > Flink first splits managed memory among the OPERATOR, STATE_BACKEND, and
> > > > PYTHON consumers according to their weights (70, 70, and 30 by default),
> > > > counting only the consumers that the job actually requires. After that,
> > > > Flink normalizes the weights of the individual operators and computes each
> > > > operator's memory from the OPERATOR share of managed memory.
> > > >
> > > > Users can configure the weights of agg/join/sort operators in Flink batch
> > > > jobs with the options `table.exec.resource.hash-agg.memory`,
> > > > `table.exec.resource.hash-join.memory`, and
> > > > `table.exec.resource.sort.memory`, whose default weights are 128. Window
> > > > operators in Flink streaming jobs use a constant weight of 100.
> > > >
> > > > Compared to these operators, I think a default weight of 256 is
> > > > sufficient for the sink operator. If the writer buffer runs out of memory,
> > > > users need to increase the managed memory size. We can describe this
> > > > clearly in the documentation.
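The two-step split described above can be illustrated with a simplified Java sketch of the arithmetic. This is not Flink's actual implementation. It assumes the default consumer weights quoted above and normalizes only over the consumer types that are present. The numbers are made up: 1000 MB of managed memory in one slot, and a batch job with Python UDFs whose slot contains a Paimon sink (weight 256), a sort (128), and a hash-agg (128). The operator names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ManagedMemoryShareSketch {

    /** Default consumer weights as described in the thread (OPERATOR:70, STATE_BACKEND:70, PYTHON:30). */
    static final Map<String, Integer> CONSUMER_WEIGHTS =
            Map.of("OPERATOR", 70, "STATE_BACKEND", 70, "PYTHON", 30);

    /** Step 1: the OPERATOR share, normalizing only over the consumer types in use. */
    static double operatorShareMb(double slotManagedMb, Set<String> usedConsumers) {
        int total = 0;
        for (String consumer : usedConsumers) {
            total += CONSUMER_WEIGHTS.get(consumer);
        }
        return slotManagedMb * CONSUMER_WEIGHTS.get("OPERATOR") / total;
    }

    /** Step 2: split the OPERATOR share among operators in proportion to their weights. */
    static Map<String, Double> perOperatorMb(double operatorShareMb, Map<String, Integer> operatorWeights) {
        int total = 0;
        for (int w : operatorWeights.values()) {
            total += w;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : operatorWeights.entrySet()) {
            result.put(e.getKey(), operatorShareMb * e.getValue() / total);
        }
        return result;
    }

    public static void main(String[] args) {
        // Batch job with Python UDFs: only OPERATOR and PYTHON consumers are present.
        double share = operatorShareMb(1000, Set.of("OPERATOR", "PYTHON")); // 1000 * 70 / 100

        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("paimon-sink", 256); // proposed default sink weight
        weights.put("sort", 128);        // default sort weight
        weights.put("hash-agg", 128);    // default hash-agg weight
        System.out.println(share);                         // 700.0
        System.out.println(perOperatorMb(share, weights)); // {paimon-sink=350.0, sort=175.0, hash-agg=175.0}
    }
}
```

Under these assumptions, a sink weight of 256 gives the writer buffer twice the memory of each 128-weight batch operator in the same slot.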
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Tue, Apr 25, 2023 at 12:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >
> > > > > Looks good to me!
> > > > >
> > > > > On Tue, Apr 25, 2023 at 12:12 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Jingsong
> > > > > >
> > > > > > I agree with you that, from a performance perspective, there is indeed
> > > > > > no need to create Paimon segments on top of Flink segments. If we
> > > > > > convert Flink MemorySegment to Paimon directly, I think we should remove
> > > > > > the `free` method from Paimon's `MemorySegment`:
> > > > > > 1. As you mentioned above, Paimon won't free segments.
> > > > > > 2. It prevents users from mistakenly calling `free` and releasing the
> > > > > > same off-heap memory twice.
> > > > > >
> > > > > > In addition, we need to state in Paimon's MemorySegment that off-heap
> > > > > > memory must be allocated and released by the engine. What do you
> > > > > > think?
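Here is a minimal Java sketch of the ownership rule proposed above. A direct `ByteBuffer` stands in for the engine-allocated off-heap memory. The hypothetical `SegmentView` class wraps the engine's buffer without copying it and intentionally has no `free` method, so only the engine can release the memory. This is an illustration of the idea, not Paimon's real `MemorySegment`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EngineOwnedSegmentSketch {

    /**
     * Hypothetical Paimon-side segment that only views memory owned by the engine.
     * There is deliberately no free() method: allocation and release stay with the engine.
     */
    static final class SegmentView {
        private final ByteBuffer buffer;

        SegmentView(ByteBuffer engineBuffer) {
            // duplicate() shares the same memory region without copying it;
            // the byte order is copied explicitly so both sides agree on it.
            this.buffer = engineBuffer.duplicate().order(engineBuffer.order());
        }

        int size() {
            return buffer.capacity();
        }

        void putInt(int index, int value) {
            buffer.putInt(index, value);
        }

        int getInt(int index) {
            return buffer.getInt(index);
        }
    }

    public static void main(String[] args) {
        // Stands in for off-heap memory allocated by the engine (e.g. Flink's MemoryManager).
        ByteBuffer engineBuffer = ByteBuffer.allocateDirect(1024).order(ByteOrder.nativeOrder());
        SegmentView view = new SegmentView(engineBuffer);

        view.putInt(0, 42);
        // The engine sees the write because both objects share one off-heap region.
        System.out.println(engineBuffer.getInt(0)); // 42
        System.out.println(view.size());            // 1024
    }
}
```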
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > >
> > > > > > On Tue, Apr 25, 2023 at 9:53 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > > >
> > > > > > > About Guojun's concerns,
> > > > > > >
> > > > > > > How should SINK_MANAGED_WRITER_BUFFER_WEIGHT be configured?
> > > > > > > How is memory shared with sort and agg operators? What is the best
> > > > > > > value to assign?
> > > > > > >
> > > > > > > > users can configure managed memory weights for AGG and window
> > > > > > > > operators for Flink jobs
> > > > > > >
> > > > > > > How?
> > > > > > >
> > > > > > > I think we can add more explanation.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong
> > > > > > >
> > > > > > > On Tue, Apr 25, 2023 at 9:46 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Shammon,
> > > > > > > >
> > > > > > > > But nobody releases the Paimon MemorySegment.
> > > > > > > >
> > > > > > > > I think we can have a clear definition here: Flink's memory is managed
> > > > > > > > by Flink.
> > > > > > > >
> > > > > > > > Introducing an interface here has a big impact on performance, and
> > > > > > > > Flink did a lot of testing and optimization early on to avoid
> > > > > > > > interface calls as much as possible.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jingsong
> > > > > > > >
> > > > > > > > On Sun, Apr 23, 2023 at 5:51 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Thanks for all the feedback.
> > > > > > > > >
> > > > > > > > > To Jingsong
> > > > > > > > > > Maybe we can just use reflection to get offHeapBuffer and
> > > > > > > > > > heapMemory from Flink MemorySegment.
> > > > > > > > >
> > > > > > > > > Yes, we can indeed construct a MemorySegment for Paimon this way, but
> > > > > > > > > it may cause the segment to be released twice. Assume Flink has
> > > > > > > > > allocated off-heap memory, and Paimon takes the off-heap buffer and
> > > > > > > > > creates its own MemorySegment; the buffer is then referenced by both
> > > > > > > > > the Flink MemorySegment and the Paimon MemorySegment. If Paimon
> > > > > > > > > releases the buffer with `UNSAFE.freeMemory(this.address)`, Flink's
> > > > > > > > > MemorySegment may release it again.
> > > > > > > > >
> > > > > > > > > To Ming Li
> > > > > > > > > > Is it possible to cause a deadlock when requesting memory from the
> > > > > > > > > > engine's managed memory? Is it necessary to add some memory
> > > > > > > > > > checking or timeout mechanism here?
> > > > > > > > >
> > > > > > > > > Flink allocates segments for parallel tasks in MemoryManager. When
> > > > > > > > > memory usage in MemoryManager hits the limit, it throws an exception
> > > > > > > > > rather than blocking.
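Here is a hypothetical Java sketch of the fail-fast behavior described above. The pool has a fixed budget and throws once its pages are exhausted instead of waiting, so a caller cannot block forever on memory. `BoundedPool` and `AllocationException` are illustrative names only. In Flink itself, `MemoryManager#allocatePages` signals this case with a `MemoryAllocationException`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FailFastSegmentPoolSketch {

    /** Illustrative failure type, playing the role of an engine's allocation exception. */
    static final class AllocationException extends Exception {
        AllocationException(String message) {
            super(message);
        }
    }

    /** Hypothetical fixed-budget page pool; not thread-safe, for illustration only. */
    static final class BoundedPool {
        private final int pageSize;
        private int pagesLeft;
        private final Deque<byte[]> returned = new ArrayDeque<>();

        BoundedPool(int pageSize, int totalPages) {
            this.pageSize = pageSize;
            this.pagesLeft = totalPages;
        }

        byte[] allocate() throws AllocationException {
            if (!returned.isEmpty()) {
                return returned.pop(); // reuse a released page first
            }
            if (pagesLeft == 0) {
                // Fail fast instead of waiting, so the caller never blocks on memory.
                throw new AllocationException("managed memory budget exhausted");
            }
            pagesLeft--;
            return new byte[pageSize];
        }

        void release(byte[] page) {
            returned.push(page);
        }
    }

    public static void main(String[] args) throws AllocationException {
        BoundedPool pool = new BoundedPool(4096, 2);
        byte[] first = pool.allocate();
        pool.allocate();
        try {
            pool.allocate(); // budget of two pages is used up
        } catch (AllocationException e) {
            System.out.println("third allocation failed: " + e.getMessage());
        }
        pool.release(first);
        System.out.println(pool.allocate() == first); // true: the released page is reused
    }
}
```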
> > > > > > > > >
> > > > > > > > > To Guojun
> > > > > > > > > > One question I'm thinking about is whether this will raise the bar
> > > > > > > > > > for maintaining write performance in Paimon, e.g. how to decide an
> > > > > > > > > > appropriate memory weight for users' writing jobs.
> > > > > > > > >
> > > > > > > > > Currently, users can configure managed memory weights for AGG and
> > > > > > > > > window operators in Flink jobs, which is similar to the writer buffer
> > > > > > > > > pool weight configured in Paimon. So for Flink users, I think this
> > > > > > > > > will not be a problem.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Shammon FY
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Apr 21, 2023 at 11:42 AM Guojun Li <gjli.schna...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Shammon,
> > > > > > > > > >
> > > > > > > > > > Thank you for writing up the proposal. It's great to introduce this
> > > > > > > > > > unified memory management for Paimon!
> > > > > > > > > >
> > > > > > > > > > One question I'm thinking about is whether this will raise the bar
> > > > > > > > > > for maintaining write performance in Paimon, e.g. how to decide an
> > > > > > > > > > appropriate memory weight for users' writing jobs.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Guojun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Apr 20, 2023 at 8:45 PM Ming Li <joyce.li0...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Shammon for the proposal.
> > > > > > > > > > >
> > > > > > > > > > > For me, it is more appropriate to leave memory management to the
> > > > > > > > > > > computing engine.
> > > > > > > > > > >
> > > > > > > > > > > But I have a small question about this proposal. If the engine's
> > > > > > > > > > > memory is not configured properly, could requesting memory from the
> > > > > > > > > > > engine's managed memory cause a deadlock? Is it necessary to add
> > > > > > > > > > > some memory checking or timeout mechanism here?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Ming Li
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Apr 19, 2023 at 9:57 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi devs:
> > > > > > > > > > > >
> > > > > > > > > > > > I would like to start a discussion of PIP-1: Improve Shared Writer
> > > > > > > > > > > > Buffer Pool For Sink [1]. Currently, a Paimon sink task creates a
> > > > > > > > > > > > heap memory pool which is shared by its writers. When there are
> > > > > > > > > > > > multiple tasks in an Executor, this may cause full GC, performance
> > > > > > > > > > > > issues, and even OOM.
> > > > > > > > > > > >
> > > > > > > > > > > > This PIP aims to improve the buffer pool for writers in Paimon tasks.
> > > > > > > > > > > > Paimon tasks can create memory pools based on Executor memory that is
> > > > > > > > > > > > managed by the Executor, such as Managed Memory in a Flink
> > > > > > > > > > > > TaskManager. Managing writer buffers for multiple tasks through the
> > > > > > > > > > > > Executor will improve the stability and performance of sinks.
> > > > > > > > > > > >
> > > > > > > > > > > > Looking forward to your feedback, thanks.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > [1]
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/PAIMON/PIP-1%3A+Improve+Shared+Writer+Buffer+Pool+For+Sink
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Shammon FY
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > > >
> > >
> >
