Re: Table API jobs migration to Flink 1.11
Thanks, that was definitely helpful!

On Mon, Jul 13, 2020 at 4:39 PM Jark Wu wrote:

> You can set string-based configuration on
> `tEnv.getConfig.getConfiguration.setString(..)` to replace them.
> Maybe you can try pipeline.default-kryo-serializers [1].
>
> Best,
> Jark
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers
Re: Table API jobs migration to Flink 1.11
You can set string-based configuration on
`tEnv.getConfig.getConfiguration.setString(..)` to replace them.
Maybe you can try pipeline.default-kryo-serializers [1].

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers

On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier wrote:

> And what about the env.registerTypeWithKryoSerializer?
> Now to create the table environment I don't use the ExecutionEnvironment
> anymore..how can I register those serializers?
> For example I used to run:
>
> env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>
> Best,
> Flavio
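Jark's string-based suggestion above could be sketched as follows. This is an illustrative, untested outline for Flink 1.11; the Joda `DateTime` class is the one Flavio mentioned, and the serializer's package (`de.javakaffee.kryoserializers`) is an assumption, so substitute whatever serializer class was registered before:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KryoConfigSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // String-based replacement for env.registerTypeWithKryoSerializer(...).
        // Per the linked configuration page, the value format is
        // "class:<type>,serializer:<serializer>", with multiple entries
        // separated by ';'.
        tEnv.getConfig().getConfiguration().setString(
                "pipeline.default-kryo-serializers",
                "class:org.joda.time.DateTime,"
                + "serializer:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");
    }
}
```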
Re: Table API jobs migration to Flink 1.11
And what about the env.registerTypeWithKryoSerializer?
Now to create the table environment I don't use the ExecutionEnvironment
anymore..how can I register those serializers?
For example I used to run:

env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

Best,
Flavio

On Mon, Jul 13, 2020 at 3:16 PM Jark Wu wrote:

> Hi Flavio,
>
> tableEnv.registerTableSource is deprecated in order to migrate to use DDL
> and the new connector interface (i.e. FLIP-95 [1]).
> You may need to implement a `ScanTableSource` that uses
> `InputFormatProvider` as the `ScanTableSource#getScanRuntimeProvider`.
>
> Best,
> Jark
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
Re: Table API jobs migration to Flink 1.11
Hi Flavio,

tableEnv.registerTableSource is deprecated in order to migrate to use DDL
and the new connector interface (i.e. FLIP-95 [1]).
You may need to implement a `ScanTableSource` that uses
`InputFormatProvider` as the `ScanTableSource#getScanRuntimeProvider`.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html

On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier wrote:

> Ok..just one last thing: to use my TableSource I use the deprecated
> API registerTableSource:
>
> tableEnv.registerTableSource("MySourceDataset", tableSource);
>
> The javadoc says to use executeSql but this requires some extra steps
> (that are not mentioned in the documentation).
> I have to create a TableFactory, right? How do I register it? Is there
> an example somewhere?
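Jark's ScanTableSource suggestion above could look roughly like the sketch below. It is an untested outline against the Flink 1.11 FLIP-95 interfaces; the wrapped `InputFormat<RowData, ?>` is assumed to be an adaptation of the thread's existing format (the old format produces `Row`, so some conversion would be needed, which is elided here):

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

// Untested sketch: a FLIP-95 source that reuses an existing InputFormat by
// returning it from getScanRuntimeProvider via InputFormatProvider.
public class MyScanTableSource implements ScanTableSource {

    private final InputFormat<RowData, ?> inputFormat;

    public MyScanTableSource(InputFormat<RowData, ?> inputFormat) {
        this.inputFormat = inputFormat;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly(); // a plain bounded scan emits only inserts
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return InputFormatProvider.of(inputFormat);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanTableSource(inputFormat);
    }

    @Override
    public String asSummaryString() {
        return "MyScanTableSource";
    }
}
```

To make it usable from SQL, the source would additionally be wired up through a `DynamicTableSourceFactory` registered via the Java service loader, as described in the linked sourceSinks page.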
Re: Table API jobs migration to Flink 1.11
Ok..just one last thing: to use my TableSource I use the deprecated
API registerTableSource:

tableEnv.registerTableSource("MySourceDataset", tableSource);

The javadoc says to use executeSql but this requires some extra steps
(that are not mentioned in the documentation).
I have to create a TableFactory, right? How do I register it? Is there
an example somewhere?

On Mon, Jul 13, 2020 at 2:28 PM Jark Wu wrote:

> I agree with you @Flavio Pompermaier, the
> exception message definitely should be improved.
> We created a similar issue a long time ago
> (https://issues.apache.org/jira/browse/CALCITE-3038), but the fix
> might be complicated.
>
> Best,
> Jark
Re: Table API jobs migration to Flink 1.11
I agree with you @Flavio Pompermaier, the exception message definitely
should be improved.
We created a similar issue a long time ago
(https://issues.apache.org/jira/browse/CALCITE-3038), but the fix might
be complicated.

Best,
Jark

On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier wrote:

> You're right Jark..sorry I didn't see the typo. The backticks are also
> mandatory.
> Maybe the exception message could be more meaningful and specify the
> token that caused the error instead of a general "SQL parse failed.
> Non-query expression encountered in illegal context".
>
> Thanks a lot for the support,
> Flavio
Re: Table API jobs migration to Flink 1.11
You're right Jark..sorry I didn't see the typo. The backticks are also
mandatory.
Maybe the exception message could be more meaningful and specify the
token that caused the error instead of a general "SQL parse failed.
Non-query expression encountered in illegal context".

Thanks a lot for the support,
Flavio

On Mon, Jul 13, 2020 at 1:41 PM Jark Wu wrote:

> A typo of "INSERTO"? Try this?
>
> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>
> Best,
> Jark
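The two fixes discussed above can be shown side by side. OUT is a reserved word in the SQL dialect used by Flink's planner (Calcite), which is why the SQL path needs backtick-quoted identifiers while the Table API path does not; `tableEnv` and the registered tables are the ones from the thread, so this is a fragment, not a standalone program:

```java
// Table API: identifiers are plain strings, no quoting needed.
tableEnv.from("MySourceDataset").executeInsert("out");

// SQL: the typo "INSERTO" is fixed, and `out` is escaped with backticks
// because OUT is a reserved keyword.
tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
```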
Re: Table API jobs migration to Flink 1.11
A typo of "INSERTO"? Try this? tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset"); Best, Jark On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier wrote: > Now I'm able to run my code but there's something I don't understand: what > is the difference between the following two? > > //common code > final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", > "\t", 1, WriteMode.OVERWRITE); > tableEnv.registerTableSink("out", dsFields, > myInputformat.getFieldTypes(), outSink); > >- tableEnv.from("MySourceDataset").executeInsert("out"); >--> this works >- tableEnv.executeSql("INSERTO INTO out SELECT * FROM >MySourceDataset"); --> this does not work > > The second option fails with the following exception: > > Exception in thread "main" org.apache.flink.table.api.SqlParserException: > SQL parse failed. Non-query expression encountered in illegal context > at > org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678) > > Best, > Flavio > > On Sun, Jul 12, 2020 at 5:04 PM godfrey he wrote: > >> hi Flavio, >> >> `BatchTableSource` can only be used for old planner. >> if you want to use Blink planner to run batch job, >> your table source should implement `StreamTableSource` >> and `isBounded` method return true. >> >> Best, >> Godfrey >> >> >> >> Flavio Pompermaier 于2020年7月10日周五 下午10:32写道: >> >>> Is it correct to do something like this? 
>>> >>> TableSource myTableSource = new BatchTableSource() { >>> @Override >>> public TableSchema getTableSchema() { >>> return new TableSchema(dsFields, ft); >>> } >>> @Override >>> public DataSet getDataSet(ExecutionEnvironment execEnv) { >>> return execEnv.createInput(myInputformat); >>> } >>> }; >>> >>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier >>> wrote: >>> How can you reuse InputFormat to write a TableSource? I think that at least initially this could be the simplest way to test the migration..then I could try yo implement the new Table Source interface On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote: > hi Flavio, > Only old planner supports BatchTableEnvironment (which can convert > to/from DataSet), > while Blink planner in batch mode only support TableEnvironment. > Because Blink planner > convert the batch queries to Transformation (corresponding to > DataStream), instead of DataSet. > > one approach is you can migrate them to TableSource instead > (InputFormat can be reused), > but TableSource will be deprecated later. you can try new table > source[1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html > > Best, > Godfrey > > Flavio Pompermaier 于2020年7月10日周五 下午8:54写道: > >> Thanks but I still can't understand how to migrate my legacy code. >> The main problem is that I can't create a BatchTableEnv anymore so I >> can't >> call createInput. >> >> Is there a way to reuse InputFormats? Should I migrate them to >> TableSource instead? 
>> >> public static void main(String[] args) throws Exception { >> ExecutionEnvironment env = >> ExecutionEnvironment.getExecutionEnvironment(); >> BatchTableEnvironment btEnv = >> TableEnvironment.getTableEnvironment(env); >> MyInputFormat myInputformat = new MyInputFormat(dsFields, >> ft).finish(); >> DataSet rows = env.createInput(myInputformat); >> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); >> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", >> "\t", 1, WriteMode.OVERWRITE); >> btEnv.registerTableSink("out", dsFields, ft, outSink); >> btEnv.insertInto(table, "out", btEnv.queryConfig()); >> env.execute(); >> } >> >> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz < >> dwysakow...@apache.org> wrote: >> >>> You should be good with using the TableEnvironment. The >>> StreamTableEnvironment is needed only if you want to convert to >>> DataStream. We do not support converting batch Table programs to >>> DataStream yet. >>> >>> A following code should work: >>> >>> EnvironmentSettings settings = >>> EnvironmentSettings.newInstance().inBatchMode().build(); >>> >>> TableEnvironment.create(settings); >>> >>> Best, >>> >>> Dawid >>> >>> On 10/07/2020 11:48, Flavio Pompermaier wrote: >>> > Hi to all, >>> > I was trying to update my legacy code to Flink 1.11. Before I was >>> > using a BatchTableEnv and now I've tried to use the following: >>> > >>> >
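[Editor's note] Jark's one-line fix above can be sketched as follows. It assumes the `tableEnv`, the `MySourceDataset` source, and the `out` sink are registered as in the earlier snippets; the backticks matter because `OUT` is a reserved keyword in the SQL parser.

```java
// Sketch of Jark's suggestion: "INSERTO" was a typo for "INSERT", and the
// sink name needs backtick quoting since OUT is a reserved SQL keyword.
// Assumes tableEnv, MySourceDataset and out are set up as in the thread.
TableResult result = tableEnv.executeSql(
        "INSERT INTO `out` SELECT * FROM MySourceDataset");
// Unlike the old Table API path, executeSql in 1.11 submits the job
// itself, so no extra env.execute() call is needed afterwards.
```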
Re: Table API jobs migration to Flink 1.11
Now I'm able to run my code but there's something I don't understand: what is the difference between the following two? //common code final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE); tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink); - tableEnv.from("MySourceDataset").executeInsert("out"); --> this works - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset"); --> this does not work The second option fails with the following exception: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678) Best, Flavio On Sun, Jul 12, 2020 at 5:04 PM godfrey he wrote: > hi Flavio, > > `BatchTableSource` can only be used for old planner. > if you want to use Blink planner to run batch job, > your table source should implement `StreamTableSource` > and `isBounded` method return true. > > Best, > Godfrey > > > > Flavio Pompermaier 于2020年7月10日周五 下午10:32写道: > >> Is it correct to do something like this? >> >> TableSource myTableSource = new BatchTableSource() { >> @Override >> public TableSchema getTableSchema() { >> return new TableSchema(dsFields, ft); >> } >> @Override >> public DataSet getDataSet(ExecutionEnvironment execEnv) { >> return execEnv.createInput(myInputformat); >> } >> }; >> >> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier >> wrote: >> >>> How can you reuse InputFormat to write a TableSource? 
I think that at >>> least initially this could be the simplest way to test the migration..then >>> I could try yo implement the new Table Source interface >>> >>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote: >>> hi Flavio, Only old planner supports BatchTableEnvironment (which can convert to/from DataSet), while Blink planner in batch mode only support TableEnvironment. Because Blink planner convert the batch queries to Transformation (corresponding to DataStream), instead of DataSet. one approach is you can migrate them to TableSource instead (InputFormat can be reused), but TableSource will be deprecated later. you can try new table source[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html Best, Godfrey Flavio Pompermaier 于2020年7月10日周五 下午8:54写道: > Thanks but I still can't understand how to migrate my legacy code. The > main problem is that I can't create a BatchTableEnv anymore so I can't > call createInput. > > Is there a way to reuse InputFormats? Should I migrate them to > TableSource instead? > > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment btEnv = > TableEnvironment.getTableEnvironment(env); > MyInputFormat myInputformat = new MyInputFormat(dsFields, > ft).finish(); > DataSet rows = env.createInput(myInputformat); > Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); > CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", > "\t", 1, WriteMode.OVERWRITE); > btEnv.registerTableSink("out", dsFields, ft, outSink); > btEnv.insertInto(table, "out", btEnv.queryConfig()); > env.execute(); > } > > On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz < > dwysakow...@apache.org> wrote: > >> You should be good with using the TableEnvironment. The >> StreamTableEnvironment is needed only if you want to convert to >> DataStream. 
We do not support converting batch Table programs to >> DataStream yet. >> >> A following code should work: >> >> EnvironmentSettings settings = >> EnvironmentSettings.newInstance().inBatchMode().build(); >> >> TableEnvironment.create(settings); >> >> Best, >> >> Dawid >> >> On 10/07/2020 11:48, Flavio Pompermaier wrote: >> > Hi to all, >> > I was trying to update my legacy code to Flink 1.11. Before I was >> > using a BatchTableEnv and now I've tried to use the following: >> > >> > EnvironmentSettings settings = >> > EnvironmentSettings.newInstance().inBatchMode().build(); >> > >> > Unfortunately in the StreamTableEnvironmentImpl code there's : >> > >> > if (!settings.isStreamingMode()) { >> > throw new TableException( >> > "StreamTableEnvironment can not run in batch mode for now, please
Re: Table API jobs migration to Flink 1.11
hi Flavio, `BatchTableSource` can only be used for old planner. if you want to use Blink planner to run batch job, your table source should implement `StreamTableSource` and `isBounded` method return true. Best, Godfrey Flavio Pompermaier 于2020年7月10日周五 下午10:32写道: > Is it correct to do something like this? > > TableSource myTableSource = new BatchTableSource() { > @Override > public TableSchema getTableSchema() { > return new TableSchema(dsFields, ft); > } > @Override > public DataSet getDataSet(ExecutionEnvironment execEnv) { > return execEnv.createInput(myInputformat); > } > }; > > On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier > wrote: > >> How can you reuse InputFormat to write a TableSource? I think that at >> least initially this could be the simplest way to test the migration..then >> I could try yo implement the new Table Source interface >> >> On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote: >> >>> hi Flavio, >>> Only old planner supports BatchTableEnvironment (which can convert >>> to/from DataSet), >>> while Blink planner in batch mode only support TableEnvironment. Because >>> Blink planner >>> convert the batch queries to Transformation (corresponding to >>> DataStream), instead of DataSet. >>> >>> one approach is you can migrate them to TableSource instead (InputFormat >>> can be reused), >>> but TableSource will be deprecated later. you can try new table source[1] >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html >>> >>> Best, >>> Godfrey >>> >>> Flavio Pompermaier 于2020年7月10日周五 下午8:54写道: >>> Thanks but I still can't understand how to migrate my legacy code. The main problem is that I can't create a BatchTableEnv anymore so I can't call createInput. Is there a way to reuse InputFormats? Should I migrate them to TableSource instead? 
public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env); MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish(); DataSet rows = env.createInput(myInputformat); Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE); btEnv.registerTableSink("out", dsFields, ft, outSink); btEnv.insertInto(table, "out", btEnv.queryConfig()); env.execute(); } On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz < dwysakow...@apache.org> wrote: > You should be good with using the TableEnvironment. The > StreamTableEnvironment is needed only if you want to convert to > DataStream. We do not support converting batch Table programs to > DataStream yet. > > A following code should work: > > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > > TableEnvironment.create(settings); > > Best, > > Dawid > > On 10/07/2020 11:48, Flavio Pompermaier wrote: > > Hi to all, > > I was trying to update my legacy code to Flink 1.11. Before I was > > using a BatchTableEnv and now I've tried to use the following: > > > > EnvironmentSettings settings = > > EnvironmentSettings.newInstance().inBatchMode().build(); > > > > Unfortunately in the StreamTableEnvironmentImpl code there's : > > > > if (!settings.isStreamingMode()) { > > throw new TableException( > > "StreamTableEnvironment can not run in batch mode for now, please use > > TableEnvironment."); > > } > > > > What should I do here? > > > > Thanks in advance, > > Flavio > >
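[Editor's note] Godfrey's hint — a `StreamTableSource` whose `isBounded()` returns true — could look roughly like this. It is a hedged sketch against the legacy (soon-deprecated) source interface, reusing the `dsFields`, `ft`, and `InputFormat` names from the thread; the class and constructor are hypothetical.

```java
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// Sketch: a bounded StreamTableSource wrapping the existing InputFormat so
// the blink planner can run it as a batch source (legacy interface).
public class MyBoundedTableSource implements StreamTableSource<Row> {

    private final String[] dsFields;
    private final TypeInformation<?>[] ft;
    private final InputFormat<Row, ?> inputFormat;

    public MyBoundedTableSource(String[] dsFields, TypeInformation<?>[] ft,
                                InputFormat<Row, ?> inputFormat) {
        this.dsFields = dsFields;
        this.ft = ft;
        this.inputFormat = inputFormat;
    }

    @Override
    public boolean isBounded() {
        return true; // marks the source as batch for the blink planner
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(inputFormat, getReturnType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(ft, dsFields);
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft);
    }
}
```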
Re: Table API jobs migration to Flink 1.11
Is it correct to do something like this? TableSource myTableSource = new BatchTableSource() { @Override public TableSchema getTableSchema() { return new TableSchema(dsFields, ft); } @Override public DataSet getDataSet(ExecutionEnvironment execEnv) { return execEnv.createInput(myInputformat); } }; On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier wrote: > How can you reuse InputFormat to write a TableSource? I think that at > least initially this could be the simplest way to test the migration..then > I could try yo implement the new Table Source interface > > On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote: > >> hi Flavio, >> Only old planner supports BatchTableEnvironment (which can convert >> to/from DataSet), >> while Blink planner in batch mode only support TableEnvironment. Because >> Blink planner >> convert the batch queries to Transformation (corresponding to >> DataStream), instead of DataSet. >> >> one approach is you can migrate them to TableSource instead (InputFormat >> can be reused), >> but TableSource will be deprecated later. you can try new table source[1] >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html >> >> Best, >> Godfrey >> >> Flavio Pompermaier 于2020年7月10日周五 下午8:54写道: >> >>> Thanks but I still can't understand how to migrate my legacy code. The >>> main problem is that I can't create a BatchTableEnv anymore so I can't >>> call createInput. >>> >>> Is there a way to reuse InputFormats? Should I migrate them to >>> TableSource instead? 
>>> >>> public static void main(String[] args) throws Exception { >>> ExecutionEnvironment env = >>> ExecutionEnvironment.getExecutionEnvironment(); >>> BatchTableEnvironment btEnv = >>> TableEnvironment.getTableEnvironment(env); >>> MyInputFormat myInputformat = new MyInputFormat(dsFields, >>> ft).finish(); >>> DataSet rows = env.createInput(myInputformat); >>> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); >>> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", >>> 1, WriteMode.OVERWRITE); >>> btEnv.registerTableSink("out", dsFields, ft, outSink); >>> btEnv.insertInto(table, "out", btEnv.queryConfig()); >>> env.execute(); >>> } >>> >>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz < >>> dwysakow...@apache.org> wrote: >>> You should be good with using the TableEnvironment. The StreamTableEnvironment is needed only if you want to convert to DataStream. We do not support converting batch Table programs to DataStream yet. A following code should work: EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment.create(settings); Best, Dawid On 10/07/2020 11:48, Flavio Pompermaier wrote: > Hi to all, > I was trying to update my legacy code to Flink 1.11. Before I was > using a BatchTableEnv and now I've tried to use the following: > > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > > Unfortunately in the StreamTableEnvironmentImpl code there's : > > if (!settings.isStreamingMode()) { > throw new TableException( > "StreamTableEnvironment can not run in batch mode for now, please use > TableEnvironment."); > } > > What should I do here? > > Thanks in advance, > Flavio >>>
Re: Table API jobs migration to Flink 1.11
How can you reuse InputFormat to write a TableSource? I think that at least initially this could be the simplest way to test the migration... then I could try to implement the new Table Source interface On Fri, Jul 10, 2020 at 3:38 PM godfrey he wrote: > hi Flavio, > Only old planner supports BatchTableEnvironment (which can convert to/from > DataSet), > while Blink planner in batch mode only support TableEnvironment. Because > Blink planner > convert the batch queries to Transformation (corresponding to DataStream), > instead of DataSet. > > one approach is you can migrate them to TableSource instead (InputFormat > can be reused), > but TableSource will be deprecated later. you can try new table source[1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html > > Best, > Godfrey > > Flavio Pompermaier 于2020年7月10日周五 下午8:54写道: > >> Thanks but I still can't understand how to migrate my legacy code. The >> main problem is that I can't create a BatchTableEnv anymore so I can't >> call createInput. >> >> Is there a way to reuse InputFormats? Should I migrate them to >> TableSource instead? >> >> public static void main(String[] args) throws Exception { >> ExecutionEnvironment env = >> ExecutionEnvironment.getExecutionEnvironment(); >> BatchTableEnvironment btEnv = >> TableEnvironment.getTableEnvironment(env); >> MyInputFormat myInputformat = new MyInputFormat(dsFields, >> ft).finish(); >> DataSet rows = env.createInput(myInputformat); >> Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); >> CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", >> 1, WriteMode.OVERWRITE); >> btEnv.registerTableSink("out", dsFields, ft, outSink); >> btEnv.insertInto(table, "out", btEnv.queryConfig()); >> env.execute(); >> } >> >> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz >> wrote: >> >>> You should be good with using the TableEnvironment. 
The >>> StreamTableEnvironment is needed only if you want to convert to >>> DataStream. We do not support converting batch Table programs to >>> DataStream yet. >>> >>> A following code should work: >>> >>> EnvironmentSettings settings = >>> EnvironmentSettings.newInstance().inBatchMode().build(); >>> >>> TableEnvironment.create(settings); >>> >>> Best, >>> >>> Dawid >>> >>> On 10/07/2020 11:48, Flavio Pompermaier wrote: >>> > Hi to all, >>> > I was trying to update my legacy code to Flink 1.11. Before I was >>> > using a BatchTableEnv and now I've tried to use the following: >>> > >>> > EnvironmentSettings settings = >>> > EnvironmentSettings.newInstance().inBatchMode().build(); >>> > >>> > Unfortunately in the StreamTableEnvironmentImpl code there's : >>> > >>> > if (!settings.isStreamingMode()) { >>> > throw new TableException( >>> > "StreamTableEnvironment can not run in batch mode for now, please use >>> > TableEnvironment."); >>> > } >>> > >>> > What should I do here? >>> > >>> > Thanks in advance, >>> > Flavio >>> >>> >>
Re: Table API jobs migration to Flink 1.11
hi Flavio, Only old planner supports BatchTableEnvironment (which can convert to/from DataSet), while Blink planner in batch mode only support TableEnvironment. Because Blink planner convert the batch queries to Transformation (corresponding to DataStream), instead of DataSet. one approach is you can migrate them to TableSource instead (InputFormat can be reused), but TableSource will be deprecated later. you can try new table source[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html Best, Godfrey Flavio Pompermaier 于2020年7月10日周五 下午8:54写道: > Thanks but I still can't understand how to migrate my legacy code. The > main problem is that I can't create a BatchTableEnv anymore so I can't > call createInput. > > Is there a way to reuse InputFormats? Should I migrate them to TableSource > instead? > > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > BatchTableEnvironment btEnv = > TableEnvironment.getTableEnvironment(env); > MyInputFormat myInputformat = new MyInputFormat(dsFields, > ft).finish(); > DataSet rows = env.createInput(myInputformat); > Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); > CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, > WriteMode.OVERWRITE); > btEnv.registerTableSink("out", dsFields, ft, outSink); > btEnv.insertInto(table, "out", btEnv.queryConfig()); > env.execute(); > } > > On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz > wrote: > >> You should be good with using the TableEnvironment. The >> StreamTableEnvironment is needed only if you want to convert to >> DataStream. We do not support converting batch Table programs to >> DataStream yet. 
>> >> A following code should work: >> >> EnvironmentSettings settings = >> EnvironmentSettings.newInstance().inBatchMode().build(); >> >> TableEnvironment.create(settings); >> >> Best, >> >> Dawid >> >> On 10/07/2020 11:48, Flavio Pompermaier wrote: >> > Hi to all, >> > I was trying to update my legacy code to Flink 1.11. Before I was >> > using a BatchTableEnv and now I've tried to use the following: >> > >> > EnvironmentSettings settings = >> > EnvironmentSettings.newInstance().inBatchMode().build(); >> > >> > Unfortunately in the StreamTableEnvironmentImpl code there's : >> > >> > if (!settings.isStreamingMode()) { >> > throw new TableException( >> > "StreamTableEnvironment can not run in batch mode for now, please use >> > TableEnvironment."); >> > } >> > >> > What should I do here? >> > >> > Thanks in advance, >> > Flavio >> >> > > -- > Flavio Pompermaier > Development Department > > OKKAM S.r.l. > Tel. +(39) 0461 041809 >
Re: Table API jobs migration to Flink 1.11
Thanks but I still can't understand how to migrate my legacy code. The main problem is that I can't create a BatchTableEnv anymore so I can't call createInput. Is there a way to reuse InputFormats? Should I migrate them to TableSource instead? public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env); MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish(); DataSet rows = env.createInput(myInputformat); Table table = btEnv.fromDataSet(rows, String.join(",", dsFields)); CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE); btEnv.registerTableSink("out", dsFields, ft, outSink); btEnv.insertInto(table, "out", btEnv.queryConfig()); env.execute(); } On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz wrote: > You should be good with using the TableEnvironment. The > StreamTableEnvironment is needed only if you want to convert to > DataStream. We do not support converting batch Table programs to > DataStream yet. > > A following code should work: > > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > > TableEnvironment.create(settings); > > Best, > > Dawid > > On 10/07/2020 11:48, Flavio Pompermaier wrote: > > Hi to all, > > I was trying to update my legacy code to Flink 1.11. Before I was > > using a BatchTableEnv and now I've tried to use the following: > > > > EnvironmentSettings settings = > > EnvironmentSettings.newInstance().inBatchMode().build(); > > > > Unfortunately in the StreamTableEnvironmentImpl code there's : > > > > if (!settings.isStreamingMode()) { > > throw new TableException( > > "StreamTableEnvironment can not run in batch mode for now, please use > > TableEnvironment."); > > } > > > > What should I do here? > > > > Thanks in advance, > > Flavio > > -- Flavio Pompermaier Development Department OKKAM S.r.l. Tel. 
+(39) 0461 041809
Re: Table API jobs migration to Flink 1.11
You should be good with using the TableEnvironment. The StreamTableEnvironment is needed only if you want to convert to DataStream. We do not support converting batch Table programs to DataStream yet. The following code should work: EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment.create(settings); Best, Dawid On 10/07/2020 11:48, Flavio Pompermaier wrote: > Hi to all, > I was trying to update my legacy code to Flink 1.11. Before I was > using a BatchTableEnv and now I've tried to use the following: > > EnvironmentSettings settings = > EnvironmentSettings.newInstance().inBatchMode().build(); > > Unfortunately in the StreamTableEnvironmentImpl code there's : > > if (!settings.isStreamingMode()) { > throw new TableException( > "StreamTableEnvironment can not run in batch mode for now, please use > TableEnvironment."); > } > > What should I do here? > > Thanks in advance, > Flavio
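[Editor's note] Dawid's two lines, expanded into a compilable sketch. It assumes Flink 1.11 with the blink planner on the classpath (the only planner that supports a batch `TableEnvironment`); the class name is hypothetical.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchEnvSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner() // default in 1.11, made explicit here
                .inBatchMode()
                .build();

        // Use TableEnvironment, not StreamTableEnvironment, for batch:
        // StreamTableEnvironmentImpl rejects batch mode with the
        // TableException quoted in the original question.
        TableEnvironment tEnv = TableEnvironment.create(settings);
    }
}
```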
Table API jobs migration to Flink 1.11
Hi to all, I was trying to update my legacy code to Flink 1.11. Before I was using a BatchTableEnv and now I've tried to use the following: EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); Unfortunately in the StreamTableEnvironmentImpl code there's: if (!settings.isStreamingMode()) { throw new TableException( "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment."); } What should I do here? Thanks in advance, Flavio