Hi,

At the moment (Flink 1.5.0), the operator UIDs depend on the overall
application and not only on the query.
Hence, changing the application by adding another query might change the
existing UIDs.

In general, you can only expect savepoint restarts to work if you don't
change the application and stick to the same Flink version.
We are not ready yet to guarantee savepoint compatibility across versions
yet.

Best,
Fabian


2018-06-15 9:59 GMT+02:00 James (Jian Wu) [FDS Data Platform] <
james...@coupang.com>:

> *Hi:*
>
>
>
> *   My application use flink sql, I want to add new sql to the
> application, *
>
>
>
> *For example first version is*
>
>
>
> DataStream<AggregatedOrderItems> paymentCompleteStream = *getKafkaStream*(env,
> bootStrapServers, kafkaGroup, orderPaymentCompleteTopic)
>         .flatMap(new PaymentComplete2AggregatedOrderItemFlatMap()).
> assignTimestampsAndWatermarks(wmAssigner2).setParallelism(30)
>         .returns(TypeInformation.*of*(AggregatedOrderItems.class));
>
> tableEnv.registerDataStream("AggregatedOrderItems",
> paymentCompleteStream, *concatFieldsName*(AggregatedOrderItems.class,
> true, "eventTs"));
>
> tableEnv.registerFunction("group_concat", new GroupConcatFunction());
>
> Table resultTable = tableEnv.sqlQuery(*sql1*);
> tableEnv.toAppendStream(resultTable, Row.class, qConfig)
>         .flatMap(new E5FlatmapFunction(resultSampleRate)).
> setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
>
>
> *Then second version, I add new sql*
>
>
>
> Table resultTable2 = tableEnv.sqlQuery(*sql2*);
> tableEnv.toAppendStream(resultTable2, Row.class, qConfig)
>         .flatMap(new A2FlatmapFunction(resultSampleRate)).
> setParallelism(30)
>         .filter(new FilterFunction<DetectionResult>() {
>             @Override
>             public boolean filter(DetectionResult value) throws Exception {
>                 return (value.getViolationCount() >= 5);
>             }
>         }).addSink(new DetectionResultMySqlSink());
>
>
>
> *After restart job with savepoints, whether the original flink sql can be
> restore success? Whether the flink will assign a new UID to original sql
> operator? (I will not change the original sql)*
>
>
>
> *Regards*
>
>
>
> *James*
>
>
>

Reply via email to