Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
Thanks, that was definitely helpful!

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
You can set string-based configuration via
`tEnv.getConfig().getConfiguration().setString(..)` to replace them.
Maybe you can try pipeline.default-kryo-serializers [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers
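
For example, registering the Joda-Time serializer discussed below through the
configuration might look like this (a sketch; the value is a semicolon-separated
list of "class:...,serializer:..." pairs, and the serializer class is assumed
to be the JodaDateTimeSerializer from the kryo-serializers library):

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());
// intended to have the same effect as the old
// env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class)
tEnv.getConfig().getConfiguration().setString(
    "pipeline.default-kryo-serializers",
    "class:org.joda.time.DateTime,"
        + "serializer:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");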


Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
And what about the env.registerTypeWithKryoSerializer?
Now to create the table environment I don't use the ExecutionEnvironment
anymore..how can I register those serializers?
For example I used to run
env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class);

Best,
Flavio


Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
Hi Flavio,

tableEnv.registerTableSource is deprecated in favor of DDL
and the new connector interface (i.e. FLIP-95 [1]).
You may need to implement a `ScanTableSource` that returns an
`InputFormatProvider` from `ScanTableSource#getScanRuntimeProvider`.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
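
A minimal sketch of such a source (MyRowDataInputFormat is hypothetical here:
the new interfaces produce RowData rather than Row, so an existing format
would need to be ported or wrapped):

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

public class MyScanTableSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        // a bounded batch source only produces insert rows
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        InputFormat<RowData, ?> format = new MyRowDataInputFormat();
        return InputFormatProvider.of(format);
    }

    @Override
    public DynamicTableSource copy() {
        return new MyScanTableSource();
    }

    @Override
    public String asSummaryString() {
        return "MyScanTableSource";
    }
}

To create the table with executeSql/DDL, the source also needs a
DynamicTableSourceFactory registered through the Java SPI file
META-INF/services/org.apache.flink.table.factories.Factory.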


Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
Ok..just one last thing: to use my TableSource I use the deprecated
API registerTableSource:

tableEnv.registerTableSource("MySourceDataset", tableSource);

The javadoc says to use executeSql but this requires some extra steps (that
are not mentioned in the documentation).
I have to create a TableFactory, right? How do I register it? Is there
an example somewhere?

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
I agree with you @Flavio Pompermaier, the exception
message definitely should be improved.
We created a similar issue a while ago
(https://issues.apache.org/jira/browse/CALCITE-3038), but the fix might be
complicated.

Best,
Jark

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
You're right Jark..sorry I didn't see the typo. The backticks are also
mandatory, since out is a reserved SQL keyword and has to be escaped.
Maybe the exception message could be more meaningful and specify the token
that caused the error instead of a general "SQL parse failed. Non-query
expression encountered in illegal context".

Thanks a lot for the support,
Flavio

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Jark Wu
A typo of "INSERTO"? Try this?

tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");

Best,
Jark

Re: Table API jobs migration to Flink 1.11

2020-07-13 Thread Flavio Pompermaier
Now I'm able to run my code but there's something I don't understand: what
is the difference between the following two?

 // common code
 final CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
 tableEnv.registerTableSink("out", dsFields, myInputformat.getFieldTypes(), outSink);

   - tableEnv.from("MySourceDataset").executeInsert("out");   --> this works
   - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");   --> this does not work

The second option fails with the following exception:

Exception in thread "main" org.apache.flink.table.api.SqlParserException:
SQL parse failed. Non-query expression encountered in illegal context
at
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)

Best,
Flavio

Re: Table API jobs migration to Flink 1.11

2020-07-12 Thread godfrey he
hi Flavio,

`BatchTableSource` can only be used with the old planner.
If you want to use the Blink planner to run a batch job,
your table source should implement `StreamTableSource`
and its `isBounded` method should return true.

Best,
Godfrey
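
A minimal sketch of such a bounded source, reusing the dsFields, ft and
MyInputFormat names from elsewhere in this thread (assumptions, not a
definitive implementation):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class MyBoundedTableSource implements StreamTableSource<Row> {

    private final String[] dsFields;
    private final TypeInformation<?>[] ft;

    public MyBoundedTableSource(String[] dsFields, TypeInformation<?>[] ft) {
        this.dsFields = dsFields;
        this.ft = ft;
    }

    @Override
    public boolean isBounded() {
        // signals to the Blink planner that this source is finite (batch)
        return true;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(new MyInputFormat(dsFields, ft).finish());
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft);
    }

    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }
}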



Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
Is it correct to do something like this?

TableSource<Row> myTableSource = new BatchTableSource<Row>() {
  @Override
  public TableSchema getTableSchema() {
    return new TableSchema(dsFields, ft);
  }
  @Override
  public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
    return execEnv.createInput(myInputformat);
  }
};

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
How can you reuse InputFormat to write a TableSource? I think that at least
initially this could be the simplest way to test the migration..then I
could try to implement the new Table Source interface

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread godfrey he
hi Flavio,
Only the old planner supports BatchTableEnvironment (which can convert
to/from DataSet), while the Blink planner in batch mode only supports
TableEnvironment, because the Blink planner converts batch queries to
Transformations (corresponding to DataStream) instead of DataSet.

One approach is to migrate them to TableSource (the InputFormat can be
reused), but TableSource will be deprecated later; you can try the new
table source [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html

Best,
Godfrey
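
If the goal is just to keep reusing an existing InputFormat, a sketch based
on the legacy convenience base class InputFormatTableSource (a bounded
StreamTableSource that wraps an InputFormat) could look like this:

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class MyInputFormatTableSource extends InputFormatTableSource<Row> {

    private final String[] dsFields;
    private final TypeInformation<?>[] ft;
    private final InputFormat<Row, ?> format;

    public MyInputFormatTableSource(
            String[] dsFields, TypeInformation<?>[] ft, InputFormat<Row, ?> format) {
        this.dsFields = dsFields;
        this.ft = ft;
        this.format = format;
    }

    @Override
    public InputFormat<Row, ?> getInputFormat() {
        // the existing format is reused as-is; the base class is bounded
        return format;
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(dsFields, ft);
    }

    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }
}

It could then be registered with tableEnv.registerTableSource("MySourceDataset",
new MyInputFormatTableSource(dsFields, ft, myInputformat)).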

Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
Thanks but I still can't understand how to migrate my legacy code. The main
problem is that I can't create a BatchTableEnv anymore so I can't
call createInput.

Is there a way to reuse InputFormats? Should I migrate them to TableSource
instead?

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
    DataSet<Row> rows = env.createInput(myInputformat);
    Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
    CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    btEnv.registerTableSink("out", dsFields, ft, outSink);
    btEnv.insertInto(table, "out", btEnv.queryConfig());
    env.execute();
}

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Table API jobs migration to Flink 1.11

2020-07-10 Thread Dawid Wysakowicz
You should be good with using the TableEnvironment. The
StreamTableEnvironment is needed only if you want to convert to
DataStream. We do not support converting batch Table programs to
DataStream yet.

The following code should work:

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);

Best,

Dawid


Table API jobs migration to Flink 1.11

2020-07-10 Thread Flavio Pompermaier
Hi to all,
I was trying to update my legacy code to Flink 1.11. Before I was using a
BatchTableEnv and now I've tried to use the following:

EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();

Unfortunately in the StreamTableEnvironmentImpl code there's:

if (!settings.isStreamingMode()) {
    throw new TableException(
        "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
}

What should I do here?

Thanks in advance,
Flavio