OK, just one last thing: to use my TableSource I call the deprecated
registerTableSource API:

tableEnv.registerTableSource("MySourceDataset", tableSource);

The Javadoc says to use executeSql instead, but this requires some extra steps
that are not mentioned in the documentation.
I have to create a TableFactory, right? How do I register it? Is there
an example somewhere?
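
A minimal sketch of the executeSql route, under stated assumptions. As far as I
understand the 1.11 documentation, a source is declared with a CREATE TABLE
statement whose 'connector' option selects a factory, and that factory is
discovered through Java's service loader (a provider file under
META-INF/services that names the factory class). The helper below only builds
the DDL string, so it runs without Flink on the classpath. The class name
SourceDdlSketch, the column list, and the 'my-source' connector identifier are
hypothetical and not taken from this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class SourceDdlSketch {

    // Wrap an identifier in backticks so reserved words such as `out` parse.
    // Doubling an embedded backtick is the assumed escape rule.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Render a value as a single-quoted SQL string literal.
    static String literal(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Build: CREATE TABLE `name` (`col` TYPE, ...) WITH ('key' = 'value', ...)
    static String createTableDdl(
            String table, Map<String, String> columns, Map<String, String> options) {
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        columns.forEach((name, type) -> cols.add(quote(name) + " " + type));
        StringJoiner opts = new StringJoiner(", ", "(", ")");
        options.forEach((key, value) -> opts.add(literal(key) + " = " + literal(value)));
        return "CREATE TABLE " + quote(table) + " " + cols + " WITH " + opts;
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "BIGINT");     // hypothetical column
        columns.put("name", "STRING");   // hypothetical column

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "my-source"); // hypothetical factory identifier

        String ddl = createTableDdl("MySourceDataset", columns, options);
        System.out.println(ddl);
        // With Flink available, the statement would be passed on like this:
        // tableEnv.executeSql(ddl);
    }
}
```

The statement only resolves if a factory matching the 'connector' value is on
the classpath and listed in the service provider file, so treat this as the
shape of the call rather than a complete migration.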

On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:

> I agree with you, @Flavio Pompermaier <pomperma...@okkam.it>; the
> exception message definitely should be improved.
> We created a similar issue a long time ago,
> https://issues.apache.org/jira/browse/CALCITE-3038, but the fix might
> be complicated.
>
> Best,
> Jark
>
> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> You're right, Jark. Sorry, I didn't see the typo. The backticks are also
>> mandatory.
>> Maybe the exception message could be more meaningful and specify the
>> token that caused the error instead of the generic "SQL parse failed.
>> Non-query expression encountered in illegal context".
>>
>> Thanks a lot for the support,
>> Flavio
>>
>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Is "INSERTO" a typo? Try this:
>>>
>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM MySourceDataset");
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Now I'm able to run my code but there's something I don't understand:
>>>> what is the difference between the following two?
>>>>
>>>>      // common code
>>>>      final CsvTableSink outSink =
>>>>          new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>      tableEnv.registerTableSink(
>>>>          "out", dsFields, myInputformat.getFieldTypes(), outSink);
>>>>
>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>      --> this works
>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM MySourceDataset");
>>>>      --> this does not work
>>>>
>>>> The second option fails with the following exception:
>>>>
>>>> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Non-query expression encountered in illegal context
>>>> at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> hi Flavio,
>>>>>
>>>>> `BatchTableSource` can only be used with the old planner.
>>>>> If you want to use the Blink planner to run a batch job,
>>>>> your table source should implement `StreamTableSource`
>>>>> and its `isBounded` method should return true.
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 10, 2020 at 10:32 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Is it correct to do something like this?
>>>>>>
>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>       @Override
>>>>>>       public TableSchema getTableSchema() {
>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>       }
>>>>>>       @Override
>>>>>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>       }
>>>>>>     };
>>>>>>
>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> How can you reuse an InputFormat to write a TableSource? I think that,
>>>>>>> at least initially, this could be the simplest way to test the
>>>>>>> migration. Then I could try to implement the new TableSource interface.
>>>>>>>
>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> hi Flavio,
>>>>>>>> Only the old planner supports BatchTableEnvironment (which can convert
>>>>>>>> to/from DataSet),
>>>>>>>> while the Blink planner in batch mode only supports TableEnvironment,
>>>>>>>> because the Blink planner
>>>>>>>> converts batch queries to Transformations (corresponding to
>>>>>>>> DataStream) instead of DataSet.
>>>>>>>>
>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>> (the InputFormat can be reused),
>>>>>>>> but TableSource will be deprecated later. You can try the new table
>>>>>>>> source [1].
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Godfrey
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Thanks, but I still can't understand how to migrate my legacy code.
>>>>>>>>> The main problem is that I can't create a BatchTableEnv anymore, so I
>>>>>>>>> can't call createInput.
>>>>>>>>>
>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>> TableSource instead?
>>>>>>>>>
>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
>>>>>>>>>     MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
>>>>>>>>>     CsvTableSink outSink =
>>>>>>>>>         new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>     env.execute();
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>> dwysakow...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>> DataStream. We do not support converting batch Table programs to
>>>>>>>>>> DataStream yet.
>>>>>>>>>>
>>>>>>>>>> The following code should work:
>>>>>>>>>>
>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>     EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>
>>>>>>>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Dawid
>>>>>>>>>>
>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>> > Hi to all,
>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before, I was
>>>>>>>>>> > using a BatchTableEnv, and now I've tried to use the following:
>>>>>>>>>> >
>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>> >
>>>>>>>>>> > Unfortunately, in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>> >
>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>> >     throw new TableException(
>>>>>>>>>> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
>>>>>>>>>> > }
>>>>>>>>>> >
>>>>>>>>>> > What should I do here?
>>>>>>>>>> >
>>>>>>>>>> > Thanks in advance,
>>>>>>>>>> > Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>
