[flink-table-store] branch master updated: [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 41c0eef [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing 41c0eef is described below commit 41c0eefa0fec69aae051c004dc606a7be04f123a Author: tsreaper AuthorDate: Tue Mar 1 15:33:30 2022 +0800 [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing This closes #26 --- .../table/store/connector/FileStoreITCase.java | 5 +-- .../store/file/manifest/ManifestFileMetaTest.java | 6 ++-- .../store/file/manifest/ManifestFileTest.java | 5 ++- .../store/file/manifest/ManifestListTest.java | 5 ++- .../store/file/mergetree/sst/SstFileTest.java | 5 ++- .../store/file/operation/FileStoreCommitTest.java | 32 +++-- .../store/file/operation/FileStoreExpireTest.java | 5 +-- .../store/file/operation/FileStoreScanTest.java| 5 +-- .../store/file/operation/OperationTestUtils.java | 10 -- .../store/file/operation/TestCommitThread.java | 2 +- .../file/utils/FailingAtomicRenameFileSystem.java | 42 +++--- 11 files changed, 65 insertions(+), 57 deletions(-) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java index 8e7aa42..f278086 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java @@ -38,6 +38,7 @@ import org.apache.flink.table.store.connector.source.FileStoreSource; import org.apache.flink.table.store.file.FileStore; import org.apache.flink.table.store.file.FileStoreImpl; import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator; +import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; @@ -214,8 +215,8 @@ public class FileStoreITCase extends AbstractTestBase { if (isBatch) { options.set(FILE_PATH, folder.toURI().toString()); } else { -options.set(FILE_PATH, "fail://" + folder.getPath()); -// FailingAtomicRenameFileSystem.setFailPossibility(20); +FailingAtomicRenameFileSystem.get().reset(3, 100); +options.set(FILE_PATH, FailingAtomicRenameFileSystem.getFailingPath(folder.getPath())); } options.set(FILE_FORMAT, "avro"); return options; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java index b594802..8bf995f 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java @@ -110,15 +110,13 @@ public class ManifestFileMetaTest { @RepeatedTest(10) public void testCleanUpForException() throws IOException { -FailingAtomicRenameFileSystem.resetFailCounter(1); -FailingAtomicRenameFileSystem.setFailPossibility(10); - +FailingAtomicRenameFileSystem.get().reset(1, 10); List input = new ArrayList<>(); List entries = new ArrayList<>(); createData(input, entries, null); ManifestFile failingManifestFile = createManifestFile( -FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString()); + FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())); try { ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 30); diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java index 99a8b40..aa65636 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java @@ -60,12 +60,11 @@ public class ManifestFileTest { @RepeatedTest(10) public void testCleanUpForException() throws IOException { -FailingAtomicRenameFileSystem.resetFailCounter(1); -FailingAtomicRenameFileSy
[flink] branch master updated: [hotfix][javadoc] Fix typo of getTriggeredSavepointStatus
This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 7368370 [hotfix][javadoc] Fix typo of getTriggeredSavepointStatus 7368370 is described below commit 7368370dc2887693fbeba13edd7c8ae671b1d49d Author: Yun Tang AuthorDate: Tue Mar 1 15:05:26 2022 +0800 [hotfix][javadoc] Fix typo of getTriggeredSavepointStatus --- .../main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index bd26efd..44e189d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -184,7 +184,7 @@ public interface RestfulGateway extends RpcGateway { } /** - * Get the status of of savepoint triggered under the specified operation key. + * Get the status of a savepoint triggered under the specified operation key. * * @param operationKey key of the operation * @return Future which completes immediately with the status, or fails if no operation is
[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #29: [FLINK-26405] Add validation check of num of JM replica
bgeng777 commented on pull request #29: URL: https://github.com/apache/flink-kubernetes-operator/pull/29#issuecomment-1055091760 cc @wangyang0918 @gyfora -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] bgeng777 opened a new pull request #29: [FLINK-26405] Add validation check of num of JM replica
bgeng777 opened a new pull request #29: URL: https://github.com/apache/flink-kubernetes-operator/pull/29 - Add JM replicas check in the validator to make sure when HA is enabled, the num of JM should be 1. - Add the relevant test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (1cdb659 -> 5c4d263)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 1cdb659 [FLINK-25789][docs-zh] Translate the formats/hadoop page into Chinese. add 5c4d263 [FLINK-26289][runtime] AdaptiveScheduler: Allow exception history to be queried by the REST API. No new revisions were added by this update. Summary of changes: .../rest/handler/job/JobExceptionsHandler.java | 4 +- .../messages/JobExceptionsInfoWithHistory.java | 9 +-- .../rest/handler/job/JobExceptionsHandlerTest.java | 32 +- .../test/scheduling/AdaptiveSchedulerITCase.java | 68 +- 4 files changed, 102 insertions(+), 11 deletions(-)
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816483886 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { Review comment: or better yet JobManager as you initially named :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816481799 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { Review comment: I could rename this to cluster deployment status, then it wouldn't be confusing in case of session -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816481099 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } +/** + * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally + * allows deleting the native kubernetes HA resources as well. + * + * @param namespace Namespace where the Flink cluster is deployed + * @param clusterId ClusterId of the Flink cluster + * @param kubernetesClient Kubernetes client + * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well + */ public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) -.withName(clusterId) +.withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { +// We need to wait for cluster shutdown otherwise confimaps might be recreated +waitForClusterShutdown(kubernetesClient, namespace, clusterId); +kubernetesClient +.configMaps() +.inNamespace(namespace) +.withLabels( +KubernetesUtils.getConfigMapLabels( +clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)) +.delete(); +} +} + +/** We need this due to the buggy flink kube cluster client behaviour for now. 
*/ Review comment: what I meant originally by this is that the deployment client won't let you submit a new flink job as long as there is still a service around even if marked for deletion. Maybe this is not even a bug, but in any case now the comment is irrelevant with the changed behaviour, will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
tweise commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816455956 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } +/** + * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally + * allows deleting the native kubernetes HA resources as well. + * + * @param namespace Namespace where the Flink cluster is deployed + * @param clusterId ClusterId of the Flink cluster + * @param kubernetesClient Kubernetes client + * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well + */ public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) -.withName(clusterId) +.withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { +// We need to wait for cluster shutdown otherwise confimaps might be recreated +waitForClusterShutdown(kubernetesClient, namespace, clusterId); +kubernetesClient +.configMaps() +.inNamespace(namespace) +.withLabels( +KubernetesUtils.getConfigMapLabels( +clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)) +.delete(); +} +} + +/** We need this due to the buggy flink kube cluster client behaviour for now. 
*/ +public static void waitForClusterShutdown( +KubernetesClient kubernetesClient, String namespace, String clusterId) { + +boolean jobManagerRunning = true; +boolean serviceRunning = true; + +for (int i = 0; i < 60; i++) { +if (jobManagerRunning) { +PodList jmPodList = +kubernetesClient +.pods() +.inNamespace(namespace) +.withLabel( +Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE) +.withLabel( +Constants.LABEL_COMPONENT_KEY, +Constants.LABEL_COMPONENT_JOB_MANAGER) +.withLabel(Constants.LABEL_APP_KEY, clusterId) +.list(); + +if (jmPodList.getItems().isEmpty()) { +jobManagerRunning = false; +} +} + +if (serviceRunning) { +Service service = +kubernetesClient +.services() +.inNamespace(namespace) +.withName( + ExternalServiceDecorator.getExternalServiceName(clusterId)) +.fromServer() +.get(); +if (service == null) { +serviceRunning = false; +} +} + +if (!jobManagerRunning && !serviceRunning) { +break; +} +LOG.info("Waiting for cluster shutdown... ({})", i); +try { +Thread.sleep(1000); +} catch (InterruptedException e) { +throw new RuntimeException(e); +} +} +} Review comment: It would be good to log that shutdown was completed, otherwise the last thing we see is: ```2022-02-28 21:12:45,381 o.a.f.k.o.u.FlinkUtils [INFO ] [default.basic-example] Waiting for cluster shutdown... (5) 2022-02-28 21:12:46,395 o.a.f.k.o.u.FlinkUtils [INFO ] [default.basic-example] Waiting for cluster shutdown... (6) ``` -- This is an
[flink] branch master updated (6fc2933 -> 1cdb659)
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 6fc2933 [FLINK-26374][table-planner] Support nullability in nested JSON_OBJECT values add 1cdb659 [FLINK-25789][docs-zh] Translate the formats/hadoop page into Chinese. No new revisions were added by this update. Summary of changes: .../docs/connectors/datastream/formats/hadoop.md | 50 -- 1 file changed, 19 insertions(+), 31 deletions(-)
[flink-table-store] branch master updated: [FLINK-26276] Introduce ITCases with bug fixes
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new a472c23 [FLINK-26276] Introduce ITCases with bug fixes a472c23 is described below commit a472c23e9e6aa0900360f3bc12b7e3e325bec318 Author: Jingsong Lee AuthorDate: Tue Mar 1 11:48:09 2022 +0800 [FLINK-26276] Introduce ITCases with bug fixes This closes #23 --- flink-table-store-connector/pom.xml| 20 ++ .../table/store/connector/sink/StoreSink.java | 2 +- .../store/connector/sink/StoreSinkWriter.java | 22 +- .../sink/global/GlobalCommitterOperator.java | 32 ++- .../sink/global/GlobalCommittingSink.java | 22 +- .../global/GlobalCommittingSinkTranslator.java | 24 +- .../table/store/connector/FileStoreITCase.java | 291 + .../table/store/connector/SerializableRowData.java | 154 +++ .../table/store/connector/sink/StoreSinkTest.java | 2 +- .../sink/global/GlobalCommitterOperatorTest.java | 4 +- .../src/test/resources/log4j2-test.properties | 28 ++ .../flink/table/store/file/FileStoreImpl.java | 145 ++ .../flink/table/store/file/FileStoreOptions.java | 106 ++-- .../store/file/manifest/ManifestFileMeta.java | 8 +- .../store/file/operation/FileStoreCommitImpl.java | 38 +-- .../store/file/operation/FileStoreScanImpl.java| 72 +++-- .../store/file/utils/FileStorePathFactory.java | 10 + .../flink/table/store/file/utils/TypeUtils.java| 7 +- .../store/file/operation/OperationTestUtils.java | 14 +- .../src/test/resources/log4j2-test.properties | 28 ++ .../src/test/resources/log4j2-test.xml | 29 -- 21 files changed, 912 insertions(+), 146 deletions(-) diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml index 70ccea3..f05cd72 100644 --- a/flink-table-store-connector/pom.xml +++ b/flink-table-store-connector/pom.xml @@ -121,6 +121,26 @@ under the License. 
${flink.version} test + + +junit +junit +${junit4.version} +test + + + +org.junit.vintage +junit-vintage-engine +${junit5.version} + + +junit +junit + + +test + diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java index c61ce0c..1b980c6 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java @@ -104,7 +104,7 @@ public class StoreSink } @Override -public StoreGlobalCommitter createCommitter() throws IOException { +public StoreGlobalCommitter createGlobalCommitter() { FileStoreCommit commit = fileStore.newCommit(); CatalogLock lock; if (lockFactory == null) { diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java index dc8c993..b85999c 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -128,10 +129,17 @@ public class StoreSinkWriter @Override public List prepareCommit() throws IOException { List committables = new ArrayList<>(); -for (BinaryRowData partition : writers.keySet()) { -Map buckets = writers.get(partition); -for (Integer bucket : buckets.keySet()) { -RecordWriter writer = buckets.get(bucket); +Iterator>> partIter = 
+writers.entrySet().iterator(); +while (partIter.hasNext()) { +Map.Entry> partEntry = partIter.next(); +BinaryRowData partition = partEntry.getKey(); +Iterator> bucketIter = +partEntry.getValue().entrySet().iterator(); +while (bucketIter.hasNext()) { +Map.Entry entry = bucketIter.next(); +int bucket = entry.getKe
[flink-table-store] branch master updated: [FLINK-26217] Introduce manifest.merge-min-count in commit
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git The following commit(s) were added to refs/heads/master by this push: new 3c1d15a [FLINK-26217] Introduce manifest.merge-min-count in commit 3c1d15a is described below commit 3c1d15a794092c799c5d60a9adc432332d10223d Author: Shen Zhu AuthorDate: Mon Feb 28 19:26:41 2022 -0800 [FLINK-26217] Introduce manifest.merge-min-count in commit This closes #24 --- .../flink/table/store/file/FileStoreOptions.java | 18 +-- .../store/file/manifest/ManifestFileMeta.java | 20 +++-- .../store/file/operation/FileStoreCommitImpl.java | 3 +- .../store/file/manifest/ManifestFileMetaTest.java | 35 -- 4 files changed, 55 insertions(+), 21 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java index 028b678..ea6d0d6 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java @@ -38,15 +38,29 @@ public class FileStoreOptions { .defaultValue(MemorySize.ofMebiBytes(8)) .withDescription("Suggested file size of a manifest file."); +public static final ConfigOption MANIFEST_MERGE_MIN_COUNT = +ConfigOptions.key("manifest.merge-min-count") +.intType() +.defaultValue(30) +.withDescription( +"To avoid frequent manifest merges, this parameter specifies the minimum number " ++ "of ManifestFileMeta to merge."); + public final int bucket; public final MemorySize manifestSuggestedSize; +public final int manifestMergeMinCount; -public FileStoreOptions(int bucket, MemorySize manifestSuggestedSize) { +public FileStoreOptions( +int bucket, MemorySize manifestSuggestedSize, int manifestMergeMinCount) { this.bucket = bucket; 
this.manifestSuggestedSize = manifestSuggestedSize; +this.manifestMergeMinCount = manifestMergeMinCount; } public FileStoreOptions(ReadableConfig config) { -this(config.get(BUCKET), config.get(MANIFEST_TARGET_FILE_SIZE)); +this( +config.get(BUCKET), +config.get(MANIFEST_TARGET_FILE_SIZE), +config.get(MANIFEST_MERGE_MIN_COUNT)); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java index d6deadd..b3ad481 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java @@ -135,19 +135,22 @@ public class ManifestFileMeta { List metas, List entries, ManifestFile manifestFile, -long suggestedMetaSize) { +long suggestedMetaSize, +int suggestedMinMetaCount) { List result = new ArrayList<>(); // these are the newly created manifest files, clean them up if exception occurs List newMetas = new ArrayList<>(); List candidate = new ArrayList<>(); long totalSize = 0; +int metaCount = 0; try { // merge existing manifests first for (ManifestFileMeta manifest : metas) { totalSize += manifest.fileSize; +metaCount += 1; candidate.add(manifest); -if (totalSize >= suggestedMetaSize) { +if (totalSize >= suggestedMetaSize || metaCount >= suggestedMinMetaCount) { // reach suggested file size, perform merging and produce new file if (candidate.size() == 1) { result.add(candidate.get(0)); @@ -162,16 +165,15 @@ public class ManifestFileMeta { candidate.clear(); totalSize = 0; +metaCount = 0; } } -// merge the last bit of metas with entries -mergeIntoOneFile(candidate, entries, manifestFile) -.ifPresent( -merged -> { -newMetas.add(merged); -result.add(merged); -}); +// both size and count conditions not satisfied, create new file from entries +ManifestFileM
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
wangyang0918 commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816390801 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } +/** + * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally + * allows deleting the native kubernetes HA resources as well. + * + * @param namespace Namespace where the Flink cluster is deployed + * @param clusterId ClusterId of the Flink cluster + * @param kubernetesClient Kubernetes client + * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well + */ public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) -.withName(clusterId) +.withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { +// We need to wait for cluster shutdown otherwise confimaps might be recreated Review comment: ```suggestion // We need to wait for cluster shutdown otherwise configmaps might be recreated ``` Typo ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public 
static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } +/** + * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally + * allows deleting the native kubernetes HA resources as well. + * + * @param namespace Namespace where the Flink cluster is deployed + * @param clusterId ClusterId of the Flink cluster + * @param kubernetesClient Kubernetes client + * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well + */ public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) -.withName(clusterId) +.withName(KubernetesUtils.getDeploymentName(clusterId)) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { +// We need to wait for cluster shutdown otherwise confimaps might be recreated +waitForClusterShutdown(kubernetesClient, namespace, clusterId); +kubernetesClient +.configMaps() +.inNamespace(namespace) +.withLabels( +KubernetesUtils.getConfigMapLabels( +clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)) +.delete(); +} +} + +/** We need this due to the buggy flink kube cluster client behaviour for now. */ Review comment: I am confusing about this comment. @gyfora Do you know what is the buggy of flink kube cluster client? ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +116,109 @@ private s
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
wangyang0918 commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816388033 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) .withName(clusterId) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { Review comment: The most appropriate way to the HA data clean up is `HighAvailabilityServices#closeAndCleanupAllData()`. It should work both for ZooKeeper and K8s HA. But I agree we could comment the limitation here and do the improvement later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] tweise commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
tweise commented on pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054880231 @gyfora I'm going to run through some of the scenarios manually to verify. Hopefully I can add the missing tests in the next few days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
tweise commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816376893 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { Review comment: nit: if this applies to session mode, then the `Job` part would be misleading. But this can be dealt with later as we will need to iterate on the states more. It's already great to have a starting point! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054608306 I have addressed the comments @wangyang0918 @tweise, let me know if there is anything else. I had to improve the waitForClusterShutdown logic to be able to create a safe point where we can delete the configmaps without the jobmanager recreating them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (2e627a1 -> 6fc2933)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2e627a1 [FLINK-26331] Makes configuration for repeatable cleanups follow the configuration pattern for task restarts add 6fc2933 [FLINK-26374][table-planner] Support nullability in nested JSON_OBJECT values No new revisions were added by this update. Summary of changes: .../table/planner/codegen/JsonGenerateUtils.scala | 118 - .../planner/functions/JsonFunctionsITCase.java | 22 +++- 2 files changed, 86 insertions(+), 54 deletions(-)
[flink] branch master updated (e0dd372 -> 2e627a1)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e0dd372 [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP add 2e627a1 [FLINK-26331] Makes configuration for repeatable cleanups follow the configuration pattern for task restarts No new revisions were added by this update. Summary of changes: docs/content.zh/docs/deployment/config.md | 14 ++ docs/content/docs/deployment/config.md | 14 ++ .../generated/all_jobmanager_section.html | 12 -- .../generated/cleanup_configuration.html | 18 ++ ...ntial_delay_cleanup_strategy_configuration.html | 30 +++ ...fixed_delay_cleanup_strategy_configuration.html | 24 +++ .../generated/job_manager_configuration.html | 12 -- .../apache/flink/configuration/CleanupOptions.java | 196 + .../flink/configuration/JobManagerOptions.java | 16 -- .../configuration/RestartStrategyOptions.java | 7 +- .../cleanup/CleanupRetryStrategyFactory.java | 118 ++ .../cleanup/DispatcherResourceCleanerFactory.java | 13 +- .../cleanup/CleanupRetryStrategyFactoryTest.java | 240 + 13 files changed, 662 insertions(+), 52 deletions(-) create mode 100644 docs/layouts/shortcodes/generated/cleanup_configuration.html create mode 100644 docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html create mode 100644 docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054395615 I discovered that we need to wait for cluster shutdown before deleting the configmaps otherwise they might be recreated. Adding this now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated: [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e0dd372 [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP e0dd372 is described below commit e0dd372337fb43e89988cd53129542bff0e86b14 Author: Dawid Wysakowicz AuthorDate: Tue Feb 22 16:27:47 2022 +0100 [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP Tests that extend from UnalignedCheckpointTestBase create a lot of MiniClusters. E.g. the rescale it case creates 72 tests * 2 clusters (pre & post rescale). Direct buffers allocated by netty are freed during the GC. At the same time Flink uses PooledBufferAllocator, where we return used buffers earlier and we do not need to wait for GC to kick in. The idea to make the test more stable is to reuse a single NettyBufferPool for all clusters that are started in those tests. That way we can reuse buffers that were previously allocated and we do not need to wait until they are freed. Lastly as a note. This should not be an issue in production setups, as we do not start multiple shuffle environments in a single JVM process (TM). 
--- .../io/network/NettyShuffleServiceFactory.java | 42 ++--- .../io/network/netty/NettyConnectionManager.java | 21 - .../SharedPoolNettyShuffleServiceFactory.java | 104 + .../checkpointing/UnalignedCheckpointTestBase.java | 47 +- 4 files changed, 175 insertions(+), 39 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index e50b8e6..05e3932 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -108,13 +108,41 @@ public class NettyShuffleServiceFactory ResultPartitionManager resultPartitionManager, MetricGroup metricGroup, Executor ioExecutor) { +NettyConfig nettyConfig = config.nettyConfig(); +ConnectionManager connectionManager = +nettyConfig != null +? new NettyConnectionManager( +resultPartitionManager, +taskEventPublisher, +nettyConfig, +config.getMaxNumberOfConnections(), +config.isConnectionReuseEnabled()) +: new LocalConnectionManager(); +return createNettyShuffleEnvironment( +config, +taskExecutorResourceId, +taskEventPublisher, +resultPartitionManager, +connectionManager, +metricGroup, +ioExecutor); +} + +@VisibleForTesting +public static NettyShuffleEnvironment createNettyShuffleEnvironment( +NettyShuffleEnvironmentConfiguration config, +ResourceID taskExecutorResourceId, +TaskEventPublisher taskEventPublisher, +ResultPartitionManager resultPartitionManager, +ConnectionManager connectionManager, +MetricGroup metricGroup, +Executor ioExecutor) { checkNotNull(config); checkNotNull(taskExecutorResourceId); checkNotNull(taskEventPublisher); checkNotNull(resultPartitionManager); checkNotNull(metricGroup); - -NettyConfig nettyConfig = config.nettyConfig(); +checkNotNull(connectionManager); FileChannelManager fileChannelManager = new 
FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX); @@ -127,16 +155,6 @@ public class NettyShuffleServiceFactory .collect(Collectors.joining("\n\t"))); } -ConnectionManager connectionManager = -nettyConfig != null -? new NettyConnectionManager( -resultPartitionManager, -taskEventPublisher, -nettyConfig, -config.getMaxNumberOfConnections(), -config.isConnectionReuseEnabled()) -: new LocalConnectionManager(); - NetworkBufferPool networkBufferPool = new NetworkBufferPool( config.numNetworkBuffers(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flin
[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
Aitozi commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815968841 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) .withName(clusterId) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { Review comment: Make sense to me, I'm OK to merge the current shape with some more comments for clarification. But I have sense that we may have hard work to do the nice clean up work for other HA providers, It's a bit of out of scope of the operator responsibility or ability, maybe we should extend at the Flink to support `deleteAndCleanUpHA` ?. Do you have some inputs for this cc @wangyang0918 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
Aitozi commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815963282 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { + +/** JobManager is running and ready to receive REST API calls. */ +READY, + +/** JobManager is running but not ready yet to receive REST API calls. */ +DEPLOYED_NOT_READY, + +/** JobManager process is starting up. */ +DEPLOYING, + +/** JobManager deployment not found, probably not started or killed by user. 
*/ +MISSING; + +public UpdateControl toUpdateControl(FlinkDeployment flinkDeployment) { +switch (this) { +case DEPLOYING: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); +case DEPLOYED_NOT_READY: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); Review comment: OK, I have created the [ticket](https://issues.apache.org/jira/browse/FLINK-26399) for this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated: [hotfix] Log architecture on startup
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f95d69b [hotfix] Log architecture on startup f95d69b is described below commit f95d69b83c0649383f0a2bbaca0675a604fc7218 Author: Robert Metzger AuthorDate: Wed Feb 23 11:45:51 2022 +0100 [hotfix] Log architecture on startup --- .../java/org/apache/flink/runtime/util/EnvironmentInformation.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java index b4452fd..ea3a841 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java @@ -403,6 +403,8 @@ public class EnvironmentInformation { String inheritedLogs = System.getenv("FLINK_INHERITED_LOGS"); +String arch = System.getProperty("os.arch"); + long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20; if (inheritedLogs != null) { @@ -431,6 +433,7 @@ public class EnvironmentInformation { log.info(" OS current user: " + System.getProperty("user.name")); log.info(" Current Hadoop/Kerberos user: " + getHadoopUser()); log.info(" JVM: " + jvmVersion); +log.info(" Arch: " + arch); log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes"); log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
Aitozi commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815963282 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { + +/** JobManager is running and ready to receive REST API calls. */ +READY, + +/** JobManager is running but not ready yet to receive REST API calls. */ +DEPLOYED_NOT_READY, + +/** JobManager process is starting up. */ +DEPLOYING, + +/** JobManager deployment not found, probably not started or killed by user. 
*/ +MISSING; + +public UpdateControl toUpdateControl(FlinkDeployment flinkDeployment) { +switch (this) { +case DEPLOYING: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); +case DEPLOYED_NOT_READY: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); Review comment: I have create the [ticket](https://issues.apache.org/jira/browse/FLINK-26399) for this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815961573 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java ## @@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) { flinkApp.getMetadata().getName(), flinkApp.getMetadata().getNamespace()); jobManagerDeployments.add(flinkApp.getMetadata().getUid()); -if (flinkApp.getStatus().getJobStatus() != null) { -// pre-existing deployments on operator restart - proceed with -// reconciliation -return null; -} +return JobDeploymentStatus.READY; } LOG.info( "JobManager deployment {} in namespace {} port not ready", flinkApp.getMetadata().getName(), flinkApp.getMetadata().getNamespace()); -return UpdateControl.updateStatus(flinkApp) -.rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); +return JobDeploymentStatus.DEPLOYED_NOT_READY; } LOG.info( "JobManager deployment {} in namespace {} not yet ready, status {}", flinkApp.getMetadata().getName(), flinkApp.getMetadata().getNamespace(), status); -// TODO: how frequently do we want here -return UpdateControl.updateStatus(flinkApp) -.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); + +return JobDeploymentStatus.DEPLOYING; } +return JobDeploymentStatus.MISSING; +} +return JobDeploymentStatus.READY; +} + +public DeleteControl shutdownAndDelete( Review comment: Sure I will add a comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815887310 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { + +/** JobManager is running and ready to receive REST API calls. */ +READY, + +/** JobManager is running but not ready yet to receive REST API calls. */ +DEPLOYED_NOT_READY, + +/** JobManager process is starting up. */ +DEPLOYING, + +/** JobManager deployment not found, probably not started or killed by user. 
*/ +MISSING; + +public UpdateControl toUpdateControl(FlinkDeployment flinkDeployment) { +switch (this) { +case DEPLOYING: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); +case DEPLOYED_NOT_READY: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); Review comment: Sounds good, I was trying to preserve the current behaviour :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815886981 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) .withName(clusterId) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { Review comment: You are absolutely correct, this only works for kubernetes HA (using configmaps). It's a good idea to add comments to the code and highlight in the README. In the future we could support other HA providers but that is out of scope in this ticket. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch master updated (c3df4c3 -> 2dc30c9)
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c3df4c3 [FLINK-26284] Refactors lock node structure to support atomic deletion in ZooKeeperStateHandleStore add 2dc30c9 [FLINK-26053][sql-parser] Fix LOOKAHEAD warning for EXPLAIN No new revisions were added by this update. Summary of changes: .../src/main/codegen/data/Parser.tdd | 1 - .../src/main/codegen/includes/parserImpls.ftl | 20 ++-- ...lUnParserTest.java => ReservedKeywordTest.java} | 36 +++--- .../flink/table/api/TableEnvironmentTest.scala | 32 --- 4 files changed, 39 insertions(+), 50 deletions(-) copy flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/{FlinkSqlUnParserTest.java => ReservedKeywordTest.java} (53%)
[flink] branch master updated (145099b -> c3df4c3)
This is an automated email from the ASF dual-hosted git repository. mapohl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 145099b [hotfix] [docs] Typo in filesystem.md (#18788) add 57980a1 [hotfix] Removes invalid JavaDoc add c3df4c3 [FLINK-26284] Refactors lock node structure to support atomic deletion in ZooKeeperStateHandleStore No new revisions were added by this update. Summary of changes: .../runtime/persistence/StateHandleStore.java | 11 +- .../zookeeper/ZooKeeperStateHandleStore.java | 141 ++--- .../ZooKeeperCompletedCheckpointStoreITCase.java | 18 +- .../persistence/TestingLongStateHandleHelper.java | 23 ++ .../zookeeper/ZooKeeperStateHandleStoreTest.java | 328 - 5 files changed, 464 insertions(+), 57 deletions(-)
[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
Aitozi commented on a change in pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815701617 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; + +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS; +import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS; + +/** Status of the Flink job deployment. */ +public enum JobDeploymentStatus { + +/** JobManager is running and ready to receive REST API calls. */ +READY, + +/** JobManager is running but not ready yet to receive REST API calls. */ +DEPLOYED_NOT_READY, + +/** JobManager process is starting up. */ +DEPLOYING, + +/** JobManager deployment not found, probably not started or killed by user. 
*/ +MISSING; + +public UpdateControl toUpdateControl(FlinkDeployment flinkDeployment) { +switch (this) { +case DEPLOYING: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS); +case DEPLOYED_NOT_READY: +return UpdateControl.updateStatus(flinkDeployment) +.rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS); Review comment: Maybe make this configurable , I will create a ticket for this. ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java ## @@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) { } } -public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) { +public static void deleteCluster( +FlinkDeployment flinkApp, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { deleteCluster( flinkApp.getMetadata().getNamespace(), flinkApp.getMetadata().getName(), -kubernetesClient); +kubernetesClient, +deleteHaConfigmaps); } public static void deleteCluster( -String namespace, String clusterId, KubernetesClient kubernetesClient) { +String namespace, +String clusterId, +KubernetesClient kubernetesClient, +boolean deleteHaConfigmaps) { kubernetesClient .apps() .deployments() .inNamespace(namespace) .withName(clusterId) .cascading(true) .delete(); + +if (deleteHaConfigmaps) { Review comment: This seems only handle the situation for HA based on ConfigMap. Will this also work for the HA based on ZK ? 
## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java ## @@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) { flinkApp.getMetadata().getName(), flinkApp.getMetadata().getNamespace()); jobManagerDeployments.add(flinkApp.getMetadata().getUid()); -if (flinkApp.getStatus().getJobStatus() != null) { -// pre-existing deployments on operator restart - proceed with -// reconciliation -return null; -} +return JobDeploymentStatus.READY; } LOG.info( "JobManager deployment {} in namespace {} port not ready", flinkApp.getMetadata().getNam
[flink] branch master updated: [hotfix] [docs] Typo in filesystem.md (#18788)
This is an automated email from the ASF dual-hosted git repository. knaufk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 145099b [hotfix] [docs] Typo in filesystem.md (#18788) 145099b is described below commit 145099b9446d9b749747c2413923823748369415 Author: oogetyboogety AuthorDate: Mon Feb 28 01:47:19 2022 -0700 [hotfix] [docs] Typo in filesystem.md (#18788) with AvroParquetWriters from ParquetAvroWriters rename --- docs/content/docs/connectors/datastream/filesystem.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md index 85224c1..4fce324 100644 --- a/docs/content/docs/connectors/datastream/filesystem.md +++ b/docs/content/docs/connectors/datastream/filesystem.md @@ -405,7 +405,7 @@ Schema schema = ...; DataStream input = ...; final FileSink sink = FileSink - .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema)) + .forBulkFormat(outputBasePath, AvroParquetWriters.forGenericRecord(schema)) .build(); input.sinkTo(sink);
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora opened a new pull request #28: URL: https://github.com/apache/flink-kubernetes-operator/pull/28 Separating the original commit from https://github.com/apache/flink-kubernetes-operator/pull/26 to not interfere with the refactor discussion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on pull request #26: URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1054011692 > Maybe we could get this PR continued and create a dedicated ticket for refactoring/rethinking the controller flow since we need some time to collect more feedbacks from the ML. Yea, I will keep this PR open and rename it to dedicate to the refactor discussion. And I will move the first commit to a new PR (once I addressed all the comments) which is related to the original ticket -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
wangyang0918 commented on pull request #26: URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1054008848 Maybe we could get this PR continued and create a dedicated ticket for refactoring/rethinking the controller flow since we need some time to collect more feedbacks from the ML. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #27: [FLINK-26328] Control Logging Behavior in Flink Deployments
morhidi commented on pull request #27: URL: https://github.com/apache/flink-kubernetes-operator/pull/27#issuecomment-1053998963 cc @gyfora @tweise @wangyang0918 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on a change in pull request #26: URL: https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815659148 ## File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java ## @@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) { flinkApp.getMetadata().getName(), flinkApp.getMetadata().getNamespace()); jobManagerDeployments.add(flinkApp.getMetadata().getUid()); -if (flinkApp.getStatus().getJobStatus() != null) { -// pre-existing deployments on operator restart - proceed with -// reconciliation -return null; -} +return JobDeploymentStatus.READY; Review comment: I will try to make sure this case is accounted for with a test case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well
gyfora commented on pull request #26: URL: https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053988753 For the sake of transparency I have moved this to a discussion thread on the dev list: https://lists.apache.org/thread/ydvdxfn8go95gnmmh5k9ggwx9hwxn6tl Let's try to come to an agreement there together before continuing! :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[flink] branch release-1.14 updated (77a7d40 -> e4c99b1)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git. from 77a7d40 [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543 new 7a6c3c1 [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments new 70c8835 [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest new e4c99b1 [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../io/network/buffer/NetworkBufferPoolTest.java | 91 +- 1 file changed, 53 insertions(+), 38 deletions(-)
[flink] 02/03: [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git commit 70c8835070e5dd6a44edb33915f0ef8d611c58fb Author: Anton Kalashnikov AuthorDate: Mon Feb 21 16:21:34 2022 +0100 [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest --- .../io/network/buffer/NetworkBufferPoolTest.java | 42 ++ 1 file changed, 42 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index cc4d49f..13faa93 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -56,6 +56,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -288,6 +289,47 @@ public class NetworkBufferPoolTest extends TestLogger { } /** + * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the total number of + * allocated buffers for several requests exceeding the capacity of {@link NetworkBufferPool}. 
+ */ +@Test +public void testInsufficientNumberOfBuffers() throws Exception { +final int numberOfSegmentsToRequest = 5; + +final NetworkBufferPool globalPool = new NetworkBufferPool(numberOfSegmentsToRequest, 128); + +try { +// the global pool should be in available state initially +assertTrue(globalPool.getAvailableFuture().isDone()); + +// request 5 segments +List segments1 = + globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest); +assertFalse(globalPool.getAvailableFuture().isDone()); +assertEquals(numberOfSegmentsToRequest, segments1.size()); + +// request only 1 segment +IOException ioException = +assertThrows( +IOException.class, () -> globalPool.requestUnpooledMemorySegments(1)); + +assertTrue(ioException.getMessage().contains("Insufficient number of network buffers")); + +// recycle 5 segments +CompletableFuture availableFuture = globalPool.getAvailableFuture(); +globalPool.recycleUnpooledMemorySegments(segments1); +assertTrue(availableFuture.isDone()); + +List segments2 = + globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest); +assertFalse(globalPool.getAvailableFuture().isDone()); +assertEquals(numberOfSegmentsToRequest, segments2.size()); +} finally { +globalPool.destroy(); +} +} + +/** * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with the invalid argument * to cause exception. */
[flink] 03/03: [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git commit e4c99b19e9fa876157a98d23b5b5bc498da7fac9 Author: Anton Kalashnikov AuthorDate: Mon Feb 21 16:37:58 2022 +0100 [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest --- .../io/network/buffer/NetworkBufferPoolTest.java | 23 ++ 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index 13faa93..e5f878e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -23,10 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.util.TestLogger; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.Timeout; import java.io.IOException; import java.time.Duration; @@ -40,13 +37,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.hamcrest.core.IsNot.not; @@ -55,7 +52,6 @@ import 
static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -63,10 +59,6 @@ import static org.junit.Assert.fail; /** Tests for {@link NetworkBufferPool}. */ public class NetworkBufferPoolTest extends TestLogger { -@Rule public ExpectedException expectedException = ExpectedException.none(); - -@Rule public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); - @Test public void testCreatePoolAfterDestroy() { try { @@ -434,10 +426,9 @@ public class NetworkBufferPoolTest extends TestLogger { segment.free(); -expectedException.expect(IllegalStateException.class); -expectedException.expectMessage("destroyed"); try { -asyncRequest.sync(); +Exception ex = assertThrows(IllegalStateException.class, asyncRequest::sync); +assertTrue(ex.getMessage().contains("destroyed")); } finally { globalPool.destroy(); } @@ -518,11 +509,9 @@ public class NetworkBufferPoolTest extends TestLogger { asyncRequest.start(); -expectedException.expect(IOException.class); -expectedException.expectMessage("Timeout"); - try { -asyncRequest.sync(); +Exception ex = assertThrows(IOException.class, asyncRequest::sync); +assertTrue(ex.getMessage().contains("Timeout")); } finally { globalPool.destroy(); } @@ -683,7 +672,7 @@ public class NetworkBufferPoolTest extends TestLogger { // wait until all available buffers are requested while (segmentsRequested.size() + segments.size() + exclusiveSegments.size() < numBuffers) { -Thread.sleep(100); +Thread.sleep(10); assertNull(cause.get()); }
[flink] 01/03: [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git commit 7a6c3c1b978a75898ff3f09a715e25e7e9e91690 Author: Anton Kalashnikov AuthorDate: Mon Feb 21 16:20:48 2022 +0100 [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments --- .../io/network/buffer/NetworkBufferPoolTest.java | 26 +- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index 73877c9..cc4d49f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -532,9 +532,8 @@ public class NetworkBufferPoolTest extends TestLogger { * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by {@link * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}. 
*/ -@Test(timeout = 1L) -public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() -throws InterruptedException, IOException { +@Test +public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() throws Exception { final int numberOfSegmentsToRequest = 5; final int numBuffers = 2 * numberOfSegmentsToRequest; @@ -556,29 +555,14 @@ public class NetworkBufferPoolTest extends TestLogger { assertFalse(globalPool.getAvailableFuture().isDone()); assertEquals(numberOfSegmentsToRequest, segments2.size()); -// request another 5 segments -final CountDownLatch latch = new CountDownLatch(1); -final List segments3 = new ArrayList<>(numberOfSegmentsToRequest); -CheckedThread asyncRequest = -new CheckedThread() { -@Override -public void go() throws Exception { -// this request should be blocked until at least 5 segments are recycled -segments3.addAll( -globalPool.requestUnpooledMemorySegments( -numberOfSegmentsToRequest)); -latch.countDown(); -} -}; -asyncRequest.start(); - // recycle 5 segments CompletableFuture availableFuture = globalPool.getAvailableFuture(); globalPool.recycleUnpooledMemorySegments(segments1); assertTrue(availableFuture.isDone()); -// wait util the third request is fulfilled -latch.await(); +// request another 5 segments +final List segments3 = + globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest); assertFalse(globalPool.getAvailableFuture().isDone()); assertEquals(numberOfSegmentsToRequest, segments3.size());