Hi Till: Thanks for your answer, so if I just add new sql and not modified old sql then use `/`--allowNonRestoredState option to restart job can resume old sql state from savepoints?
Regards James From: Till Rohrmann <trohrm...@apache.org> Date: Friday, June 15, 2018 at 8:13 PM To: "James (Jian Wu) [FDS Data Platform]" <james...@coupang.com> Cc: user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>, Timo Walther <twal...@apache.org> Subject: Re: Restore state from save point with add new flink sql Hi James, as long as you do not change anything for `sql1`, it should work to recover from a savepoint if you pass the `-n`/`--allowNonRestoredState` option to the CLI when resuming your program from the savepoint. The reason is that an operators generated uid depends on the operator and on its inputs. I've also pulled in Fabian and Timo who will be able to tell you a little bit more about the job modification support for stream SQL. Cheers, Till On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <james...@coupang.com<mailto:james...@coupang.com>> wrote: Hi: My application use flink sql, I want to add new sql to the application, For example first version is DataStream<AggregatedOrderItems> paymentCompleteStream = getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic) .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap()).assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30) .returns(TypeInformation.of(AggregatedOrderItems.class)); tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream, concatFieldsName(AggregatedOrderItems.class, true, "eventTs")); tableEnv.registerFunction("group_concat", new GroupConcatFunction()); Table resultTable = tableEnv.sqlQuery(sql1); tableEnv.toAppendStream(resultTable, Row.class, qConfig) .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30) .filter(new FilterFunction<DetectionResult>() { @Override public boolean filter(DetectionResult value) throws Exception { return (value.getViolationCount() >= 5); } }).addSink(new DetectionResultMySqlSink()); Then second version, I add new sql Table resultTable2 = tableEnv.sqlQuery(sql2); tableEnv.toAppendStream(resultTable2, Row.class, qConfig) .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30) .filter(new FilterFunction<DetectionResult>() { @Override public boolean filter(DetectionResult value) throws Exception { return (value.getViolationCount() >= 5); } }).addSink(new DetectionResultMySqlSink()); After restart job with savepoints, whether the original flink sql can be restore success? Whether the flink will assign a new UID to original sql operator? (I will not change the original sql) Regards James