Thanks Timo 🙏

On Fri, Jun 4, 2021, 17:13 Timo Walther <twal...@apache.org> wrote:

> Hi Yuval,
>
> I would recommend option 2, because especially when it comes to state
> you should be in control of what is persisted. There is no guarantee
> that the ExternalSerializer will not change in the future. It is only
> meant for shipping data as the input of the next operator.
>
> I would recommend writing a small tool that traverses
> `table.getResolvedSchema().toSourceRowDataType().getLogicalType()`,
> limits the types that you allow in state (e.g. no structured types), and
> translates the result to TypeInformation.
>
> Regards,
> Timo
>
>
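
The small tool suggested above (traverse the logical type, restrict what is allowed in state, translate the rest) might be sketched roughly as follows. This is only an illustration in plain Java with no Flink dependency: `Kind`, `Node`, and `toStateType` are hypothetical stand-ins, not Flink classes, and the result is a textual description instead of a real TypeInformation. In an actual job the same recursion would presumably walk Flink's `LogicalType` tree and return values such as `Types.STRING`, `Types.LONG`, or `Types.ROW_NAMED(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a logical type tree; none of these are Flink classes.
final class StateTypeSketch {

    // Simplified type roots. STRUCTURED represents a kind we refuse to persist.
    enum Kind { STRING, INT, BIGINT, BOOLEAN, DOUBLE, ROW, STRUCTURED }

    // One node of the schema tree: a kind plus, for ROW, named child fields.
    static final class Node {
        final Kind kind;
        final List<String> names;
        final List<Node> children;

        private Node(Kind kind, List<String> names, List<Node> children) {
            this.kind = kind;
            this.names = names;
            this.children = children;
        }

        static Node leaf(Kind kind) {
            return new Node(kind, Collections.<String>emptyList(), Collections.<Node>emptyList());
        }

        static Node row(List<String> names, List<Node> children) {
            if (names.size() != children.size()) {
                throw new IllegalArgumentException("names and children must have the same length");
            }
            return new Node(Kind.ROW, names, children);
        }
    }

    // Recursively translates a schema tree into a description of the state type.
    // Any kind that is not explicitly allowed is rejected instead of silently persisted.
    static String toStateType(Node node) {
        switch (node.kind) {
            case STRING:  return "String";
            case INT:     return "Integer";
            case BIGINT:  return "Long";
            case BOOLEAN: return "Boolean";
            case DOUBLE:  return "Double";
            case ROW: {
                List<String> parts = new ArrayList<>();
                for (int i = 0; i < node.children.size(); i++) {
                    parts.add(node.names.get(i) + ": " + toStateType(node.children.get(i)));
                }
                return "Row(" + String.join(", ", parts) + ")";
            }
            default:
                throw new IllegalArgumentException("Type not allowed in state: " + node.kind);
        }
    }

    public static void main(String[] args) {
        Node address = Node.row(Arrays.asList("city"), Arrays.asList(Node.leaf(Kind.STRING)));
        Node schema = Node.row(
                Arrays.asList("id", "name", "address"),
                Arrays.asList(Node.leaf(Kind.BIGINT), Node.leaf(Kind.STRING), address));
        // prints "Row(id: Long, name: String, address: Row(city: String))"
        System.out.println(toStateType(schema));
    }
}
```

The explicit whitelist is the point of the sketch: the persisted layout is decided by the job author, so a type that was never vetted fails fast at job construction time and does not end up in state.
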
> On 04.06.21 16:05, Yuval Itzchakov wrote:
> > Hi Timo,
> > Thank you for the response.
> >
> > In reality the tables are created from arbitrary SQL code, so I don't
> > know the schema up front and cannot create the TypeInformation "by
> > hand" to pass it on to the DataStream API.
> >
> > This leaves me with option 1, which leads to another question: If I have
> > some state records stored in RocksDB from a currently running job on a
> > previous Flink version (1.9.3), will changing the TypeInformation from
> > TypeInformation[Row] to the ExternalTypeInformation now break the
> > compatibility of the state currently stored and essentially cause it to
> > be lost? My gut feeling says yes, unless some form of backwards
> > compatibility is going to be written specifically for this use case.
> >
> >
> > On Fri, Jun 4, 2021, 16:33 Timo Walther <twal...@apache.org> wrote:
> >
> >     Hi Yuval,
> >
> >
> >     TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
> >     between TypeInformation and DataType until TypeInformation is no
> >     longer exposed through the Table API.
> >
> >     Starting with Flink 1.13, the Table API is able to serialize the
> >     records to the first DataStream operator via toDataStream or
> >     toChangelogStream. Internally, it uses
> >     org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that.
> >     The binary representation uses internal data structures, and
> >     conversion is performed during serialization/deserialization:
> >
> >     conversion -> internal -> conversion
> >
> >     You have two possibilities:
> >
> >     1) You simply call `tableEnv.toDataStream(table).getType()` and pass
> >     this type on to the next operator.
> >
> >     2) You define your own TypeInformation as you would usually do it in
> >     DataStream API without Table API.
> >
> >     We might serialize `Row`s with `RowSerializer` again in the near
> >     future. But for now we went with the most generic solution that
> >     supports everything that can come out of Table API.
> >
> >     Regards,
> >     Timo
> >
> >     On 04.06.21 15:12, Yuval Itzchakov wrote:
> >      > When upgrading to Flink 1.13, I ran into deprecation warnings on
> >      > TypeConversions
> >      >
> >      > The deprecation message states that this API will be deprecated
> >      > soon, but does not mention the alternatives that can be used for
> >      > these transformations.
> >      >
> >      > My use case is that I have a table that needs to be converted
> >      > into a DataStream[Row] and in turn I need to apply some stateful
> >      > transformations on it. In order to do that I need the
> >      > TypeInformation[Row] produced in order to pass into the various
> >      > state functions.
> >      >
> >      > @Timo Walther <mailto:twal...@apache.org> I would love your help
> >      > on this.
> >      > --
> >      > Best Regards,
> >      > Yuval Itzchakov.
> >
>
>
