I think so. Maybe Fabian or Timo can correct me if I'm wrong here.

On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <james...@coupang.com> wrote:
> Hi Till,
>
> Thanks for your answer. So if I only add new SQL and do not modify the
> old SQL, and then restart the job with the `-n`/`--allowNonRestoredState`
> option, can the old SQL's state be resumed from the savepoint?
>
> Regards
>
> James
>
> From: Till Rohrmann <trohrm...@apache.org>
> Date: Friday, June 15, 2018 at 8:13 PM
> To: "James (Jian Wu) [FDS Data Platform]" <james...@coupang.com>
> Cc: user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>,
>     Timo Walther <twal...@apache.org>
> Subject: Re: Restore state from save point with add new flink sql
>
> Hi James,
>
> as long as you do not change anything for `sql1`, it should work to
> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
> option to the CLI when resuming your program from the savepoint. The reason
> is that an operator's generated uid depends on the operator and on its
> inputs.
>
> I've also pulled in Fabian and Timo, who will be able to tell you a little
> bit more about the job modification support for stream SQL.
>
> Cheers,
> Till
>
> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform]
> <james...@coupang.com> wrote:
>
> Hi,
>
> My application uses Flink SQL, and I want to add a new SQL query to the
> application.
>
> For example, the first version is:
>
> DataStream<AggregatedOrderItems> paymentCompleteStream =
>     getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
>         .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>         .returns(TypeInformation.of(AggregatedOrderItems.class));
>
> tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
>     concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
>
> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>
> Table resultTable = tableEnv.sqlQuery(sql1);
> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>     .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>     .filter(new FilterFunction<DetectionResult>() {
>         @Override
>         public boolean filter(DetectionResult value) throws Exception {
>             return (value.getViolationCount() >= 5);
>         }
>     }).addSink(new DetectionResultMySqlSink());
>
> Then, in the second version, I add a new SQL query:
>
> Table resultTable2 = tableEnv.sqlQuery(sql2);
> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>     .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>     .filter(new FilterFunction<DetectionResult>() {
>         @Override
>         public boolean filter(DetectionResult value) throws Exception {
>             return (value.getViolationCount() >= 5);
>         }
>     }).addSink(new DetectionResultMySqlSink());
>
> After restarting the job from a savepoint, can the state of the original
> Flink SQL query be restored successfully? Will Flink assign new UIDs to the
> operators of the original SQL query? (I will not change the original SQL.)
>
> Regards
>
> James
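
For reference, resuming from a savepoint with the option discussed in this thread might look roughly like the sketch below. The savepoint path and jar name are hypothetical placeholders, not values from the thread. `-s` (`--fromSavepoint`) names the savepoint to resume from, and `-n` (`--allowNonRestoredState`) lets the job start even if some state in the savepoint cannot be mapped to an operator of the new program. The snippet only prints the command, so it can be checked before being run against a real cluster.

```shell
#!/bin/sh
# Hypothetical placeholders: substitute your own savepoint path and job jar.
SAVEPOINT_PATH="/path/to/savepoint"
JOB_JAR="/path/to/job-v2.jar"

# -s / --fromSavepoint          : savepoint to resume the job from
# -n / --allowNonRestoredState  : skip savepoint state that cannot be restored
RESUME_CMD="flink run -s $SAVEPOINT_PATH -n $JOB_JAR"

# Dry run: print the assembled command instead of executing it.
echo "$RESUME_CMD"
```

Whether the state of `sql1` is actually picked up still depends on its generated operator uids staying the same between the two versions, as described above.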