Hi James,

as long as you do not change anything for `sql1`, it should work to recover
from a savepoint if you pass the `-n`/`--allowNonRestoredState` option to
the CLI when resuming your program from the savepoint. The reason is that
an operators generated uid depends on the operator and on its inputs.

I've also pulled in Fabian and Timo who will be able to tell you a little
bit more about the job modification support for stream SQL.

Cheers,
Till

On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] <
james...@coupang.com> wrote:

> *Hi:*
>
>
>
> *   My application use flink sql, I want to add new sql to the
> application, *
>
>
>
> *For example first version is*
>
>
>
> DataStream<AggregatedOrderItems> paymentCompleteStream = *getKafkaStream*(env,
> bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>         .flatMap(new
> PaymentComplete2AggregatedOrderItemFlatMap()).assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>         .returns(TypeInformation.*of*(AggregatedOrderItems.class));
>
> tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
> *concatFieldsName*(AggregatedOrderItems.class, true, "eventTs"));
>
> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>
> Table resultTable = tableEnv.sqlQuery(*sql1*);
> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>         .flatMap(new
> E5FlatmapFunction(resultSampleRate)).setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
>
>
> *Then second version, I add new sql*
>
>
>
> Table resultTable2 = tableEnv.sqlQuery(*sql2*);
> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>         .flatMap(new
> A2FlatmapFunction(resultSampleRate)).setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                 return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
>
>
> *After restart job with savepoints, whether the original flink sql can be
> restore success? Whether the flink will assign a new UID to original sql
> operator? (I will not change the original sql)*
>
>
>
> *Regards*
>
>
>
> *James*
>
>
>

Reply via email to