Hi Flavio,

`tableEnv.registerTableSource` is deprecated so that users migrate to DDL
and the new connector interface (i.e. FLIP-95 [1]).
You may need to implement a `ScanTableSource` whose
`ScanTableSource#getScanRuntimeProvider` returns an `InputFormatProvider`.
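
As a rough illustration of the DDL route (a sketch under assumptions, not an official recipe): with FLIP-95 the source is exposed through a `DynamicTableSourceFactory`, which Flink discovers via Java SPI (a `META-INF/services/org.apache.flink.table.factories.Factory` file). The table is then declared with a `CREATE TABLE ... WITH ('connector' = ...)` statement whose `connector` value matches the factory's `factoryIdentifier()`. The helper below only builds such a statement as a string. The class name `SourceDdlSketch`, the columns, and the identifier `my-input-format` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class SourceDdlSketch {

    // Quote an identifier with backticks (needed for reserved words such as `out`),
    // doubling any backtick embedded in the name.
    public static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Build a CREATE TABLE statement from column name -> SQL type and connector options.
    // Column types are inserted as given; they are not validated here.
    public static String createTableDdl(
            String table, Map<String, String> columns, Map<String, String> options) {
        StringJoiner cols = new StringJoiner(", ");
        columns.forEach((name, type) -> cols.add(quote(name) + " " + type));

        StringJoiner opts = new StringJoiner(", ");
        options.forEach((key, value) ->
                opts.add("'" + key.replace("'", "''") + "' = '" + value.replace("'", "''") + "'"));

        return "CREATE TABLE " + quote(table) + " (" + cols + ") WITH (" + opts + ")";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "BIGINT");      // hypothetical column
        columns.put("name", "STRING");    // hypothetical column

        Map<String, String> options = new LinkedHashMap<>();
        // Hypothetical identifier; it must equal the factoryIdentifier() of your factory.
        options.put("connector", "my-input-format");

        String ddl = createTableDdl("MySourceDataset", columns, options);
        System.out.println(ddl);
    }
}
```

With Flink on the classpath, the resulting string would be passed to `tableEnv.executeSql(ddl)`, after which `MySourceDataset` can be queried like any other table.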

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html

On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Ok..just one last thing: to use my TableSource I use the deprecated
> API registerTableSource:
>
> tableEnv.registerTableSource("MySourceDataset", tableSource);
>
> The javadoc says to use executeSql but this requires some extra steps
> (that are not mentioned in the documentation).
> Do I have to create a TableFactory? How do I register it? Is there
> an example somewhere?
>
> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>
>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it> , the
>> exception message definitely should be improved.
>> We created a similar issue a long time ago,
>> https://issues.apache.org/jira/browse/CALCITE-3038, but fixing it might
>> be complicated.
>>
>> Best,
>> Jark
>>
>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> You're right Jark..sorry I didn't see the typo. The backticks are also
>>> mandatory.
>>> Maybe the exception message could be more meaningful and specify the
>>> token that caused the error instead of a general "SQL parse failed.
>>> Non-query expression encountered in illegal context".
>>>
>>> Thanks a lot for the support,
>>> Flavio
>>>
>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> A typo of "INSERTO"? Try this?
>>>>
>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> Now I'm able to run my code but there's something I don't understand:
>>>>> what is the difference between the following two?
>>>>>
>>>>>      //common code
>>>>>      final CsvTableSink outSink =
>>>>>          new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>      tableEnv.registerTableSink("out", dsFields,
>>>>>          myInputformat.getFieldTypes(), outSink);
>>>>>
>>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");   --> this works
>>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");   --> this does not work
>>>>>
>>>>> The second option fails with the following exception:
>>>>>
>>>>> Exception in thread "main"
>>>>> org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query
>>>>> expression encountered in illegal context
>>>>> at
>>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>> at
>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>> at
>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> hi Flavio,
>>>>>>
>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>> If you want to use the Blink planner to run a batch job,
>>>>>> your table source should implement `StreamTableSource`
>>>>>> and its `isBounded` method should return true.
>>>>>>
>>>>>> Best,
>>>>>> Godfrey
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, 10 Jul 2020 at 22:32, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Is it correct to do something like this?
>>>>>>>
>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>       @Override
>>>>>>>       public TableSchema getTableSchema() {
>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>       }
>>>>>>>       @Override
>>>>>>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>       }
>>>>>>>     };
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> How can you reuse InputFormat to write a TableSource? I think that
>>>>>>>> at least initially this could be the simplest way to test the
>>>>>>>> migration. Then I could try to implement the new Table Source interface.
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> hi Flavio,
>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can convert
>>>>>>>>> to/from DataSet), while the Blink planner in batch mode only supports
>>>>>>>>> TableEnvironment, because the Blink planner translates batch queries to
>>>>>>>>> Transformation (corresponding to DataStream) instead of DataSet.
>>>>>>>>>
>>>>>>>>> One approach is to migrate them to TableSource instead (the InputFormat
>>>>>>>>> can be reused), but TableSource will be deprecated later. You can try
>>>>>>>>> the new table source [1].
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>> On Fri, 10 Jul 2020 at 20:54, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv
>>>>>>>>>> anymore, so I can't call createInput.
>>>>>>>>>>
>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>> TableSource instead?
>>>>>>>>>>
>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>>         ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>>         TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>     MyInputFormat myInputformat =
>>>>>>>>>>         new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>>     CsvTableSink outSink =
>>>>>>>>>>         new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>     env.execute();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>> dwysakow...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>
>>>>>>>>>>> The following code should work:
>>>>>>>>>>>
>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>
>>>>>>>>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Dawid
>>>>>>>>>>>
>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>> > Hi to all,
>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before, I was
>>>>>>>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>>>>>>>> >
>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>> >
>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>>> >
>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>>>>>>>> > }
>>>>>>>>>>> >
>>>>>>>>>>> > What should I do here?
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>> > Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>
>
