I think so. Maybe Fabian or Timo can correct me if I'm wrong here.
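
To illustrate the reasoning quoted below (a generated uid depends on the operator and on its inputs), here is a toy model in plain Java. It is only a sketch and not Flink's actual id-generation code: the class `ToyOperatorIds`, the helper `idOf`, and the operator labels are all made up for the example, and the assumption is simply that an id is a hash of the operator's description plus its input ids. Under that assumption, adding a second branch for `sql2` leaves the ids on the `sql1` branch untouched.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: this is NOT Flink's real id-generation code.
class ToyOperatorIds {

    // Derive an id from the operator's own description plus the ids of its inputs.
    static String idOf(String operator, String... inputIds) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update(operator.getBytes(StandardCharsets.UTF_8));
            for (String inputId : inputIds) {
                md.update((byte) 0); // separator between fields
                md.update(inputId.getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is available on every JVM
        }
    }

    // Add a "query -> sink" branch that reads from the given source id.
    static void addBranch(Map<String, String> ids, String sourceId, String query) {
        String queryId = idOf(query, sourceId);
        ids.put(query, queryId);
        ids.put(query + "-sink", idOf("mysql-sink", queryId));
    }

    // First version of the job: source -> sql1 -> sink.
    static Map<String, String> jobV1() {
        Map<String, String> ids = new LinkedHashMap<>();
        ids.put("source", idOf("kafka-source"));
        addBranch(ids, ids.get("source"), "sql1");
        return ids;
    }

    // Second version: the unchanged sql1 branch plus a new sql2 branch.
    static Map<String, String> jobV2() {
        Map<String, String> ids = new LinkedHashMap<>();
        ids.put("source", idOf("kafka-source"));
        addBranch(ids, ids.get("source"), "sql1");
        addBranch(ids, ids.get("source"), "sql2");
        return ids;
    }

    public static void main(String[] args) {
        Map<String, String> v1 = jobV1();
        Map<String, String> v2 = jobV2();
        // Every id from the first version reappears unchanged in the second.
        System.out.println("sql1 ids unchanged: " + v2.entrySet().containsAll(v1.entrySet()));
        // The new branch contributes operators that have no counterpart in v1.
        System.out.println("new operators: " + (v2.size() - v1.size()));
    }
}
```

In this toy model the two sinks get different ids even though they share the same description, because their inputs differ. Whether the real planner behaves the same way for every query is exactly the part Fabian or Timo would need to confirm.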

On Mon, Jun 25, 2018 at 9:17 AM James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> Hi Till:
>
>
>
> Thanks for your answer. So if I just add a new SQL query and do not modify
> the old one, can I restart the job with the `-n`/`--allowNonRestoredState`
> option and have the old SQL query resume its state from the savepoint?
>
>
>
> Regards
>
>
>
> James
>
>
>
> *From: *Till Rohrmann <trohrm...@apache.org>
> *Date: *Friday, June 15, 2018 at 8:13 PM
> *To: *"James (Jian Wu) [FDS Data Platform]" <james...@coupang.com>
> *Cc: *user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>,
> Timo Walther <twal...@apache.org>
> *Subject: *Re: Restore state from save point with add new flink sql
>
>
>
> Hi James,
>
>
>
> as long as you do not change anything for `sql1`, it should work to
> recover from a savepoint if you pass the `-n`/`--allowNonRestoredState`
> option to the CLI when resuming your program from the savepoint. The reason
> is that an operator's generated uid depends on the operator and on its
> inputs.
>
>
>
> I've also pulled in Fabian and Timo who will be able to tell you a little
> bit more about the job modification support for stream SQL.
>
>
>
> Cheers,
> Till
>
>
>
> On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
> james...@coupang.com> wrote:
>
> *Hi:*
>
>
>
> *   My application uses Flink SQL, and I want to add a new SQL query to the
> application.*
>
>
>
> *For example, the first version is:*
>
>
>
> DataStream<AggregatedOrderItems> paymentCompleteStream =
>         getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
>         .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>         .returns(TypeInformation.of(AggregatedOrderItems.class));
>
> tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
>         concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));
>
> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>
> Table resultTable = tableEnv.sqlQuery(sql1);
> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>         .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
>
>
> *Then, in the second version, I add a new SQL query:*
>
>
>
> Table resultTable2 = tableEnv.sqlQuery(sql2);
> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>         .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                 return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
>
>
> *After restarting the job from the savepoint, can the state of the original
> Flink SQL query be restored successfully? Will Flink assign a new UID to the
> original SQL operators? (I will not change the original SQL.)*
>
>
>
> *Regards*
>
>
>
> *James*
>
>
>
>
