Thanks, that was definitely helpful!

On Mon, Jul 13, 2020 at 4:39 PM Jark Wu <imj...@gmail.com> wrote:
> You can set string-based configuration via
> `tEnv.getConfig.getConfiguration.setString(..)` to replace them.
> Maybe you can try pipeline.default-kryo-serializers [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers
>
> On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> And what about env.registerTypeWithKryoSerializer?
>> Now, to create the table environment, I don't use the ExecutionEnvironment
>> anymore. How can I register those serializers?
>> For example, I used to run
>> env.registerTypeWithKryoSerializer(DateTime.class,
>> JodaDateTimeSerializer.class);
>>
>> Best,
>> Flavio
>>
>> On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> tableEnv.registerTableSource is deprecated in favor of
>>> DDL and the new connector interface (i.e. FLIP-95 [1]).
>>> You may need to implement a `ScanTableSource` that uses an
>>> `InputFormatProvider` as the `ScanTableSource#getScanRuntimeProvider`.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>>>
>>> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Ok, just one last thing: to use my TableSource I use the deprecated
>>>> API registerTableSource:
>>>>
>>>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>>>
>>>> The javadoc says to use executeSql, but this requires some extra steps
>>>> (that are not mentioned in the documentation).
>>>> I have to create a TableFactory, right? How do I register it? Is
>>>> there an example somewhere?
>>>>
>>>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it>, the
>>>>> exception message definitely should be improved.
>>>>> We created a similar issue a long time ago
>>>>> (https://issues.apache.org/jira/browse/CALCITE-3038), but the fix
>>>>> might be complicated.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> You're right, Jark. Sorry, I didn't see the typo. The backticks are
>>>>>> also mandatory.
>>>>>> Maybe the exception message could be more meaningful and specify the
>>>>>> token that caused the error instead of a generic "SQL parse failed.
>>>>>> Non-query expression encountered in illegal context".
>>>>>>
>>>>>> Thanks a lot for the support,
>>>>>> Flavio
>>>>>>
>>>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> A typo of "INSERTO"? Try this:
>>>>>>>
>>>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>>>>>> MySourceDataset");
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Now I'm able to run my code, but there's something I don't
>>>>>>>> understand: what is the difference between the following two?
>>>>>>>>
>>>>>>>> // common code
>>>>>>>> final CsvTableSink outSink = new
>>>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>> tableEnv.registerTableSink("out", dsFields,
>>>>>>>> myInputformat.getFieldTypes(), outSink);
>>>>>>>>
>>>>>>>> - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>>>>>   --> this works
>>>>>>>> - tableEnv.executeSql("INSERTO INTO out SELECT * FROM
>>>>>>>> MySourceDataset"); --> this does not work
>>>>>>>>
>>>>>>>> The second option fails with the following exception:
>>>>>>>>
>>>>>>>> Exception in thread "main"
>>>>>>>> org.apache.flink.table.api.SqlParserException: SQL parse failed.
>>>>>>>> Non-query
>>>>>>>> expression encountered in illegal context
>>>>>>>>   at
>>>>>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>>>>   at
>>>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>>>>   at
>>>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> hi Flavio,
>>>>>>>>>
>>>>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>>>>> If you want to use the Blink planner to run a batch job,
>>>>>>>>> your table source should implement `StreamTableSource`
>>>>>>>>> and its `isBounded` method should return true.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 10:32 PM, Flavio Pompermaier
>>>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Is it correct to do something like this?
>>>>>>>>>>
>>>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>>>     @Override
>>>>>>>>>>     public TableSchema getTableSchema() {
>>>>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>>>>     }
>>>>>>>>>> };
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> How can you reuse an InputFormat to write a TableSource?
>>>>>>>>>>> I think
>>>>>>>>>>> that, at least initially, this could be the simplest way to test the
>>>>>>>>>>> migration. Then I could try to implement the new Table Source
>>>>>>>>>>> interface.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> hi Flavio,
>>>>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can
>>>>>>>>>>>> convert to/from DataSet),
>>>>>>>>>>>> while the Blink planner in batch mode only
>>>>>>>>>>>> supports TableEnvironment, because the Blink planner
>>>>>>>>>>>> converts batch queries to Transformation (corresponding to
>>>>>>>>>>>> DataStream) instead of DataSet.
>>>>>>>>>>>>
>>>>>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>>>>>> (the InputFormat can be reused),
>>>>>>>>>>>> but TableSource will be deprecated later. You can try the new
>>>>>>>>>>>> table source [1].
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Godfrey
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM, Flavio Pompermaier
>>>>>>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, but I still can't understand how to migrate my legacy
>>>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv
>>>>>>>>>>>>> anymore, so I
>>>>>>>>>>>>> can't call createInput.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>>>>> TableSource instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>>>>>         ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>>>>>         TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>>>     MyInputFormat myInputformat =
>>>>>>>>>>>>>         new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>>>     Table table =
>>>>>>>>>>>>>         btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>>>>>     CsvTableSink outSink =
>>>>>>>>>>>>>         new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>>>     env.execute();
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>>>>> dwysakow...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The following code should work:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11.
>>>>>>>>>>>>>> > Before I was using a BatchTableEnv, and now I've tried to use the
>>>>>>>>>>>>>> > following:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>>>> >     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Unfortunately, in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use
>>>>>>>>>>>>>> > TableEnvironment.");
>>>>>>>>>>>>>> > }
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>>>> > Flavio
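Jark's `pipeline.default-kryo-serializers` suggestion from the thread, sketched concretely. The option value is a semicolon-separated list of `class:<type>,serializer:<serializer>` pairs; the Joda-Time serializer class name below is an assumption, taken to be the `de.javakaffee` kryo-serializers class that Flavio's old `registerTypeWithKryoSerializer` call presumably used:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KryoSerializerConfig {
    public static void main(String[] args) {
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Replacement for env.registerTypeWithKryoSerializer(DateTime.class,
        // JodaDateTimeSerializer.class), expressed as a string-based option
        // because the ExecutionEnvironment is no longer accessible.
        tEnv.getConfig().getConfiguration().setString(
            "pipeline.default-kryo-serializers",
            "class:org.joda.time.DateTime,"
                + "serializer:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");
    }
}
```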
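For quick reference, these are the two equivalent ways to run the insert that came out of the thread; note both fixes in the SQL variant ("INSERT" spelled correctly, and backticks around `out` because it collides with a reserved keyword in the parser):

```java
// Table API form: this worked as-is in the thread.
tableEnv.from("MySourceDataset").executeInsert("out");

// SQL form: requires INSERT spelled correctly and `out` escaped with backticks.
tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
```

Either call submits the job itself, which is why the migrated program no longer needs a trailing `env.execute()`.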
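Godfrey's point about `isBounded`, applied to Flavio's `BatchTableSource` attempt: a rough sketch of the same source as a bounded `StreamTableSource` (a fragment against the Flink 1.11 legacy source interface; `dsFields`, `ft`, and `myInputformat` are the variables from the thread, and the deprecated `TableSchema` constructor and `getReturnType` are kept only to stay close to the original snippet):

```java
StreamTableSource<Row> myTableSource = new StreamTableSource<Row>() {
    @Override
    public boolean isBounded() {
        return true; // marks this as a batch (bounded) source for the Blink planner
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft); // same schema as the BatchTableSource attempt
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(ft, dsFields);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // the existing InputFormat is reused as-is
        return execEnv.createInput(myInputformat, getReturnType());
    }
};
```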
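Jark's FLIP-95 suggestion (`ScanTableSource` plus `InputFormatProvider`) could look roughly like this. Two caveats: the new interface consumes an `InputFormat<RowData, ?>` (internal `RowData` records, not `Row`), and to use the source from DDL you additionally need a `DynamicTableSourceFactory` registered via the Java SPI file `META-INF/services/org.apache.flink.table.factories.Factory`. The class name below is made up for the sketch:

```java
// Hypothetical FLIP-95 source wrapping an existing InputFormat.
public class MyInputFormatTableSource implements ScanTableSource {

    // the new source stack produces internal RowData records, not Row
    private final InputFormat<RowData, ?> inputFormat;

    public MyInputFormatTableSource(InputFormat<RowData, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly(); // a plain bounded scan emits only inserts
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyInputFormatTableSource(inputFormat);
    }

    @Override
    public String asSummaryString() {
        return "MyInputFormatTableSource";
    }
}
```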
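Putting Dawid's and Godfrey's answers together, Flavio's legacy main() migrates roughly as follows (a sketch that keeps the deprecated registerTableSource/registerTableSink calls as a first migration step, exactly as the thread does; `tableSource` is a bounded table source and `dsFields`/`ft` are as in the thread):

```java
public static void main(String[] args) throws Exception {
    // unified TableEnvironment in batch mode (Blink planner); no ExecutionEnvironment
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().inBatchMode().build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    // deprecated registration APIs, kept for the first migration step
    tableEnv.registerTableSource("MySourceDataset", tableSource);
    CsvTableSink outSink =
        new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    tableEnv.registerTableSink("out", dsFields, ft, outSink);

    // executeInsert() submits the job itself; no env.execute() afterwards
    tableEnv.from("MySourceDataset").executeInsert("out");
}
```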