You can set string-based configuration options via `tEnv.getConfig().getConfiguration().setString(..)` to replace those calls. Maybe you can try `pipeline.default-kryo-serializers` [1].
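
For the Joda `DateTime` case raised in this thread, a minimal sketch of that approach might look like the following. It only builds the option value, assuming the semicolon-separated `class:<type>,serializer:<serializer>` pair format described on the config page [1]. The helper class `KryoSerializerOption` and the fully qualified class names are assumptions made for illustration (the thread does not say which packages `DateTime` and `JodaDateTimeSerializer` come from), and the actual `setString` call is left as a comment because it needs Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink.
final class KryoSerializerOption {

    // Option key named in this thread.
    static final String KEY = "pipeline.default-kryo-serializers";

    // Builds "class:<type>,serializer:<serializer>;..." from a map of
    // type class name -> serializer class name, preserving insertion order.
    static String toOptionValue(Map<String, String> serializersByType) {
        return serializersByType.entrySet().stream()
                .map(e -> "class:" + e.getKey() + ",serializer:" + e.getValue())
                .collect(Collectors.joining(";"));
    }

    public static void main(String[] args) {
        Map<String, String> serializers = new LinkedHashMap<>();
        // Assumed fully qualified names; adjust to the classes on your classpath.
        serializers.put("org.joda.time.DateTime",
                "de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");

        String value = toOptionValue(serializers);
        System.out.println(KEY + " = " + value);

        // With Flink on the classpath, the value would then be passed on:
        // tEnv.getConfig().getConfiguration().setString(KEY, value);
    }
}
```

Note that this option configures default Kryo serializers, which may not behave exactly like `registerTypeWithKryoSerializer`, so it is worth verifying on a small job first.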

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers

On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> And what about the env.registerTypeWithKryoSerializer?
> Now to create the table environment I don't use the ExecutionEnvironment
> anymore..how can I register those serializers?
> For example I used to run
> env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>
> Best,
> Flavio
>
> On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> tableEnv.registerTableSource is deprecated in order to migrate to DDL
>> and the new connector interface (i.e. FLIP-95 [1]).
>> You may need to implement a `ScanTableSource` that returns an
>> `InputFormatProvider` from `ScanTableSource#getScanRuntimeProvider`.
>>
>> Best,
>> Jark
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>>
>> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Ok..just one last thing: to use my TableSource I use the deprecated
>>> API registerTableSource:
>>>
>>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>>
>>> The javadoc says to use executeSql but this requires some extra steps
>>> (that are not mentioned in the documentation).
>>> Do I have to create a TableFactory? How do I register it? Is
>>> there an example somewhere?
>>>
>>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it>, the
>>>> exception message definitely should be improved.
>>>> We created a similar issue a long time ago,
>>>> https://issues.apache.org/jira/browse/CALCITE-3038, but the fix
>>>> might be complicated.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> You're right Jark..sorry I didn't see the typo. The backticks are also
>>>>> mandatory.
>>>>> Maybe the exception message could be more meaningful and specify the
>>>>> token that caused the error instead of a general "SQL parse failed.
>>>>> Non-query expression encountered in illegal context".
>>>>>
>>>>> Thanks a lot for the support,
>>>>> Flavio
>>>>>
>>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> A typo of "INSERTO"? Try this?
>>>>>>
>>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Now I'm able to run my code but there's something I don't
>>>>>>> understand: what is the difference between the following two?
>>>>>>>
>>>>>>> //common code
>>>>>>> final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>> tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);
>>>>>>>
>>>>>>> - tableEnv.from("MySourceDataset").executeInsert("out"); --> this works
>>>>>>> - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset"); --> this does not work
>>>>>>>
>>>>>>> The second option fails with the following exception:
>>>>>>>
>>>>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed.
>>>>>>> Non-query expression encountered in illegal context
>>>>>>>   at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>>>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>>>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>>>> If you want to use the Blink planner to run a batch job,
>>>>>>>> your table source should implement `StreamTableSource`
>>>>>>>> and its `isBounded` method should return true.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Godfrey
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 10:32 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Is it correct to do something like this?
>>>>>>>>>
>>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>>   @Override
>>>>>>>>>   public TableSchema getTableSchema() {
>>>>>>>>>     return new TableSchema(dsFields, ft);
>>>>>>>>>   }
>>>>>>>>>   @Override
>>>>>>>>>   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>>>     return execEnv.createInput(myInputformat);
>>>>>>>>>   }
>>>>>>>>> };
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> How can you reuse InputFormat to write a TableSource?
>>>>>>>>>> I think that at least initially this could be the simplest way to test the
>>>>>>>>>> migration..then I could try to implement the new Table Source
>>>>>>>>>> interface
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can
>>>>>>>>>>> convert to/from DataSet),
>>>>>>>>>>> while the Blink planner in batch mode only supports TableEnvironment,
>>>>>>>>>>> because the Blink planner
>>>>>>>>>>> converts batch queries to Transformation (corresponding to
>>>>>>>>>>> DataStream) instead of DataSet.
>>>>>>>>>>>
>>>>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>>>>> (InputFormat can be reused),
>>>>>>>>>>> but TableSource will be deprecated later. You can try the new table
>>>>>>>>>>> source [1].
>>>>>>>>>>>
>>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Godfrey
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv
>>>>>>>>>>>> anymore so I
>>>>>>>>>>>> can't call createInput.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>>>> TableSource instead?
>>>>>>>>>>>>
>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>   BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>>   MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>>>>   DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>>   Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>>>>   CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>>   btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>>   btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>>   env.execute();
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>>>> DataStream. We do not support converting batch Table programs
>>>>>>>>>>>>> to DataStream yet.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The following code should work:
>>>>>>>>>>>>>
>>>>>>>>>>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>
>>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11.
>>>>>>>>>>>>> > Before I was using a BatchTableEnv and now I've tried to use the
>>>>>>>>>>>>> > following:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>>> >     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>>>>>>>>>> > }
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>>> > Flavio