Hi Flavio,
tableEnv.registerTableSource is deprecated in favor of DDL and the new connector interface (i.e. FLIP-95 [1]). You may need to implement a `ScanTableSource` that returns an `InputFormatProvider` from `ScanTableSource#getScanRuntimeProvider`.
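A minimal sketch of such a source might look like the following (untested, written against the Flink 1.11 FLIP-95 interfaces; `MyScanTableSource` is a hypothetical name, and the wrapped InputFormat is assumed to produce `RowData`):

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

public class MyScanTableSource implements ScanTableSource {

    private final InputFormat<RowData, ?> inputFormat;

    public MyScanTableSource(InputFormat<RowData, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // a plain bounded source only emits insertions
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // wrap the legacy InputFormat as the runtime implementation
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanTableSource(inputFormat);
    }

    @Override
    public String asSummaryString() {
        return "MyScanTableSource";
    }
}
```

Note that to make such a source usable from DDL you also need a `DynamicTableSourceFactory` that creates it, registered through Java SPI (a `META-INF/services/org.apache.flink.table.factories.Factory` file); see [1] for details.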
Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html

On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Ok.. just one last thing: to use my TableSource I use the deprecated
> API registerTableSource:
>
>   tableEnv.registerTableSource("MySourceDataset", tableSource);
>
> The javadoc says to use executeSql but this requires some extra steps
> (that are not mentioned in the documentation).
> I have to create a TableFactory, right? How do I register it? Is there
> an example somewhere?
>
> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>
>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it>, the
>> exception message definitely should be improved.
>> We created a similar issue a long time ago
>> (https://issues.apache.org/jira/browse/CALCITE-3038), but the fix
>> might be complicated.
>>
>> Best,
>> Jark
>>
>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> You're right Jark.. sorry I didn't see the typo. The backticks are
>>> also mandatory.
>>> Maybe the exception message could be more meaningful and specify the
>>> token that caused the error instead of a generic "SQL parse failed.
>>> Non-query expression encountered in illegal context".
>>>
>>> Thanks a lot for the support,
>>> Flavio
>>>
>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> A typo of "INSERTO"? Try this:
>>>>
>>>>   tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Now I'm able to run my code but there's something I don't understand:
>>>>> what is the difference between the following two?
>>>>>
>>>>>   // common code
>>>>>   final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv",
>>>>>       "\t", 1, WriteMode.OVERWRITE);
>>>>>   tableEnv.registerTableSink("out", dsFields,
>>>>>       myInputformat.getFieldTypes(), outSink);
>>>>>
>>>>> - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>>   --> this works
>>>>> - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
>>>>>   --> this does not work
>>>>>
>>>>> The second option fails with the following exception:
>>>>>
>>>>>   Exception in thread "main" org.apache.flink.table.api.SqlParserException:
>>>>>   SQL parse failed. Non-query expression encountered in illegal context
>>>>>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote:
>>>>>
>>>>>> hi Flavio,
>>>>>>
>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>> If you want to use the Blink planner to run a batch job,
>>>>>> your table source should implement `StreamTableSource`
>>>>>> and its `isBounded` method should return true.
>>>>>>
>>>>>> Best,
>>>>>> Godfrey
>>>>>>
>>>>>> On Fri, 10 Jul 2020 at 22:32, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Is it correct to do something like this?
>>>>>>>
>>>>>>>   TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>     @Override
>>>>>>>     public TableSchema getTableSchema() {
>>>>>>>       return new TableSchema(dsFields, ft);
>>>>>>>     }
>>>>>>>     @Override
>>>>>>>     public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>       return execEnv.createInput(myInputformat);
>>>>>>>     }
>>>>>>>   };
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> How can you reuse InputFormat to write a TableSource? I think that
>>>>>>>> at least initially this could be the simplest way to test the
>>>>>>>> migration.. then I could try to implement the new table source
>>>>>>>> interface.
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> hi Flavio,
>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can
>>>>>>>>> convert to/from DataSet), while the Blink planner in batch mode
>>>>>>>>> only supports TableEnvironment, because the Blink planner converts
>>>>>>>>> batch queries to Transformations (corresponding to DataStream)
>>>>>>>>> instead of DataSets.
>>>>>>>>>
>>>>>>>>> One approach is to migrate them to TableSource (the InputFormat
>>>>>>>>> can be reused), but TableSource will be deprecated later; you can
>>>>>>>>> try the new table source [1].
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>> On Fri, 10 Jul 2020 at 20:54, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, but I still can't understand how to migrate my legacy
>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv
>>>>>>>>>> anymore, so I can't call createInput.
>>>>>>>>>>
>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>> TableSource instead?
>>>>>>>>>>
>>>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>>         ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>>         TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>     MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>>     CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv",
>>>>>>>>>>         "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>     env.execute();
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>
>>>>>>>>>>> The following code should work:
>>>>>>>>>>>
>>>>>>>>>>>   EnvironmentSettings settings =
>>>>>>>>>>>       EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>   TableEnvironment.create(settings);
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Dawid
>>>>>>>>>>>
>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>> > Hi to all,
>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11.
>>>>>>>>>>> > Before I was using a BatchTableEnv and now I've tried to use
>>>>>>>>>>> > the following:
>>>>>>>>>>> >
>>>>>>>>>>> >   EnvironmentSettings settings =
>>>>>>>>>>> >       EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>> >
>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>>> >
>>>>>>>>>>> >   if (!settings.isStreamingMode()) {
>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>> >         please use TableEnvironment.");
>>>>>>>>>>> >   }
>>>>>>>>>>> >
>>>>>>>>>>> > What should I do here?
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>> > Flavio
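For reference on the "extra steps" discussed above: with the new connector interface, the replacement for `registerTableSource` is a `CREATE TABLE` DDL statement passed to `executeSql`. A hypothetical registration might look like the following (the column names and the `'my-connector'` identifier are illustrative, not from the thread; the identifier must match the `factoryIdentifier()` of a `DynamicTableSourceFactory` registered in a `META-INF/services/org.apache.flink.table.factories.Factory` file on the classpath):

```sql
-- hypothetical schema and connector identifier
CREATE TABLE MySourceDataset (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'my-connector'
);

-- `out` must be quoted with backticks because OUT is a reserved keyword
INSERT INTO `out` SELECT * FROM MySourceDataset;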