You can set string-based configuration options via
`tEnv.getConfig.getConfiguration.setString(..)` to replace them.
You could try `pipeline.default-kryo-serializers` [1].
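
As a rough sketch of what that could look like (not verified against a
running job): the helper below only builds the option value in the
`class:<type>,serializer:<serializer>` form shown in the documentation [1].
The class name `KryoSerializerConfig` is made up for illustration, and the
fully qualified names of `DateTime` and `JodaDateTimeSerializer` are
assumptions about which libraries are in use. The actual `setString` call is
left as a comment because it needs Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Flink: it only formats the option value.
final class KryoSerializerConfig {

    // Option key taken from the Flink configuration documentation [1].
    static final String KEY = "pipeline.default-kryo-serializers";

    // Joins "class:<type>,serializer:<serializer>" pairs with ';',
    // preserving the iteration order of the given map.
    static String value(Map<String, String> serializersByType) {
        StringJoiner joiner = new StringJoiner(";");
        for (Map.Entry<String, String> e : serializersByType.entrySet()) {
            joiner.add("class:" + e.getKey() + ",serializer:" + e.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> serializers = new LinkedHashMap<>();
        // Assumed fully qualified names; adjust to the libraries you use.
        serializers.put(
            "org.joda.time.DateTime",
            "de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer");

        System.out.println(KEY + ": " + value(serializers));

        // With Flink on the classpath, the value would then be applied as:
        // tEnv.getConfig().getConfiguration().setString(KEY, value(serializers));
    }
}
```

Note that this option configures default Kryo serializers, so it may not
behave exactly like `env.registerTypeWithKryoSerializer`; it is worth
checking that the serializer is actually picked up.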

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-default-kryo-serializers

On Mon, 13 Jul 2020 at 21:57, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> And what about the env.registerTypeWithKryoSerializer?
> Now to create the table environment I don't use the ExecutionEnvironment
> anymore..how can I register those serializers?
> For example I used to run
> env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class);
>
> Best,
> Flavio
>
> On Mon, Jul 13, 2020 at 3:16 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> tableEnv.registerTableSource is deprecated in favor of DDL
>> and the new connector interface (i.e. FLIP-95 [1]).
>> You may need to implement a `ScanTableSource` that returns an
>> `InputFormatProvider` from `ScanTableSource#getScanRuntimeProvider`.
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
>>
>> On Mon, 13 Jul 2020 at 20:39, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Ok..just one last thing: to use my TableSource I use the deprecated
>>> API registerTableSource:
>>>
>>> tableEnv.registerTableSource("MySourceDataset", tableSource);
>>>
>>> The javadoc says to use executeSql but this requires some extra steps
>>> (that are not mentioned in the documentation).
>>> Do I have to create a TableFactory, right? How do I register it? Is
>>> there an example somewhere?
>>>
>>> On Mon, Jul 13, 2020 at 2:28 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> I agree with you @Flavio Pompermaier <pomperma...@okkam.it> , the
>>>> exception message definitely should be improved.
>>>> We created a similar issue a long time ago,
>>>> https://issues.apache.org/jira/browse/CALCITE-3038, but the fix
>>>> might be complicated.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 13 Jul 2020 at 19:59, Flavio Pompermaier <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> You're right Jark..sorry I didn't see the typo. The backticks are also
>>>>> mandatory.
>>>>> Maybe the exception message could be more meaningful and specify the
>>>>> token that caused the error instead of a general "SQL parse failed.
>>>>> Non-query expression encountered in illegal context".
>>>>>
>>>>> Thanks a lot for the support,
>>>>> Flavio
>>>>>
>>>>> On Mon, Jul 13, 2020 at 1:41 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> A typo of "INSERTO"? Try this?
>>>>>>
>>>>>> tableEnv.executeSql("INSERT INTO `out` SELECT * FROM
>>>>>> MySourceDataset");
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Mon, 13 Jul 2020 at 18:25, Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Now I'm able to run my code but there's something I don't
>>>>>>> understand: what is the difference between the following two?
>>>>>>>
>>>>>>>      //common code
>>>>>>>      final CsvTableSink outSink = new
>>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>      tableEnv.registerTableSink("out", dsFields,
>>>>>>> myInputformat.getFieldTypes(), outSink);
>>>>>>>
>>>>>>>    - tableEnv.from("MySourceDataset").executeInsert("out");
>>>>>>>                                      --> this works
>>>>>>>    - tableEnv.executeSql("INSERTO INTO out SELECT * FROM
>>>>>>>    MySourceDataset");   --> this does not work
>>>>>>>
>>>>>>> The second option fails with the following exception:
>>>>>>>
>>>>>>> Exception in thread "main"
>>>>>>> org.apache.flink.table.api.SqlParserException: SQL parse failed.
>>>>>>> Non-query expression encountered in illegal context
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>>>>>>> at
>>>>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Sun, Jul 12, 2020 at 5:04 PM godfrey he <godfre...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> hi Flavio,
>>>>>>>>
>>>>>>>> `BatchTableSource` can only be used with the old planner.
>>>>>>>> If you want to use the Blink planner to run a batch job,
>>>>>>>> your table source should implement `StreamTableSource`
>>>>>>>> and its `isBounded` method should return true.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Godfrey
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 10, 2020 at 10:32 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Is it correct to do something like this?
>>>>>>>>>
>>>>>>>>> TableSource<Row> myTableSource = new BatchTableSource<Row>() {
>>>>>>>>>       @Override
>>>>>>>>>       public TableSchema getTableSchema() {
>>>>>>>>>         return new TableSchema(dsFields, ft);
>>>>>>>>>       }
>>>>>>>>>       @Override
>>>>>>>>>       public DataSet<Row> getDataSet(ExecutionEnvironment execEnv)
>>>>>>>>> {
>>>>>>>>>         return execEnv.createInput(myInputformat);
>>>>>>>>>       }
>>>>>>>>>     };
>>>>>>>>>
>>>>>>>>> On Fri, Jul 10, 2020 at 3:54 PM Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> How can you reuse InputFormat to write a TableSource? I think
>>>>>>>>>> that at least initially this could be the simplest way to test the
>>>>>>>>>> migration..then I could try to implement the new Table Source
>>>>>>>>>> interface
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 10, 2020 at 3:38 PM godfrey he <godfre...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> hi Flavio,
>>>>>>>>>>> Only the old planner supports BatchTableEnvironment (which can
>>>>>>>>>>> convert to/from DataSet),
>>>>>>>>>>> while the Blink planner in batch mode only supports TableEnvironment,
>>>>>>>>>>> because the Blink planner
>>>>>>>>>>> converts batch queries to Transformation (corresponding to
>>>>>>>>>>> DataStream) instead of DataSet.
>>>>>>>>>>>
>>>>>>>>>>> One approach is to migrate them to TableSource instead
>>>>>>>>>>> (InputFormat can be reused),
>>>>>>>>>>> but TableSource will be deprecated later. You can try the new table
>>>>>>>>>>> source [1].
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Godfrey
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 10, 2020 at 8:54 PM Flavio Pompermaier
>>>>>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks but I still can't understand how to migrate my legacy
>>>>>>>>>>>> code. The main problem is that I can't create a BatchTableEnv
>>>>>>>>>>>> anymore so I can't call createInput.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a way to reuse InputFormats? Should I migrate them to
>>>>>>>>>>>> TableSource instead?
>>>>>>>>>>>>
>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>     ExecutionEnvironment env =
>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>     BatchTableEnvironment btEnv =
>>>>>>>>>>>> TableEnvironment.getTableEnvironment(env);
>>>>>>>>>>>>     MyInputFormat myInputformat =  new MyInputFormat(dsFields,
>>>>>>>>>>>> ft).finish();
>>>>>>>>>>>>     DataSet<Row> rows = env.createInput(myInputformat);
>>>>>>>>>>>>     Table table = btEnv.fromDataSet(rows, String.join(",",
>>>>>>>>>>>> dsFields));
>>>>>>>>>>>>     CsvTableSink outSink = new
>>>>>>>>>>>> CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
>>>>>>>>>>>>     btEnv.registerTableSink("out", dsFields, ft, outSink);
>>>>>>>>>>>>     btEnv.insertInto(table, "out", btEnv.queryConfig());
>>>>>>>>>>>>     env.execute();
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <
>>>>>>>>>>>> dwysakow...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You should be good with using the TableEnvironment. The
>>>>>>>>>>>>> StreamTableEnvironment is needed only if you want to convert to
>>>>>>>>>>>>> DataStream. We do not support converting batch Table programs
>>>>>>>>>>>>> to DataStream yet.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The following code should work:
>>>>>>>>>>>>>
>>>>>>>>>>>>> EnvironmentSettings settings =
>>>>>>>>>>>>> EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>>
>>>>>>>>>>>>> TableEnvironment.create(settings);
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 10/07/2020 11:48, Flavio Pompermaier wrote:
>>>>>>>>>>>>> > Hi to all,
>>>>>>>>>>>>> > I was trying to update my legacy code to Flink 1.11. Before I was
>>>>>>>>>>>>> > using a BatchTableEnv and now I've tried to use the following:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > EnvironmentSettings settings =
>>>>>>>>>>>>> > EnvironmentSettings.newInstance().inBatchMode().build();
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Unfortunately in the StreamTableEnvironmentImpl code there's:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > if (!settings.isStreamingMode()) {
>>>>>>>>>>>>> >     throw new TableException(
>>>>>>>>>>>>> > "StreamTableEnvironment can not run in batch mode for now,
>>>>>>>>>>>>> please use
>>>>>>>>>>>>> > TableEnvironment.");
>>>>>>>>>>>>> > }
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > What should I do here?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thanks in advance,
>>>>>>>>>>>>> > Flavio
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>
>>>
>
