Thanks, but I still can't understand how to migrate my legacy code. The main problem is that I can't create a BatchTableEnv anymore, so I can't call createInput.
Is there a way to reuse InputFormats? Should I migrate them to TableSource instead?

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    MyInputFormat myInputformat = new MyInputFormat(dsFields, ft).finish();
    DataSet<Row> rows = env.createInput(myInputformat);
    Table table = btEnv.fromDataSet(rows, String.join(",", dsFields));
    CsvTableSink outSink = new CsvTableSink("file:/tmp/test.tsv", "\t", 1, WriteMode.OVERWRITE);
    btEnv.registerTableSink("out", dsFields, ft, outSink);
    btEnv.insertInto(table, "out", btEnv.queryConfig());
    env.execute();
}

On Fri, Jul 10, 2020 at 11:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> You should be good with using the TableEnvironment. The
> StreamTableEnvironment is needed only if you want to convert to
> DataStream. We do not support converting batch Table programs to
> DataStream yet.
>
> The following code should work:
>
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().inBatchMode().build();
>
> TableEnvironment.create(settings);
>
> Best,
>
> Dawid
>
> On 10/07/2020 11:48, Flavio Pompermaier wrote:
> > Hi to all,
> > I was trying to update my legacy code to Flink 1.11. Before I was
> > using a BatchTableEnv and now I've tried to use the following:
> >
> > EnvironmentSettings settings =
> >     EnvironmentSettings.newInstance().inBatchMode().build();
> >
> > Unfortunately, in the StreamTableEnvironmentImpl code there's:
> >
> > if (!settings.isStreamingMode()) {
> >     throw new TableException(
> >         "StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
> > }
> >
> > What should I do here?
> >
> > Thanks in advance,
> > Flavio

--
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
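
For what it's worth, one possible way to keep reusing the InputFormat without a BatchTableEnvironment is to wrap it in the legacy InputFormatTableSource and use it from the unified TableEnvironment in batch mode. The sketch below is only an illustration under a few assumptions: MyInputFormat, finish(), dsFields and ft are taken from the snippet above; the wrapper class name MyInputFormatTableSource and the CREATE TABLE columns are hypothetical; and registering a legacy TableSource through TableEnvironmentInternal.registerTableSourceInternal is assumed to work in 1.11 (adjust if your version still exposes a public registration method).

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

// Hypothetical wrapper that exposes the existing InputFormat as a (legacy) TableSource.
public class MyInputFormatTableSource extends InputFormatTableSource<Row> {

    private final String[] dsFields;
    private final TypeInformation<?>[] ft;

    public MyInputFormatTableSource(String[] dsFields, TypeInformation<?>[] ft) {
        this.dsFields = dsFields;
        this.ft = ft;
    }

    @Override
    public InputFormat<Row, ?> getInputFormat() {
        // the same InputFormat that the old DataSet job passed to createInput
        return new MyInputFormat(dsFields, ft).finish();
    }

    @Override
    public TableSchema getTableSchema() {
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < dsFields.length; i++) {
            builder.field(dsFields[i], TypeConversions.fromLegacyInfoToDataType(ft[i]));
        }
        return builder.build();
    }

    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }
}

// Usage with the unified TableEnvironment in batch mode:
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Assumption: the unified TableEnvironment no longer exposes registerTableSource,
// so the legacy source is registered through the internal interface.
((TableEnvironmentInternal) tEnv)
    .registerTableSourceInternal("mySource", new MyInputFormatTableSource(dsFields, ft));

// Sink declared via DDL with the 1.11 filesystem connector; the columns here are
// placeholders and must match dsFields/ft.
tEnv.executeSql(
    "CREATE TABLE `out` (f0 STRING, f1 INT) WITH ("
        + " 'connector' = 'filesystem',"
        + " 'path' = 'file:///tmp/test.tsv',"
        + " 'format' = 'csv',"
        + " 'csv.field-delimiter' = '\t')");

// executeInsert() submits the job itself, so the old env.execute() call is not needed.
tEnv.from("mySource").executeInsert("out");

Note that executeInsert() triggers execution directly, so there is no separate execute() call in this setup; whether this exact registration path fits your 1.11.x version is something to verify against your dependencies.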