Hi Till:

Thanks for your answer. So if I only add new SQL and do not modify the old SQL, 
then restarting the job with the `-n`/`--allowNonRestoredState` option will 
resume the old SQL's state from the savepoint?

Regards

James

From: Till Rohrmann <trohrm...@apache.org>
Date: Friday, June 15, 2018 at 8:13 PM
To: "James (Jian Wu) [FDS Data Platform]" <james...@coupang.com>
Cc: user <user@flink.apache.org>, Fabian Hueske <fhue...@apache.org>, Timo 
Walther <twal...@apache.org>
Subject: Re: Restore state from save point with add new flink sql

Hi James,

as long as you do not change anything for `sql1`, it should work to recover 
from a savepoint if you pass the `-n`/`--allowNonRestoredState` option to the 
CLI when resuming your program from the savepoint. The reason is that an 
operator's generated uid depends on the operator itself and on its inputs, so 
adding `sql2` should leave the generated uids of the `sql1` operators unchanged.
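
To illustrate why an ID derived from an operator and its inputs stays stable when an unrelated query is added, here is a minimal sketch. It is a toy model only, not Flink's actual hashing algorithm: the class `UidSketch`, the method `generatedId`, and the operator descriptions ("kafka-source", "sql1", "sql2") are all hypothetical names chosen for this example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model (an assumption for illustration, NOT Flink's real implementation):
// an operator's generated ID is a hash of its own description and its inputs' IDs.
public class UidSketch {

    public static String generatedId(String operatorDescription, List<String> inputIds) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(operatorDescription.getBytes(StandardCharsets.UTF_8));
            for (String inputId : inputIds) {
                digest.update((byte) 0); // separator between hashed fields
                digest.update(inputId.getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        // Version 1 of the job: source -> sql1
        String source = generatedId("kafka-source", Collections.<String>emptyList());
        String sql1InV1 = generatedId("sql1", Arrays.asList(source));

        // Version 2 of the job: the same source feeds sql1 and a new sql2
        String sql1InV2 = generatedId("sql1", Arrays.asList(source));
        String sql2InV2 = generatedId("sql2", Arrays.asList(source));

        System.out.println("sql1 ID unchanged across versions: " + sql1InV1.equals(sql1InV2));
        System.out.println("sql2 gets a different ID: " + !sql2InV2.equals(sql1InV1));
    }
}
```

In this model the ID of `sql1` depends only on `sql1` and its upstream source, so adding `sql2` as a second consumer does not affect it. Changing `sql1` itself, or anything upstream of it, would produce a different ID.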

I've also pulled in Fabian and Timo who will be able to tell you a little bit 
more about the job modification support for stream SQL.

Cheers,
Till

On Fri, Jun 15, 2018 at 9:59 AM James (Jian Wu) [FDS Data Platform] 
<james...@coupang.com> wrote:
Hi:

My application uses Flink SQL, and I want to add a new SQL query to the 
application.

For example, the first version is:

DataStream<AggregatedOrderItems> paymentCompleteStream =
        getKafkaStream(env, bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
                .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap())
                .assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
                .returns(TypeInformation.of(AggregatedOrderItems.class));

tableEnv.registerDataStream("AggregatedOrderItems", paymentCompleteStream,
        concatFieldsName(AggregatedOrderItems.class, true, "eventTs"));

tableEnv.registerFunction("group_concat", new GroupConcatFunction());

Table resultTable = tableEnv.sqlQuery(sql1);
tableEnv.toAppendStream(resultTable, Row.class, qConfig)
        .flatMap(new E5FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
               return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

Then, in the second version, I add a new SQL query:

Table resultTable2 = tableEnv.sqlQuery(sql2);
tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
        .flatMap(new A2FlatmapFunction(resultSampleRate)).setParallelism(30)
        .filter(new FilterFunction<DetectionResult>() {
            @Override
            public boolean filter(DetectionResult value) throws Exception {
                return (value.getViolationCount() >= 5);
            }
        }).addSink(new DetectionResultMySqlSink());

After restarting the job from a savepoint, can the original Flink SQL query be 
restored successfully? Will Flink assign new UIDs to the original SQL 
operators? (I will not change the original SQL.)

Regards

James
