Thanks Timo 🙏

On Fri, Jun 4, 2021, 17:13 Timo Walther <twal...@apache.org> wrote:
> Hi Yuval,
>
> I would recommend option 2, because especially when it comes to state
> you should be in control of what is persisted. There is no guarantee
> that the ExternalSerializer will not change in the future. It is only
> meant for shipping data as the input of the next operator.
>
> I would recommend writing a small tool that traverses
> `table.getResolvedSchema().toSourceDataType().getLogicalType()`, limits
> the types that you allow in state (e.g. no structured types) and
> translates it to TypeInformation.
>
> Regards,
> Timo
>
> On 04.06.21 16:05, Yuval Itzchakov wrote:
> > Hi Timo,
> > Thank you for the response.
> >
> > The tables being created are in reality based on arbitrary SQL code,
> > so I don't know the actual schema in advance and cannot create the
> > TypeInformation "by hand" and pass it on to the DataStream API.
> >
> > This leaves me with option 1, which leads to another question: if I
> > have some state records stored in RocksDB from a currently running job
> > on a previous Flink version (1.9.3), will changing the TypeInformation
> > from TypeInformation[Row] to the ExternalTypeInformation break the
> > compatibility of the state currently stored and essentially cause it
> > to be lost? My gut feeling says yes, unless some form of backwards
> > compatibility is going to be written specifically for this use case.
> >
> > On Fri, Jun 4, 2021, 16:33 Timo Walther <twal...@apache.org> wrote:
> > > Hi Yuval,
> > >
> > > TypeConversions.fromDataTypeToLegacyInfo was only a utility to bridge
> > > between TypeInformation and DataType until TypeInformation is no
> > > longer exposed through the Table API.
> > >
> > > Beginning with Flink 1.13, the Table API is able to serialize the
> > > records to the first DataStream operator via toDataStream or
> > > toChangelogStream. Internally, it uses
> > > org.apache.flink.table.runtime.typeutils.ExternalTypeInfo for that.
> > > The binary representation uses internal data structures, and
> > > conversion is performed during serialization/deserialization:
> > >
> > > conversion -> internal -> conversion
> > >
> > > You have two possibilities:
> > >
> > > 1) You simply call `tableEnv.toDataStream(table).getType()` and pass
> > > this type on to the next operator.
> > >
> > > 2) You define your own TypeInformation as you would usually do in the
> > > DataStream API without the Table API.
> > >
> > > We might serialize `Row`s with `RowSerializer` again in the near
> > > future. But for now we went with the most generic solution that
> > > supports everything that can come out of the Table API.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 04.06.21 15:12, Yuval Itzchakov wrote:
> > > > When upgrading to Flink 1.13, I ran into deprecation warnings on
> > > > TypeConversions.
> > > >
> > > > The deprecation message states that this API will be deprecated
> > > > soon, but does not mention the alternatives that can be used for
> > > > these transformations.
> > > >
> > > > My use case is that I have a table that needs to be converted into
> > > > a DataStream[Row], and in turn I need to apply some stateful
> > > > transformations on it. In order to do that, I need the
> > > > TypeInformation[Row] so that I can pass it into the various state
> > > > functions.
> > > >
> > > > @Timo Walther I would love your help on this.
> > > > --
> > > > Best Regards,
> > > > Yuval Itzchakov.
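
For readers who land on this thread: the "little tool" Timo describes for option 2 could look roughly like the sketch below. It is only an illustration under several assumptions, not code from the thread or from Flink itself. The object name `StateTypeInfo` and its methods are made up, the set of allowed types is an arbitrary choice, and the mapping assumes the default conversion classes (for example `java.time.LocalDateTime` for `TIMESTAMP`). Also note that, as far as I can tell, the accessor on `ResolvedSchema` in Flink 1.13 is named `toSourceRowDataType()` rather than `toSourceDataType()`.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.api.Table
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType}
import org.apache.flink.types.Row

import scala.collection.JavaConverters._

// Hypothetical helper (not part of Flink): derives a TypeInformation[Row]
// from a table schema while allowing only a fixed set of types in state.
object StateTypeInfo {

  // Translate one logical type; anything outside the whitelist is rejected.
  def toTypeInfo(t: LogicalType): TypeInformation[_] = t.getTypeRoot match {
    case LogicalTypeRoot.CHAR | LogicalTypeRoot.VARCHAR => Types.STRING
    case LogicalTypeRoot.BOOLEAN                        => Types.BOOLEAN
    case LogicalTypeRoot.INTEGER                        => Types.INT
    case LogicalTypeRoot.BIGINT                         => Types.LONG
    case LogicalTypeRoot.DOUBLE                         => Types.DOUBLE
    case LogicalTypeRoot.DECIMAL                        => Types.BIG_DEC
    case LogicalTypeRoot.DATE                           => Types.LOCAL_DATE
    case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE    => Types.LOCAL_DATE_TIME
    case LogicalTypeRoot.ROW                            => rowTypeInfo(t.asInstanceOf[RowType])
    case other =>
      // Structured types, RAW, MAP, ARRAY etc. are deliberately not allowed here.
      throw new UnsupportedOperationException(s"Type not allowed in state: $other")
  }

  // Translate a row type field by field, keeping the field names.
  def rowTypeInfo(rowType: RowType): TypeInformation[Row] = {
    val fields = rowType.getFields.asScala.toList
    val names  = fields.map(_.getName).toArray
    val types  = fields.map(f => toTypeInfo(f.getType))
    Types.ROW_NAMED(names, types: _*)
  }

  // Entry point: assumes the schema's source data type is a row type.
  def fromTable(table: Table): TypeInformation[Row] =
    table.getResolvedSchema.toSourceRowDataType.getLogicalType match {
      case rowType: RowType => rowTypeInfo(rowType)
      case other =>
        throw new IllegalArgumentException(s"Expected a row type but got: $other")
    }
}
```

The result of `StateTypeInfo.fromTable(table)` could then be handed to a state descriptor, e.g. `new ValueStateDescriptor[Row]("last-row", typeInfo)`. Whether state written by a 1.9.3 job can be restored with the resulting serializer is a separate question that this sketch does not answer.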