[ 
https://issues.apache.org/jira/browse/BEAM-1612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated BEAM-1612:
-------------------------------
    Description: 
Bundles are an important part of the Beam model. Users can use a bundle to 
flush buffers and to reuse heavyweight resources across the elements of a 
bundle. Most IO plugins flush on bundle boundaries. 

Moreover, FlinkRunner could use bundles to reduce access to Flink state: for 
example, keep values on the Java heap first and flush them into RocksDB state 
when finishBundle is invoked. This would reduce the number of serializations.
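
To make the serialization argument concrete, here is a minimal sketch in plain 
Java. It does not use any Beam or Flink API: CountingStateBackend and 
HeapBufferedCounter are hypothetical names, and the backend is only a stand-in 
that serializes on every write, the way a RocksDB-backed state would.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a serializing state backend; counts each write. */
class CountingStateBackend {
    private final Map<String, byte[]> store = new HashMap<>();
    int serializations = 0;

    void put(String key, long value) {
        serializations++; // every write pays one serialization
        store.put(key, Long.toString(value).getBytes(StandardCharsets.UTF_8));
    }

    long get(String key) {
        byte[] raw = store.get(key);
        return raw == null ? 0L : Long.parseLong(new String(raw, StandardCharsets.UTF_8));
    }
}

/** Keeps per-key counts on the heap and writes them back once per bundle. */
class HeapBufferedCounter {
    private final CountingStateBackend backend;
    private final Map<String, Long> heap = new HashMap<>();

    HeapBufferedCounter(CountingStateBackend backend) {
        this.backend = backend;
    }

    void processElement(String key) {
        Long cached = heap.get(key);
        if (cached == null) {
            cached = backend.get(key); // read through on first touch in this bundle
        }
        heap.put(key, cached + 1);
    }

    void finishBundle() {
        // One write per distinct key in the bundle, not one per element.
        for (Map.Entry<String, Long> entry : heap.entrySet()) {
            backend.put(entry.getKey(), entry.getValue());
        }
        heap.clear();
    }
}
```

With four elements over two keys in one bundle, this sketch writes to the 
backend twice instead of four times. If finishBundle runs after every element, 
the heap buffer saves nothing.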

But FlinkRunner currently calls finishBundle after every processElement. We 
need to support real bundles.

I think we can have the following implementations:

1. Invoke finishBundle and the next startBundle in {{snapshot}} of Flink. But 
sometimes this "bundle" may be too big. This depends on the user's checkpoint 
configuration.

2. Manually control the size of the bundle. A half-finished bundle is flushed 
as a full bundle when triggered by count, event time, processing time, or 
{{snapshot}}. We do not need to wait; we just call startBundle and 
finishBundle at the right time.
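
Option 2 could look roughly like the following sketch. It is plain Java with 
no Beam or Flink dependency; BundleTracker and its method names are 
hypothetical, the startBundle and finishBundle hooks are passed in as 
Runnables, and only the count, processing-time, and snapshot triggers are 
shown (event time is left out).

```java
/** Hypothetical helper that decides when to open and close a bundle. */
class BundleTracker {
    private final long maxCount;
    private final long maxAgeMillis;
    private final Runnable startBundle;
    private final Runnable finishBundle;

    private boolean open = false;
    private long count = 0;
    private long openedAtMillis = 0;

    BundleTracker(long maxCount, long maxAgeMillis,
                  Runnable startBundle, Runnable finishBundle) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
        this.startBundle = startBundle;
        this.finishBundle = finishBundle;
    }

    /** Call before processing an element; opens a bundle lazily. */
    void beforeElement(long nowMillis) {
        if (!open) {
            startBundle.run();
            open = true;
            count = 0;
            openedAtMillis = nowMillis;
        }
    }

    /** Call after processing an element; closes on count or age. */
    void afterElement(long nowMillis) {
        count++;
        if (count >= maxCount || nowMillis - openedAtMillis >= maxAgeMillis) {
            close();
        }
    }

    /** Call from a processing-time timer so idle bundles still get closed. */
    void onTimer(long nowMillis) {
        if (open && nowMillis - openedAtMillis >= maxAgeMillis) {
            close();
        }
    }

    /** Call from the snapshot hook so no open bundle spans a checkpoint. */
    void onSnapshot() {
        close();
    }

    private void close() {
        if (open) {
            finishBundle.run();
            open = false;
        }
    }
}
```

Closing the bundle in onSnapshot keeps buffered data out of the checkpoint, 
while the count and age limits bound the bundle size regardless of the 
checkpoint interval, which is the concern raised for option 1.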

[Proposal document|https://docs.google.com/document/d/1UzELM4nFu8SIeu-QJkbs0sv7Uzd1Ux4aXXM3cw4s7po/edit?usp=sharing]


  was:
(Same text as the description above, without the proposal document link.)



> Support real Bundle in Flink runner
> -----------------------------------
>
>                 Key: BEAM-1612
>                 URL: https://issues.apache.org/jira/browse/BEAM-1612
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee


