Thanks, that was definitely helpful!

On Mon, Jul 13, 2020 at 4:39 PM Jark Wu <imj...@gmail.com> wrote:

> You can set string-based configuration on
> `tEnv.getConfig.getConfiguration.setString(..)` to replace them.
> Maybe you can try pipeline.default-kryo-serializers [1].
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers
>
> On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> And what about the env.registerTypeWithKryoSerializer?
>> Now to create the table environment I don't use the ExecutionEnvironment
>> anymore..how can I register those serializers?
>> For example I used to run
>> env.registerTypeWithKryoSerializer(DateTime.class,
>> JodaDateTimeSerializer.class);
>>
>> Best,
>> Flavio
>>
>> On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> tableEnv.registerTableSource is deprecated in order to migrate to use
>>> DDL and the new connector interface (i.e. FLIP-95 [1]).
>>> You may need to implement a `ScanTableSource` that uses
>>> `InputFormatProvider` as the `ScanTableSource#getScanRuntimeProvider`.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>>>
>>> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Ok..just one last thing: to use my TableSource I use the deprecated
>>>> API registerTableSource:
>>>>
>>>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>>>
>>>> The javadoc says to use executeSql but this requires some extra steps
>>>> (that are not mentioned in the documentation).
>>>> Do I have to create a TableFactory, right? How do I register it? Is
>>>> there an example somewhere?
>>>>
>>>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it> , the
>>>>> exception message definitely should be improved.
>>>>> We created a similar issue a long time before
>>>>> https://issues.apache.org/jira/browse/CALCITE-3038, but the fixing
>>>>> might be complicated.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
>>>>> wrote:
>>>>>
>>>>>> You're right Jark..sorry I didn't see the typo. The backticks are
>>>>>> also mandatory.
>>>>>> Maybe the exception message could be more meaningful and specify the
>>>>>> token that caused the error instead of a general "SQL parse failed.
>>>>>> Non-query expression encountered in illegal context".
>>>>>>
>>>>>> Thanks a lot for the support,
>>>>>> Flavio
>>>>>>
>>>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> A typo of "INSERTO"? Try this?
>>>>>>>
>>>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>>>>>> MySourceDataset");
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Now I'm able to run my code but there's something I don't
>>>>>>>> understand: what is the difference between the following two?
>>>>>>>>
>>>>>>>>      //common code
>>>>>>>>      final CsvTableSink outSink = new
>>>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>      tableEnv.registerTableSink("out", dsFields,
>>>>>>>> myInputformat.getFieldTypes(), outSink);
>>>>>>>>
>>>>>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>>>>>                                      --> this works
>>>>>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM
>>>>>>>>    MySourceDataset");   --> this does not work
>>>>>>>>
>>>>>>>> The second option fails with the following exception:
>>>>>>>>
>>>>>>>> Exception in thread "main"
>>>>>>>> org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>>>>>>>> Non-query
>>>>>>>> expression encountered in illegal context
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>>>> at
>>>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> hi Flavio,
>>>>>>>>>
>>>>>>>>> `BatchTableSource` can only be used for old planner.
>>>>>>>>> if you want to use Blink planner to run batch job,
>>>>>>>>> your table source should implement `StreamTableSource`
>>>>>>>>> and `isBounded` method return true.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五 下午10:32写道:
>>>>>>>>>
>>>>>>>>>> Is it correct to do something like this?
>>>>>>>>>>
>>>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>>>       @Override
>>>>>>>>>>       public TableSchema getTableSchema() {
>>>>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>>>>       }
>>>>>>>>>>       @Override
>>>>>>>>>>       public DataSet<Row> getDataSet(ExecutionEnvironment
>>>>>>>>>> execEnv) {
>>>>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>>>>       }
>>>>>>>>>>     };
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> How can you reuse InputFormat to write a TableSource? I think
>>>>>>>>>>> that at least initially this could be the simplest way to test the
>>>>>>>>>>> migration..then I could try yo implement the new Table Source 
>>>>>>>>>>> interface
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> hi Flavio,
>>>>>>>>>>>> Only old planner supports BatchTableEnvironment (which can
>>>>>>>>>>>> convert to/from DataSet),
>>>>>>>>>>>> while Blink planner in batch mode only
>>>>>>>>>>>> support TableEnvironment. Because Blink planner
>>>>>>>>>>>> convert the batch queries to Transformation (corresponding to
>>>>>>>>>>>> DataStream), instead of DataSet.
>>>>>>>>>>>>
>>>>>>>>>>>> one approach is you can migrate them to TableSource instead
>>>>>>>>>>>> (InputFormat can be reused),
>>>>>>>>>>>> but TableSource will be deprecated later. you can try new table
>>>>>>>>>>>> source[1]
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Godfrey
>>>>>>>>>>>>
>>>>>>>>>>>> Flavio Pompermaier <pomperma...@okkam.it> 于2020年7月10日周五
>>>>>>>>>>>> 下午8:54写道:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv 
>>>>>>>>>>>>> anymore so I
>>>>>>>>>>>>> can't call createInput.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>>>>> TableSource instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>>>>> TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>>>     MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>>>>>>>>>>>> ft).finish();
>>>>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",",
>>>>>>>>>>>>> dsFields));
>>>>>>>>>>>>>     CsvTableSink outSink = new
>>>>>>>>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>>>     env.execute();
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>>>>> dwysakow...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> DataStream. We do not support converting batch Table programs
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> DataStream yet.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A following code should work:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before
>>>>>>>>>>>>>> I was
>>>>>>>>>>>>>> > using a BatchTableEnv and now I've tried to use the
>>>>>>>>>>>>>> following:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code
>>>>>>>>>>>>>> there's :
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>>>>> > "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>>>>> please use
>>>>>>>>>>>>>> > TableEnvironment.");
>>>>>>>>>>>>>> > }
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>>>> > Flavio
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>
>>>>
>>

Reply via email to