[flink-table-store] branch master updated: [FLINK-28063] Introduce dedicated In and NotIn predicate function

2022-07-03 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new d11a287c [FLINK-28063] Introduce dedicated In and NotIn predicate 
function
d11a287c is described below

commit d11a287c756713c6b16a6c93291566a780f97b4a
Author: tsreaper 
AuthorDate: Mon Jul 4 12:26:40 2022 +0800

[FLINK-28063] Introduce dedicated In and NotIn predicate function

This closes #189
---
 .../flink/table/store/file/predicate/In.java   |  71 ++
 .../flink/table/store/file/predicate/NotIn.java|  71 ++
 .../store/file/predicate/PredicateBuilder.java |  12 +-
 .../store/file/predicate/PredicateConverter.java   |   9 +
 .../store/file/predicate/PredicateBuilderTest.java |  70 --
 .../file/predicate/PredicateConverterTest.java | 553 +++-
 .../table/store/file/predicate/PredicateTest.java  | 736 +++--
 .../SearchArgumentToPredicateConverterTest.java|  20 +-
 .../table/store/spark/SparkFilterConverter.java|  12 +
 .../store/spark/SparkFilterConverterTest.java  |   7 +
 10 files changed, 817 insertions(+), 744 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
new file mode 100644
index ..aa338945
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafFunction} to eval in. */
+public class In implements LeafFunction {
+
+private static final long serialVersionUID = 1L;
+
+public static final In INSTANCE = new In();
+
+private In() {}
+
+@Override
+public boolean test(LogicalType type, Object field, List literals) 
{
+if (field == null) {
+return false;
+}
+for (Object literal : literals) {
+if (literal != null && compareLiteral(type, literal, field) == 0) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public boolean test(
+LogicalType type, long rowCount, FieldStats fieldStats, 
List literals) {
+if (rowCount == fieldStats.nullCount()) {
+return false;
+}
+for (Object literal : literals) {
+if (literal != null
+&& compareLiteral(type, literal, fieldStats.minValue()) >= 0
+&& compareLiteral(type, literal, fieldStats.maxValue()) <= 
0) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public Optional negate() {
+return Optional.of(NotIn.INSTANCE);
+}
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
new file mode 100644
index ..d710da0f
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the 

[flink-table-store] branch master updated: [FLINK-28335] Delete topic after tests

2022-07-03 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new c24d3d24 [FLINK-28335] Delete topic after tests
c24d3d24 is described below

commit c24d3d244b4ae00e599924fa8814498f98f74aab
Author: Nicholas Jiang 
AuthorDate: Mon Jul 4 12:03:39 2022 +0800

[FLINK-28335] Delete topic after tests

This closes #188
---
 .../org/apache/flink/table/store/kafka/KafkaTableTestBase.java | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index db928329..1284fcff 100644
--- 
a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -53,6 +53,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
@@ -110,9 +111,11 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
 }
 
 @After
-public void after() {
+public void after() throws ExecutionException, InterruptedException {
 // Cancel timer for debug logging
 cancelTimeoutLogger();
+// Delete topics for avoid reusing topics of Kafka cluster
+deleteTopics();
 }
 
 public Properties getStandardProps() {
@@ -169,6 +172,11 @@ public abstract class KafkaTableTestBase extends 
AbstractTestBase {
 }
 }
 
+private void deleteTopics() throws ExecutionException, 
InterruptedException {
+final AdminClient adminClient = AdminClient.create(getStandardProps());
+
adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
+}
+
 //  For Debug Logging Purpose 
--
 
 private void scheduleTimeoutLogger(Duration period, Runnable 
loggingAction) {



[flink-connector-elasticsearch] branch main updated: [FLINK-28107][python][connector/elasticsearch] Add a MapElasticsearchEmitter for PyFlink (#22)

2022-07-03 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
 new b47fedf  [FLINK-28107][python][connector/elasticsearch] Add a 
MapElasticsearchEmitter for PyFlink (#22)
b47fedf is described below

commit b47fedf4f153f10614897666528c1547c8a630f1
Author: Luning (Lucas) Wang 
AuthorDate: Mon Jul 4 10:19:44 2022 +0800

[FLINK-28107][python][connector/elasticsearch] Add a 
MapElasticsearchEmitter for PyFlink (#22)
---
 .../sink/MapElasticsearchEmitter.java  | 81 ++
 1 file changed, 81 insertions(+)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java
new file mode 100644
index 000..c39c99f
--- /dev/null
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A simple ElasticsearchEmitter which is currently used in PyFlink ES 
connector. */
+public class MapElasticsearchEmitter implements 
ElasticsearchEmitter> {
+
+private static final long serialVersionUID = 1L;
+
+private final String index;
+private @Nullable final String documentType;
+private @Nullable final String idFieldName;
+private final boolean isDynamicIndex;
+private transient Function, String> indexProvider;
+
+public MapElasticsearchEmitter(
+String index,
+@Nullable String documentType,
+@Nullable String idFieldName,
+boolean isDynamicIndex) {
+this.index = checkNotNull(index);
+this.documentType = documentType;
+this.idFieldName = idFieldName;
+this.isDynamicIndex = isDynamicIndex;
+}
+
+@Override
+public void open() throws Exception {
+if (isDynamicIndex) {
+indexProvider = doc -> doc.get(index).toString();
+} else {
+indexProvider = doc -> index;
+}
+}
+
+@Override
+public void emit(Map doc, SinkWriter.Context context, 
RequestIndexer indexer) {
+if (idFieldName != null) {
+final UpdateRequest updateRequest =
+new UpdateRequest(
+indexProvider.apply(doc),
+documentType,
+doc.get(idFieldName).toString())
+.doc(doc)
+.upsert(doc);
+indexer.add(updateRequest);
+} else {
+final IndexRequest indexRequest =
+new IndexRequest(indexProvider.apply(doc), 
documentType).source(doc);
+indexer.add(indexRequest);
+}
+}
+}



[flink] branch master updated: [FLINK-28298][table][python] Support left and right in Table API (#20104)

2022-07-03 Thread dianfu
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new da720c3b97d [FLINK-28298][table][python] Support left and right in 
Table API (#20104)
da720c3b97d is described below

commit da720c3b97d5c3e781abb1e910a8337f6a5ee0e4
Author: Luning (Lucas) Wang 
AuthorDate: Mon Jul 4 10:14:44 2022 +0800

[FLINK-28298][table][python] Support left and right in Table API (#20104)
---
 docs/data/sql_functions.yml|  2 ++
 docs/data/sql_functions_zh.yml |  2 ++
 flink-python/pyflink/table/expression.py   | 12 
 .../pyflink/table/tests/test_expression.py |  2 ++
 .../flink/table/api/internal/BaseExpressions.java  | 12 
 .../functions/BuiltInFunctionDefinitions.java  | 22 ++
 .../expressions/converter/DirectConvertRule.java   |  2 ++
 .../planner/expressions/ScalarFunctionsTest.scala  |  7 +++
 8 files changed, 61 insertions(+)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 2ad4347b20c..7a8795566b0 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -341,8 +341,10 @@ string:
   - sql: INSTR(string1, string2)
 description:  Returns the position of the first occurrence of string2 in 
string1. Returns NULL if any of arguments is NULL.
   - sql: LEFT(string, integer)
+table: STRING.LEFT(INT)
 description: Returns the leftmost integer characters from the string. 
Returns EMPTY String if integer is negative. Returns NULL if any argument is 
NULL.
   - sql: RIGHT(string, integer)
+table: STRING.RIGHT(INT)
 description: Returns the rightmost integer characters from the string. 
Returns EMPTY String if integer is negative. Returns NULL if any argument is 
NULL.
   - sql: LOCATE(string1, string2[, integer])
 description: Returns the position of the first occurrence of string1 in 
string2 after position integer. Returns 0 if not found. Returns NULL if any of 
arguments is NULL.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index ad9940b3486..f9056bf62c3 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -434,10 +434,12 @@ string:
   - sql: INSTR(string1, string2)
 description:  返回 string2 在 string1 中第一次出现的位置。如果有任一参数为 `NULL`,则返回 `NULL`。
   - sql: LEFT(string, integer)
+table: STRING.LEFT(INT)
 description: |
   返回字符串中最左边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数
   为 `NULL` 则返回 `NULL`。
   - sql: RIGHT(string, integer)
+table: STRING.RIGHT(INT)
 description: |
   返回字符串中最右边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数
   为 `NULL` 则返回 `NULL`。
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 1f7e64f9576..b22dd0f02b0 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1212,6 +1212,18 @@ class Expression(Generic[T]):
 """
 return _binary_op("encode")(self, charset)
 
+def left(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]':
+"""
+Returns the leftmost integer characters from the input string.
+"""
+return _binary_op("left")(self, length)
+
+def right(self, length: Union[int, 'Expression[int]']) -> 
'Expression[str]':
+"""
+Returns the rightmost integer characters from the input string.
+"""
+return _binary_op("right")(self, length)
+
 @property
 def ltrim(self) -> 'Expression[str]':
 """
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index dd494cb1cdf..e17f07ba963 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -147,6 +147,8 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
 self.assertEqual('chr(a)', str(expr1.chr))
 self.assertEqual("decode(a, 'utf-8')", str(expr1.decode('utf-8')))
 self.assertEqual("encode(a, 'utf-8')", str(expr1.encode('utf-8')))
+self.assertEqual('left(a, 2)', str(expr1.left(2)))
+self.assertEqual('right(a, 2)', str(expr1.right(2)))
 self.assertEqual('ltrim(a)', str(expr1.ltrim))
 self.assertEqual('rtrim(a)', str(expr1.rtrim))
 self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 908a2ab3957..e68fd84c3f3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 

[flink] branch master updated: [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)

2022-07-03 Thread shengkai
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7974e81ec51 [FLINK-28238][sql-gateway] Fix unstable 
testCancelOperationAndFetchResultInParallel (#20063)
7974e81ec51 is described below

commit 7974e81ec51a2071b9658f768f651ffc371b15b0
Author: Shengkai <33114724+fsk...@users.noreply.github.com>
AuthorDate: Mon Jul 4 10:13:25 2022 +0800

[FLINK-28238][sql-gateway] Fix unstable 
testCancelOperationAndFetchResultInParallel (#20063)
---
 .../gateway/service/SqlGatewayServiceITCase.java   | 62 +-
 .../gateway/service/result/ResultFetcherTest.java  |  9 +++-
 .../service/utils/IgnoreExceptionHandler.java  | 36 +
 3 files changed, 92 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 52d74fb882e..d39c797eb63 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -37,12 +37,15 @@ import 
org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.operation.Operation;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Condition;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -56,8 +59,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -81,6 +86,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase 
{
 SessionEnvironment.newBuilder()
 .setSessionEndpointVersion(MockedEndpointVersion.V1)
 .build();
+private final ThreadFactory threadFactory =
+new ExecutorThreadFactory(
+"SqlGatewayService Test Pool", 
IgnoreExceptionHandler.INSTANCE);
 
 @BeforeAll
 public static void setUp() {
@@ -244,7 +252,7 @@ public class SqlGatewayServiceITCase extends 
AbstractTestBase {
 // 

 
 @Test
-public void testCancelOperationAndFetchResultInParallel() throws Exception 
{
+public void testCancelOperationAndFetchResultInParallel() {
 SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
 CountDownLatch latch = new CountDownLatch(1);
 // Make sure cancel the Operation before finish.
@@ -253,9 +261,13 @@ public class SqlGatewayServiceITCase extends 
AbstractTestBase {
 sessionHandle,
 operationHandle,
 () -> service.cancelOperation(sessionHandle, operationHandle),
-String.format(
-"Can not fetch results from the %s in %s status.",
-operationHandle, OperationStatus.CANCELED));
+new Condition<>(
+msg ->
+msg.contains(
+String.format(
+"Can not fetch results from 
the %s in %s status.",
+operationHandle, 
OperationStatus.CANCELED)),
+"Fetch results with expected error message."));
 latch.countDown();
 }
 
@@ -273,9 +285,19 @@ public class SqlGatewayServiceITCase extends 
AbstractTestBase {
 sessionHandle,
 operationHandle,
 () -> service.closeOperation(sessionHandle, operationHandle),
-

[flink-statefun] branch master updated: [hotfix] Update config-map.yaml to helm chart (#313)

2022-07-03 Thread knaufk
This is an automated email from the ASF dual-hosted git repository.

knaufk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
 new ef8995d0 [hotfix] Update config-map.yaml to helm chart (#313)
ef8995d0 is described below

commit ef8995d0168bc75499137d8fb864884ddf6109d2
Author: Asher Liu 
AuthorDate: Mon Jul 4 03:18:22 2022 +0800

[hotfix] Update config-map.yaml to helm chart (#313)
---
 tools/k8s/templates/config-map.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/k8s/templates/config-map.yaml 
b/tools/k8s/templates/config-map.yaml
index c96e1c54..0165934d 100644
--- a/tools/k8s/templates/config-map.yaml
+++ b/tools/k8s/templates/config-map.yaml
@@ -43,7 +43,7 @@ data:
 execution.savepoint.ignore-unclaimed-state: true
 kubernetes.cluster-id: {{ .Values.ha.id }}
 high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: {{ .Values.ha.dir }
+high-availability.storageDir: {{ .Values.ha.dir }}
 
   log4j-console.properties: |+
 rootLogger.level = INFO



[flink-kubernetes-operator] branch main updated: [FLINK-28331] Persist status after every observe loop

2022-07-03 Thread gyfora
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
 new a2db6c3  [FLINK-28331] Persist status after every observe loop
a2db6c3 is described below

commit a2db6c31e21d79715324ed45d7138cb8f6d6149e
Author: Matyas Orhidi 
AuthorDate: Fri Jul 1 14:24:33 2022 +0200

[FLINK-28331] Persist status after every observe loop
---
 .../controller/FlinkDeploymentController.java  |  1 +
 .../controller/FlinkSessionJobController.java  |  1 +
 .../operator/observer/SavepointObserver.java   | 30 
 .../flink/kubernetes/operator/TestUtils.java   |  6 +-
 .../controller/DeploymentRecoveryTest.java |  7 +-
 .../controller/FlinkDeploymentControllerTest.java  | 79 +-
 .../operator/controller/RollbackTest.java  |  4 +-
 7 files changed, 90 insertions(+), 38 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index d36f96b..d75d29c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -117,6 +117,7 @@ public class FlinkDeploymentController
 previousDeployment,
 false);
 }
+statusRecorder.patchAndCacheStatus(flinkApp);
 reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, 
context);
 } catch (DeploymentFailedException dfe) {
 handleDeploymentFailed(flinkApp, dfe);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 30ff1ea..7f2f6bd 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -96,6 +96,7 @@ public class FlinkSessionJobController
 }
 
 try {
+statusRecorder.patchAndCacheStatus(flinkSessionJob);
 reconciler.reconcile(flinkSessionJob, context);
 } catch (Exception e) {
 throw new ReconciliationException(e);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index 74f3d35..aa3590a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 
 /** An observer of savepoint progress. */
 public class SavepointObserver> {
@@ -69,10 +68,6 @@ public class SavepointObserver> {
 var jobStatus = resource.getStatus().getJobStatus();
 var savepointInfo = jobStatus.getSavepointInfo();
 var jobId = jobStatus.getJobId();
-var previousLastSpPath =
-Optional.ofNullable(savepointInfo.getLastSavepoint())
-.map(Savepoint::getLocation)
-.orElse(null);
 
 // If any manual or periodic savepoint is in progress, observe it
 if (SavepointUtils.savepointInProgress(jobStatus)) {
@@ -83,8 +78,6 @@ public class SavepointObserver> {
 if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
 observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
 }
-
-patchStatusOnSavepointChange(resource, savepointInfo, 
previousLastSpPath);
 }
 
 /**
@@ -201,27 +194,4 @@ public class SavepointObserver> {
 throw new ReconciliationException(e);
 }
 }
-
-/**
- * Patch the Kubernetes Flink resource status if we observed a new last 
savepoint. This is
- * crucial to not lose this information once the reconciler shuts down the 
cluster.
- */
-private void patchStatusOnSavepointChange(
-AbstractFlinkResource resource,
-SavepointInfo savepointInfo,
-String previousLastSpPath) {
-var currentLastSpPath =
-