Hi Wenlong,
      Thanks for your helpful suggestions. I will prepare a design doc
in the next few days to outline the implementation plan for further discussion.
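
To make the version-header mechanism from the quoted proposal below concrete in the meantime, here is a rough pure-Java sketch. All names (VersionedSerializer, Codec) are hypothetical illustrations, not Flink APIs, and a row is modeled as a plain String; the real integration point would be Flink's TypeSerializer stack.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

// Sketch: every serialized record starts with an int version header, and a
// registry of per-version codecs (kept until the state TTL expires) lets new
// code read records written by older serializer versions without an eager
// full migration on restore.
public class VersionedSerializer {

    /** Hypothetical per-version codec interface. */
    public interface Codec {
        byte[] encode(String row) throws IOException;
        String decode(byte[] payload) throws IOException;
    }

    private final int currentVersion;
    private final Map<Integer, Codec> codecsByVersion;

    public VersionedSerializer(int currentVersion, Map<Integer, Codec> codecsByVersion) {
        this.currentVersion = currentVersion;
        this.codecsByVersion = codecsByVersion;
    }

    /** Writes the current version as a 4-byte header, then the payload. */
    public byte[] serialize(String row) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(currentVersion);
        out.write(codecsByVersion.get(currentVersion).encode(row));
        return bos.toByteArray();
    }

    /** Reads the header and dispatches to the codec that wrote the record. */
    public String deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int writtenVersion = in.readInt();
        Codec codec = codecsByVersion.get(writtenVersion);
        if (codec == null) {
            throw new IOException("No codec for version " + writtenVersion
                    + " (was it dropped after the state TTL?)");
        }
        return codec.decode(in.readAllBytes());
    }
}
```

The point of the sketch is lazy deserialization: a record is only re-decoded with its original codec when it is actually read, so restoring a job after a schema change does not require rewriting all old state up front.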

Best,
Aitozi

wenlong.lwl <wenlong88....@gmail.com> wrote on Mon, Jan 10, 2022 at 10:07:

> Hi, Aitozi,
> thanks for bringing up the discussion on state evolution for Flink SQL.
> It would be a great improvement to Flink SQL.
> I think it would be better if you could prepare a document providing more
> details about the solution;
> this would be a big story and a huge change, and we need to discuss it
> comprehensively.
>
> Some important points:
> 1. How is the digest of each column generated?
> 2. How can we generate a serializer with multiple versions? Do we need the
> previous SQL when we are compiling a new version?
> 3. What are the semantics when there are newly added aggregation columns,
> and when there are cascading aggregations?
> 4. Can the design be extended in the future to support case 1?
>
> Best,
> Wenlong
>
>
> On Sun, 9 Jan 2022 at 17:38, Aitozi <gjying1...@gmail.com> wrote:
>
> > Hi all:
> >      When we use Flink SQL to develop jobs, we encounter a big problem:
> > the state may become incompatible after changing the SQL. This is mainly
> > caused by two cases:
> >
> > 1. The number of operators may change, so the operator state can no
> > longer be mapped to the previous state.
> > 2. The format of the state value may change, for example when columns
> > are added to or removed from an aggregation operator.
> >
> > In this discussion, I want to propose solving case two by introducing
> > a mechanism of column digests for the RowData.
> >
> > 1. In the SQL job translation phase, we generate a digest for each
> > column of the RowData.
> > 2. We create a new serializer, perhaps called MergeableRowDataSerializer,
> > which includes the column digests for the RowData.
> > 3. We generate an int version number for each serializer, and add a
> > version header to the serialized data. We can rely on
> > the version number to choose the suitable serializer to read the old
> > data.
> > 4. We store multi-version serializers in the checkpoint during evolution,
> > so that we can support lazy deserialization, which is inspired by the
> > Avro framework's insights. In this way, we can avoid a full migration of
> > the old data during restore (which may cost much time).
> > 5. We can also drop old serializer versions after the state TTL expires.
> >
> > We have applied this implementation in our internal version at Ant
> > Financial. We are looking forward to contributing this back to the Flink
> > repo, and look forward to suggestions from the community.
> >
> > Best wishes
> > Aitozi
> >
>