[flink-table-store] branch master updated: [FLINK-28063] Introduce dedicated In and NotIn predicate function
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

The following commit(s) were added to refs/heads/master by this push:
     new d11a287c  [FLINK-28063] Introduce dedicated In and NotIn predicate function
d11a287c is described below

commit d11a287c756713c6b16a6c93291566a780f97b4a
Author: tsreaper
AuthorDate: Mon Jul 4 12:26:40 2022 +0800

    [FLINK-28063] Introduce dedicated In and NotIn predicate function

    This closes #189
---
 .../flink/table/store/file/predicate/In.java        |  71 ++
 .../flink/table/store/file/predicate/NotIn.java     |  71 ++
 .../store/file/predicate/PredicateBuilder.java      |  12 +-
 .../store/file/predicate/PredicateConverter.java    |   9 +
 .../store/file/predicate/PredicateBuilderTest.java  |  70 --
 .../file/predicate/PredicateConverterTest.java      | 553 +++-
 .../table/store/file/predicate/PredicateTest.java   | 736 +++--
 .../SearchArgumentToPredicateConverterTest.java     |  20 +-
 .../table/store/spark/SparkFilterConverter.java     |  12 +
 .../store/spark/SparkFilterConverterTest.java       |   7 +
 10 files changed, 817 insertions(+), 744 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
new file mode 100644
index ..aa338945
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
+
+/** A {@link LeafFunction} to eval in. */
+public class In implements LeafFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final In INSTANCE = new In();
+
+    private In() {}
+
+    @Override
+    public boolean test(LogicalType type, Object field, List<Object> literals) {
+        if (field == null) {
+            return false;
+        }
+        for (Object literal : literals) {
+            if (literal != null && compareLiteral(type, literal, field) == 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
+        if (rowCount == fieldStats.nullCount()) {
+            return false;
+        }
+        for (Object literal : literals) {
+            if (literal != null
+                    && compareLiteral(type, literal, fieldStats.minValue()) >= 0
+                    && compareLiteral(type, literal, fieldStats.maxValue()) <= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(NotIn.INSTANCE);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
new file mode 100644
index ..d710da0f
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the
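The stats-based overload above is what enables file pruning: a data file can only contain a match if some non-null literal lies inside its [min, max] range. To illustrate the row-level semantics, here is a small sketch (not part of the commit) that exercises In.test directly; it assumes INT literals are carried as plain java.lang.Integer values and sits in the predicate package so that In.INSTANCE is visible:

    package org.apache.flink.table.store.file.predicate;

    import org.apache.flink.table.types.logical.IntType;

    import java.util.Arrays;
    import java.util.List;

    public class InPredicateExample {
        public static void main(String[] args) {
            IntType type = new IntType();
            List<Object> literals = Arrays.<Object>asList(1, null, 3);
            // SQL IN semantics: NULL literals are skipped, and a NULL field never matches.
            System.out.println(In.INSTANCE.test(type, 3, literals));    // true
            System.out.println(In.INSTANCE.test(type, 2, literals));    // false
            System.out.println(In.INSTANCE.test(type, null, literals)); // false
        }
    }

NotIn (truncated above) presumably mirrors this logic with the opposite result, which is what lets each function implement negate() by returning the other's singleton.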
[flink-table-store] branch master updated: [FLINK-28335] Delete topic after tests
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

The following commit(s) were added to refs/heads/master by this push:
     new c24d3d24  [FLINK-28335] Delete topic after tests
c24d3d24 is described below

commit c24d3d244b4ae00e599924fa8814498f98f74aab
Author: Nicholas Jiang
AuthorDate: Mon Jul 4 12:03:39 2022 +0800

    [FLINK-28335] Delete topic after tests

    This closes #188
---
 .../org/apache/flink/table/store/kafka/KafkaTableTestBase.java | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index db928329..1284fcff 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -53,6 +53,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
@@ -110,9 +111,11 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
     }
 
     @After
-    public void after() {
+    public void after() throws ExecutionException, InterruptedException {
         // Cancel timer for debug logging
         cancelTimeoutLogger();
+        // Delete topics to avoid reusing topics of the Kafka cluster
+        deleteTopics();
     }
 
     public Properties getStandardProps() {
@@ -169,6 +172,11 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         }
     }
 
+    private void deleteTopics() throws ExecutionException, InterruptedException {
+        final AdminClient adminClient = AdminClient.create(getStandardProps());
+        adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
+    }
+
     // -------------------- For Debug Logging Purpose --------------------
 
     private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
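The added deleteTopics() keeps each test from leaking topics into the shared Kafka cluster. A standalone sketch of the same cleanup step (not from the patch; unlike the patch it also closes the client via try-with-resources, since AdminClient is AutoCloseable):

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;

    public class TopicCleanup {
        public static void deleteAllTopics(Properties adminProps)
                throws ExecutionException, InterruptedException {
            try (AdminClient adminClient = AdminClient.create(adminProps)) {
                // listTopics().names() resolves to the set of existing topic names;
                // deleteTopics(...).all() completes once every deletion is acknowledged.
                adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
            }
        }
    }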
[flink-connector-elasticsearch] branch main updated: [FLINK-28107][python][connector/elasticsearch] Add a MapElasticsearchEmitter for PyFlink (#22)
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

The following commit(s) were added to refs/heads/main by this push:
     new b47fedf  [FLINK-28107][python][connector/elasticsearch] Add a MapElasticsearchEmitter for PyFlink (#22)
b47fedf is described below

commit b47fedf4f153f10614897666528c1547c8a630f1
Author: Luning (Lucas) Wang
AuthorDate: Mon Jul 4 10:19:44 2022 +0800

    [FLINK-28107][python][connector/elasticsearch] Add a MapElasticsearchEmitter for PyFlink (#22)
---
 .../sink/MapElasticsearchEmitter.java | 81 ++
 1 file changed, 81 insertions(+)

diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java
new file mode 100644
index 000..c39c99f
--- /dev/null
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A simple ElasticsearchEmitter which is currently used in PyFlink ES connector. */
+public class MapElasticsearchEmitter implements ElasticsearchEmitter<Map<String, Object>> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String index;
+    private @Nullable final String documentType;
+    private @Nullable final String idFieldName;
+    private final boolean isDynamicIndex;
+    private transient Function<Map<String, Object>, String> indexProvider;
+
+    public MapElasticsearchEmitter(
+            String index,
+            @Nullable String documentType,
+            @Nullable String idFieldName,
+            boolean isDynamicIndex) {
+        this.index = checkNotNull(index);
+        this.documentType = documentType;
+        this.idFieldName = idFieldName;
+        this.isDynamicIndex = isDynamicIndex;
+    }
+
+    @Override
+    public void open() throws Exception {
+        if (isDynamicIndex) {
+            indexProvider = doc -> doc.get(index).toString();
+        } else {
+            indexProvider = doc -> index;
+        }
+    }
+
+    @Override
+    public void emit(Map<String, Object> doc, SinkWriter.Context context, RequestIndexer indexer) {
+        if (idFieldName != null) {
+            final UpdateRequest updateRequest =
+                    new UpdateRequest(
+                                    indexProvider.apply(doc),
+                                    documentType,
+                                    doc.get(idFieldName).toString())
+                            .doc(doc)
+                            .upsert(doc);
+            indexer.add(updateRequest);
+        } else {
+            final IndexRequest indexRequest =
+                    new IndexRequest(indexProvider.apply(doc), documentType).source(doc);
+            indexer.add(indexRequest);
+        }
+    }
+}
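A hedged usage sketch (not part of the commit; "orders" and "id" are made-up names). At runtime the sink calls open() once and then emit(...) per record with its own SinkWriter.Context and RequestIndexer:

    import org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter;

    import java.util.HashMap;
    import java.util.Map;

    public class EmitterExample {
        public static void main(String[] args) throws Exception {
            // Static index "orders", no document type, upserts keyed by the "id" field.
            MapElasticsearchEmitter emitter =
                    new MapElasticsearchEmitter("orders", null, "id", false);
            emitter.open(); // picks the constant index provider since isDynamicIndex is false

            Map<String, Object> doc = new HashMap<>();
            doc.put("id", "42");
            doc.put("amount", 9.5);
            // Inside a running sink this would enqueue an UpdateRequest that
            // upserts document "42" into "orders":
            // emitter.emit(doc, context, indexer);
        }
    }

With isDynamicIndex set to true, the first constructor argument is instead interpreted as the name of a document field whose value selects the target index per record.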
[flink] branch master updated: [FLINK-28298][table][python] Support left and right in Table API (#20104)
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new da720c3b97d  [FLINK-28298][table][python] Support left and right in Table API (#20104)
da720c3b97d is described below

commit da720c3b97d5c3e781abb1e910a8337f6a5ee0e4
Author: Luning (Lucas) Wang
AuthorDate: Mon Jul 4 10:14:44 2022 +0800

    [FLINK-28298][table][python] Support left and right in Table API (#20104)
---
 docs/data/sql_functions.yml                        |  2 ++
 docs/data/sql_functions_zh.yml                     |  2 ++
 flink-python/pyflink/table/expression.py           | 12
 .../pyflink/table/tests/test_expression.py         |  2 ++
 .../flink/table/api/internal/BaseExpressions.java  | 12
 .../functions/BuiltInFunctionDefinitions.java      | 22 ++
 .../expressions/converter/DirectConvertRule.java   |  2 ++
 .../planner/expressions/ScalarFunctionsTest.scala  |  7 +++
 8 files changed, 61 insertions(+)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 2ad4347b20c..7a8795566b0 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -341,8 +341,10 @@ string:
   - sql: INSTR(string1, string2)
     description: Returns the position of the first occurrence of string2 in string1. Returns NULL if any of arguments is NULL.
   - sql: LEFT(string, integer)
+    table: STRING.LEFT(INT)
     description: Returns the leftmost integer characters from the string. Returns EMPTY String if integer is negative. Returns NULL if any argument is NULL.
   - sql: RIGHT(string, integer)
+    table: STRING.RIGHT(INT)
     description: Returns the rightmost integer characters from the string. Returns EMPTY String if integer is negative. Returns NULL if any argument is NULL.
   - sql: LOCATE(string1, string2[, integer])
     description: Returns the position of the first occurrence of string1 in string2 after position integer. Returns 0 if not found. Returns NULL if any of arguments is NULL.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index ad9940b3486..f9056bf62c3 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -434,10 +434,12 @@ string:
   - sql: INSTR(string1, string2)
     description: 返回 string2 在 string1 中第一次出现的位置。如果有任一参数为 `NULL`,则返回 `NULL`。
   - sql: LEFT(string, integer)
+    table: STRING.LEFT(INT)
     description: |
       返回字符串中最左边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数
       为 `NULL` 则返回 `NULL`。
   - sql: RIGHT(string, integer)
+    table: STRING.RIGHT(INT)
     description: |
       返回字符串中最右边的长度为 integer 值的字符串。如果 integer 为负,则返回 `EMPTY` 字符串。如果有任一参数
       为 `NULL` 则返回 `NULL`。
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 1f7e64f9576..b22dd0f02b0 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1212,6 +1212,18 @@ class Expression(Generic[T]):
         """
         return _binary_op("encode")(self, charset)
 
+    def left(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]':
+        """
+        Returns the leftmost integer characters from the input string.
+        """
+        return _binary_op("left")(self, length)
+
+    def right(self, length: Union[int, 'Expression[int]']) -> 'Expression[str]':
+        """
+        Returns the rightmost integer characters from the input string.
+        """
+        return _binary_op("right")(self, length)
+
     @property
     def ltrim(self) -> 'Expression[str]':
         """
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index dd494cb1cdf..e17f07ba963 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -147,6 +147,8 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('chr(a)', str(expr1.chr))
         self.assertEqual("decode(a, 'utf-8')", str(expr1.decode('utf-8')))
         self.assertEqual("encode(a, 'utf-8')", str(expr1.encode('utf-8')))
+        self.assertEqual('left(a, 2)', str(expr1.left(2)))
+        self.assertEqual('right(a, 2)', str(expr1.right(2)))
         self.assertEqual('ltrim(a)', str(expr1.ltrim))
         self.assertEqual('rtrim(a)', str(expr1.rtrim))
         self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 908a2ab3957..e68fd84c3f3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
[flink] branch master updated: [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 7974e81ec51  [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
7974e81ec51 is described below

commit 7974e81ec51a2071b9658f768f651ffc371b15b0
Author: Shengkai <33114724+fsk...@users.noreply.github.com>
AuthorDate: Mon Jul 4 10:13:25 2022 +0800

    [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
---
 .../gateway/service/SqlGatewayServiceITCase.java   | 62 +-
 .../gateway/service/result/ResultFetcherTest.java  |  9 +++-
 .../service/utils/IgnoreExceptionHandler.java      | 36 +
 3 files changed, 92 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 52d74fb882e..d39c797eb63 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -37,12 +37,15 @@ import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.operation.Operation;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Condition;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -56,8 +59,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -81,6 +86,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             SessionEnvironment.newBuilder()
                     .setSessionEndpointVersion(MockedEndpointVersion.V1)
                     .build();
+    private final ThreadFactory threadFactory =
+            new ExecutorThreadFactory(
+                    "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
 
     @BeforeAll
     public static void setUp() {
@@ -244,7 +252,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
     // --------------------------------------------------------------------------------------
 
     @Test
-    public void testCancelOperationAndFetchResultInParallel() throws Exception {
+    public void testCancelOperationAndFetchResultInParallel() {
         SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
         CountDownLatch latch = new CountDownLatch(1);
         // Make sure the Operation is cancelled before it finishes.
@@ -253,9 +261,13 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 sessionHandle,
                 operationHandle,
                 () -> service.cancelOperation(sessionHandle, operationHandle),
-                String.format(
-                        "Can not fetch results from the %s in %s status.",
-                        operationHandle, OperationStatus.CANCELED));
+                new Condition<>(
+                        msg ->
+                                msg.contains(
+                                        String.format(
+                                                "Can not fetch results from the %s in %s status.",
+                                                operationHandle, OperationStatus.CANCELED)),
+                        "Fetch results with expected error message."));
         latch.countDown();
     }
 
@@ -273,9 +285,19 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 sessionHandle,
                 operationHandle,
                 () -> service.closeOperation(sessionHandle, operationHandle),
-
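The core of the fix is visible above: instead of demanding an exact error message, which is racy when cancel and fetch interleave, the test now only requires that the expected fragment appear somewhere in the message. A standalone sketch of that AssertJ Condition pattern (the message text here is illustrative, not the gateway's real output):

    import org.assertj.core.api.Condition;

    import static org.assertj.core.api.Assertions.assertThat;

    public class ConditionExample {
        public static void main(String[] args) {
            Condition<String> expectedError =
                    new Condition<>(
                            msg -> msg.contains("in CANCELED status"),
                            "Fetch results with expected error message.");
            // Passes as long as the expected fragment appears anywhere in the message.
            assertThat("Can not fetch results from the operation in CANCELED status.")
                    .is(expectedError);
        }
    }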
[flink-statefun] branch master updated: [hotfix] Update config-map.yaml to helm chart (#313)
This is an automated email from the ASF dual-hosted git repository.

knaufk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

The following commit(s) were added to refs/heads/master by this push:
     new ef8995d0  [hotfix] Update config-map.yaml to helm chart (#313)
ef8995d0 is described below

commit ef8995d0168bc75499137d8fb864884ddf6109d2
Author: Asher Liu
AuthorDate: Mon Jul 4 03:18:22 2022 +0800

    [hotfix] Update config-map.yaml to helm chart (#313)
---
 tools/k8s/templates/config-map.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/k8s/templates/config-map.yaml b/tools/k8s/templates/config-map.yaml
index c96e1c54..0165934d 100644
--- a/tools/k8s/templates/config-map.yaml
+++ b/tools/k8s/templates/config-map.yaml
@@ -43,7 +43,7 @@ data:
     execution.savepoint.ignore-unclaimed-state: true
     kubernetes.cluster-id: {{ .Values.ha.id }}
     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: {{ .Values.ha.dir }
+    high-availability.storageDir: {{ .Values.ha.dir }}
 
   log4j-console.properties: |+
     rootLogger.level = INFO
[flink-kubernetes-operator] branch main updated: [FLINK-28331] Persist status after every observe loop
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

The following commit(s) were added to refs/heads/main by this push:
     new a2db6c3  [FLINK-28331] Persist status after every observe loop
a2db6c3 is described below

commit a2db6c31e21d79715324ed45d7138cb8f6d6149e
Author: Matyas Orhidi
AuthorDate: Fri Jul 1 14:24:33 2022 +0200

    [FLINK-28331] Persist status after every observe loop
---
 .../controller/FlinkDeploymentController.java      |  1 +
 .../controller/FlinkSessionJobController.java      |  1 +
 .../operator/observer/SavepointObserver.java       | 30
 .../flink/kubernetes/operator/TestUtils.java       |  6 +-
 .../controller/DeploymentRecoveryTest.java         |  7 +-
 .../controller/FlinkDeploymentControllerTest.java  | 79 +-
 .../operator/controller/RollbackTest.java          |  4 +-
 7 files changed, 90 insertions(+), 38 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index d36f96b..d75d29c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -117,6 +117,7 @@ public class FlinkDeploymentController
                         previousDeployment,
                         false);
             }
+            statusRecorder.patchAndCacheStatus(flinkApp);
             reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             handleDeploymentFailed(flinkApp, dfe);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 30ff1ea..7f2f6bd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -96,6 +96,7 @@ public class FlinkSessionJobController
         }
 
         try {
+            statusRecorder.patchAndCacheStatus(flinkSessionJob);
             reconciler.reconcile(flinkSessionJob, context);
         } catch (Exception e) {
             throw new ReconciliationException(e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index 74f3d35..aa3590a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 
 /** An observer of savepoint progress. */
 public class SavepointObserver<STATUS extends CommonStatus<?>> {
@@ -69,10 +68,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
         var jobStatus = resource.getStatus().getJobStatus();
         var savepointInfo = jobStatus.getSavepointInfo();
         var jobId = jobStatus.getJobId();
-        var previousLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
 
         // If any manual or periodic savepoint is in progress, observe it
         if (SavepointUtils.savepointInProgress(jobStatus)) {
@@ -83,8 +78,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
         if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
             observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
-
-        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
@@ -201,27 +194,4 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
             throw new ReconciliationException(e);
         }
     }
-
-    /**
-     * Patch the Kubernetes Flink resource status if we observed a new last savepoint. This is
-     * crucial to not lose this information once the reconciler shuts down the cluster.
-     */
-    private void patchStatusOnSavepointChange(
-            AbstractFlinkResource<?, STATUS> resource,
-            SavepointInfo savepointInfo,
-            String previousLastSpPath) {
-        var currentLastSpPath =
-
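The net effect of the commit: the last-observed savepoint path is now persisted by the controllers at a fixed point in every loop, between observe and reconcile, instead of ad hoc inside the savepoint observer. A minimal, self-contained sketch of that ordering (hypothetical Observer/StatusRecorder/Reconciler interfaces, not the operator's real API):

    // Sketch only: the point is the ordering, not the types. Status observed in
    // step 1 is persisted in step 2, so a crash during step 3 (which may shut the
    // cluster down) cannot lose it.
    interface Observer<R> { void observe(R resource); }
    interface StatusRecorder<R> { void patchAndCacheStatus(R resource); }
    interface Reconciler<R> { void reconcile(R resource) throws Exception; }

    final class ReconcileLoop<R> {
        private final Observer<R> observer;
        private final StatusRecorder<R> statusRecorder;
        private final Reconciler<R> reconciler;

        ReconcileLoop(Observer<R> o, StatusRecorder<R> s, Reconciler<R> r) {
            this.observer = o;
            this.statusRecorder = s;
            this.reconciler = r;
        }

        void run(R resource) throws Exception {
            observer.observe(resource);                   // e.g. pick up a new savepoint path
            statusRecorder.patchAndCacheStatus(resource); // persist before acting on it
            reconciler.reconcile(resource);               // may tear the cluster down
        }
    }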