[flink-table-store] branch master updated: [hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to support multi-thread testing

2022-02-28 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 41c0eef  [hotfix] Set authority as thread name for 
FailingAtomicRenameFileSystem to support multi-thread testing
41c0eef is described below

commit 41c0eefa0fec69aae051c004dc606a7be04f123a
Author: tsreaper 
AuthorDate: Tue Mar 1 15:33:30 2022 +0800

[hotfix] Set authority as thread name for FailingAtomicRenameFileSystem to 
support multi-thread testing

This closes #26
---
 .../table/store/connector/FileStoreITCase.java |  5 +--
 .../store/file/manifest/ManifestFileMetaTest.java  |  6 ++--
 .../store/file/manifest/ManifestFileTest.java  |  5 ++-
 .../store/file/manifest/ManifestListTest.java  |  5 ++-
 .../store/file/mergetree/sst/SstFileTest.java  |  5 ++-
 .../store/file/operation/FileStoreCommitTest.java  | 32 +++--
 .../store/file/operation/FileStoreExpireTest.java  |  5 +--
 .../store/file/operation/FileStoreScanTest.java|  5 +--
 .../store/file/operation/OperationTestUtils.java   | 10 --
 .../store/file/operation/TestCommitThread.java |  2 +-
 .../file/utils/FailingAtomicRenameFileSystem.java  | 42 +++---
 11 files changed, 65 insertions(+), 57 deletions(-)

diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 8e7aa42..f278086 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.table.store.connector.source.FileStoreSource;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -214,8 +215,8 @@ public class FileStoreITCase extends AbstractTestBase {
 if (isBatch) {
 options.set(FILE_PATH, folder.toURI().toString());
 } else {
-options.set(FILE_PATH, "fail://" + folder.getPath());
-// FailingAtomicRenameFileSystem.setFailPossibility(20);
+FailingAtomicRenameFileSystem.get().reset(3, 100);
+options.set(FILE_PATH, 
FailingAtomicRenameFileSystem.getFailingPath(folder.getPath()));
 }
 options.set(FILE_FORMAT, "avro");
 return options;
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index b594802..8bf995f 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -110,15 +110,13 @@ public class ManifestFileMetaTest {
 
 @RepeatedTest(10)
 public void testCleanUpForException() throws IOException {
-FailingAtomicRenameFileSystem.resetFailCounter(1);
-FailingAtomicRenameFileSystem.setFailPossibility(10);
-
+FailingAtomicRenameFileSystem.get().reset(1, 10);
 List input = new ArrayList<>();
 List entries = new ArrayList<>();
 createData(input, entries, null);
 ManifestFile failingManifestFile =
 createManifestFile(
-FailingAtomicRenameFileSystem.SCHEME + "://" + 
tempDir.toString());
+
FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
 
 try {
 ManifestFileMeta.merge(input, entries, failingManifestFile, 500, 
30);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 99a8b40..aa65636 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -60,12 +60,11 @@ public class ManifestFileTest {
 
 @RepeatedTest(10)
 public void testCleanUpForException() throws IOException {
-FailingAtomicRenameFileSystem.resetFailCounter(1);
-FailingAtomicRenameFileSy

[flink] branch master updated: [hotfix][javadoc] Fix typo of getTriggeredSavepointStatus

2022-02-28 Thread tangyun
This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 7368370  [hotfix][javadoc] Fix typo of getTriggeredSavepointStatus
7368370 is described below

commit 7368370dc2887693fbeba13edd7c8ae671b1d49d
Author: Yun Tang 
AuthorDate: Tue Mar 1 15:05:26 2022 +0800

[hotfix][javadoc] Fix typo of getTriggeredSavepointStatus
---
 .../main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index bd26efd..44e189d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -184,7 +184,7 @@ public interface RestfulGateway extends RpcGateway {
 }
 
 /**
- * Get the status of of savepoint triggered under the specified operation 
key.
+ * Get the status of a savepoint triggered under the specified operation 
key.
  *
  * @param operationKey key of the operation
  * @return Future which completes immediately with the status, or fails if 
no operation is


[GitHub] [flink-kubernetes-operator] bgeng777 commented on pull request #29: [FLINK-26405] Add validation check of num of JM replica

2022-02-28 Thread GitBox


bgeng777 commented on pull request #29:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/29#issuecomment-1055091760


   cc @wangyang0918 @gyfora 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] bgeng777 opened a new pull request #29: [FLINK-26405] Add validation check of num of JM replica

2022-02-28 Thread GitBox


bgeng777 opened a new pull request #29:
URL: https://github.com/apache/flink-kubernetes-operator/pull/29


   - Add JM replicas check in the validator to make sure when HA is enabled, 
the num of JM should be 1.
   - Add the relevant test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[flink] branch master updated (1cdb659 -> 5c4d263)

2022-02-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 1cdb659  [FLINK-25789][docs-zh] Translate the formats/hadoop page into 
Chinese.
 add 5c4d263  [FLINK-26289][runtime] AdaptiveScheduler: Allow exception 
history to be queried by the REST API.

No new revisions were added by this update.

Summary of changes:
 .../rest/handler/job/JobExceptionsHandler.java |  4 +-
 .../messages/JobExceptionsInfoWithHistory.java |  9 +--
 .../rest/handler/job/JobExceptionsHandlerTest.java | 32 +-
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 68 +-
 4 files changed, 102 insertions(+), 11 deletions(-)


[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816483886



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {

Review comment:
   or better yet JobManager as you initially named :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816481799



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {

Review comment:
   I could rename this to cluster deployment status, then it wouldn't be 
confusing in case of session




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816481099



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
+/**
+ * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+ * allows deleting the native kubernetes HA resources as well.
+ *
+ * @param namespace Namespace where the Flink cluster is deployed
+ * @param clusterId ClusterId of the Flink cluster
+ * @param kubernetesClient Kubernetes client
+ * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+ */
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
-.withName(clusterId)
+.withName(KubernetesUtils.getDeploymentName(clusterId))
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {
+// We need to wait for cluster shutdown otherwise confimaps might 
be recreated
+waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+kubernetesClient
+.configMaps()
+.inNamespace(namespace)
+.withLabels(
+KubernetesUtils.getConfigMapLabels(
+clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+.delete();
+}
+}
+
+/** We need this due to the buggy flink kube cluster client behaviour for 
now. */

Review comment:
   what I meant originally by this, is that the deployment client won't let 
you submit a new flink job as long as there is still a service around even if 
marked for deletion. Maybe this is not even a bug, but in any case now the 
comment is irrelevant with the changed behaviour, will remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


tweise commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816455956



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
+/**
+ * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+ * allows deleting the native kubernetes HA resources as well.
+ *
+ * @param namespace Namespace where the Flink cluster is deployed
+ * @param clusterId ClusterId of the Flink cluster
+ * @param kubernetesClient Kubernetes client
+ * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+ */
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
-.withName(clusterId)
+.withName(KubernetesUtils.getDeploymentName(clusterId))
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {
+// We need to wait for cluster shutdown otherwise confimaps might 
be recreated
+waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+kubernetesClient
+.configMaps()
+.inNamespace(namespace)
+.withLabels(
+KubernetesUtils.getConfigMapLabels(
+clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+.delete();
+}
+}
+
+/** We need this due to the buggy flink kube cluster client behaviour for 
now. */
+public static void waitForClusterShutdown(
+KubernetesClient kubernetesClient, String namespace, String 
clusterId) {
+
+boolean jobManagerRunning = true;
+boolean serviceRunning = true;
+
+for (int i = 0; i < 60; i++) {
+if (jobManagerRunning) {
+PodList jmPodList =
+kubernetesClient
+.pods()
+.inNamespace(namespace)
+.withLabel(
+Constants.LABEL_TYPE_KEY, 
Constants.LABEL_TYPE_NATIVE_TYPE)
+.withLabel(
+Constants.LABEL_COMPONENT_KEY,
+Constants.LABEL_COMPONENT_JOB_MANAGER)
+.withLabel(Constants.LABEL_APP_KEY, clusterId)
+.list();
+
+if (jmPodList.getItems().isEmpty()) {
+jobManagerRunning = false;
+}
+}
+
+if (serviceRunning) {
+Service service =
+kubernetesClient
+.services()
+.inNamespace(namespace)
+.withName(
+
ExternalServiceDecorator.getExternalServiceName(clusterId))
+.fromServer()
+.get();
+if (service == null) {
+serviceRunning = false;
+}
+}
+
+if (!jobManagerRunning && !serviceRunning) {
+break;
+}
+LOG.info("Waiting for cluster shutdown... ({})", i);
+try {
+Thread.sleep(1000);
+} catch (InterruptedException e) {
+throw new RuntimeException(e);
+}
+}
+}

Review comment:
   It would be good to log that shutdown was completed, otherwise the last 
thing we see is:
   ```2022-02-28 21:12:45,381 o.a.f.k.o.u.FlinkUtils [INFO ] 
[default.basic-example] Waiting for cluster shutdown... (5)
   2022-02-28 21:12:46,395 o.a.f.k.o.u.FlinkUtils [INFO ] 
[default.basic-example] Waiting for cluster shutdown... (6)
   ```




-- 
This is an

[flink] branch master updated (6fc2933 -> 1cdb659)

2022-02-28 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6fc2933  [FLINK-26374][table-planner] Support nullability in nested 
JSON_OBJECT values
 add 1cdb659  [FLINK-25789][docs-zh] Translate the formats/hadoop page into 
Chinese.

No new revisions were added by this update.

Summary of changes:
 .../docs/connectors/datastream/formats/hadoop.md   | 50 --
 1 file changed, 19 insertions(+), 31 deletions(-)


[flink-table-store] branch master updated: [FLINK-26276] Introduce ITCases with bug fixes

2022-02-28 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new a472c23  [FLINK-26276] Introduce ITCases with bug fixes
a472c23 is described below

commit a472c23e9e6aa0900360f3bc12b7e3e325bec318
Author: Jingsong Lee 
AuthorDate: Tue Mar 1 11:48:09 2022 +0800

[FLINK-26276] Introduce ITCases with bug fixes

This closes #23
---
 flink-table-store-connector/pom.xml|  20 ++
 .../table/store/connector/sink/StoreSink.java  |   2 +-
 .../store/connector/sink/StoreSinkWriter.java  |  22 +-
 .../sink/global/GlobalCommitterOperator.java   |  32 ++-
 .../sink/global/GlobalCommittingSink.java  |  22 +-
 .../global/GlobalCommittingSinkTranslator.java |  24 +-
 .../table/store/connector/FileStoreITCase.java | 291 +
 .../table/store/connector/SerializableRowData.java | 154 +++
 .../table/store/connector/sink/StoreSinkTest.java  |   2 +-
 .../sink/global/GlobalCommitterOperatorTest.java   |   4 +-
 .../src/test/resources/log4j2-test.properties  |  28 ++
 .../flink/table/store/file/FileStoreImpl.java  | 145 ++
 .../flink/table/store/file/FileStoreOptions.java   | 106 ++--
 .../store/file/manifest/ManifestFileMeta.java  |   8 +-
 .../store/file/operation/FileStoreCommitImpl.java  |  38 +--
 .../store/file/operation/FileStoreScanImpl.java|  72 +++--
 .../store/file/utils/FileStorePathFactory.java |  10 +
 .../flink/table/store/file/utils/TypeUtils.java|   7 +-
 .../store/file/operation/OperationTestUtils.java   |  14 +-
 .../src/test/resources/log4j2-test.properties  |  28 ++
 .../src/test/resources/log4j2-test.xml |  29 --
 21 files changed, 912 insertions(+), 146 deletions(-)

diff --git a/flink-table-store-connector/pom.xml 
b/flink-table-store-connector/pom.xml
index 70ccea3..f05cd72 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -121,6 +121,26 @@ under the License.
 ${flink.version}
 test
 
+
+
+junit
+junit
+${junit4.version}
+test
+
+
+
+org.junit.vintage
+junit-vintage-engine
+${junit5.version}
+
+
+junit
+junit
+
+
+test
+
 
 
 
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index c61ce0c..1b980c6 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -104,7 +104,7 @@ public class StoreSink
 }
 
 @Override
-public StoreGlobalCommitter createCommitter() throws IOException 
{
+public StoreGlobalCommitter createGlobalCommitter() {
 FileStoreCommit commit = fileStore.newCommit();
 CatalogLock lock;
 if (lockFactory == null) {
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index dc8c993..b85999c 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -128,10 +129,17 @@ public class StoreSinkWriter
 @Override
 public List prepareCommit() throws IOException {
 List committables = new ArrayList<>();
-for (BinaryRowData partition : writers.keySet()) {
-Map buckets = writers.get(partition);
-for (Integer bucket : buckets.keySet()) {
-RecordWriter writer = buckets.get(bucket);
+Iterator>> 
partIter =
+writers.entrySet().iterator();
+while (partIter.hasNext()) {
+Map.Entry> partEntry = 
partIter.next();
+BinaryRowData partition = partEntry.getKey();
+Iterator> bucketIter =
+partEntry.getValue().entrySet().iterator();
+while (bucketIter.hasNext()) {
+Map.Entry entry = bucketIter.next();
+int bucket = entry.getKe

[flink-table-store] branch master updated: [FLINK-26217] Introduce manifest.merge-min-count in commit

2022-02-28 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
 new 3c1d15a  [FLINK-26217] Introduce manifest.merge-min-count in commit
3c1d15a is described below

commit 3c1d15a794092c799c5d60a9adc432332d10223d
Author: Shen Zhu 
AuthorDate: Mon Feb 28 19:26:41 2022 -0800

[FLINK-26217] Introduce manifest.merge-min-count in commit

This closes #24
---
 .../flink/table/store/file/FileStoreOptions.java   | 18 +--
 .../store/file/manifest/ManifestFileMeta.java  | 20 +++--
 .../store/file/operation/FileStoreCommitImpl.java  |  3 +-
 .../store/file/manifest/ManifestFileMetaTest.java  | 35 --
 4 files changed, 55 insertions(+), 21 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 028b678..ea6d0d6 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -38,15 +38,29 @@ public class FileStoreOptions {
 .defaultValue(MemorySize.ofMebiBytes(8))
 .withDescription("Suggested file size of a manifest 
file.");
 
+public static final ConfigOption MANIFEST_MERGE_MIN_COUNT =
+ConfigOptions.key("manifest.merge-min-count")
+.intType()
+.defaultValue(30)
+.withDescription(
+"To avoid frequent manifest merges, this parameter 
specifies the minimum number "
++ "of ManifestFileMeta to merge.");
+
 public final int bucket;
 public final MemorySize manifestSuggestedSize;
+public final int manifestMergeMinCount;
 
-public FileStoreOptions(int bucket, MemorySize manifestSuggestedSize) {
+public FileStoreOptions(
+int bucket, MemorySize manifestSuggestedSize, int 
manifestMergeMinCount) {
 this.bucket = bucket;
 this.manifestSuggestedSize = manifestSuggestedSize;
+this.manifestMergeMinCount = manifestMergeMinCount;
 }
 
 public FileStoreOptions(ReadableConfig config) {
-this(config.get(BUCKET), config.get(MANIFEST_TARGET_FILE_SIZE));
+this(
+config.get(BUCKET),
+config.get(MANIFEST_TARGET_FILE_SIZE),
+config.get(MANIFEST_MERGE_MIN_COUNT));
 }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index d6deadd..b3ad481 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -135,19 +135,22 @@ public class ManifestFileMeta {
 List metas,
 List entries,
 ManifestFile manifestFile,
-long suggestedMetaSize) {
+long suggestedMetaSize,
+int suggestedMinMetaCount) {
 List result = new ArrayList<>();
 // these are the newly created manifest files, clean them up if 
exception occurs
 List newMetas = new ArrayList<>();
 List candidate = new ArrayList<>();
 long totalSize = 0;
+int metaCount = 0;
 
 try {
 // merge existing manifests first
 for (ManifestFileMeta manifest : metas) {
 totalSize += manifest.fileSize;
+metaCount += 1;
 candidate.add(manifest);
-if (totalSize >= suggestedMetaSize) {
+if (totalSize >= suggestedMetaSize || metaCount >= 
suggestedMinMetaCount) {
 // reach suggested file size, perform merging and produce 
new file
 if (candidate.size() == 1) {
 result.add(candidate.get(0));
@@ -162,16 +165,15 @@ public class ManifestFileMeta {
 
 candidate.clear();
 totalSize = 0;
+metaCount = 0;
 }
 }
 
-// merge the last bit of metas with entries
-mergeIntoOneFile(candidate, entries, manifestFile)
-.ifPresent(
-merged -> {
-newMetas.add(merged);
-result.add(merged);
-});
+// both size and count conditions not satisfied, create new file 
from entries
+ManifestFileM

[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


wangyang0918 commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816390801



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
+/**
+ * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+ * allows deleting the native kubernetes HA resources as well.
+ *
+ * @param namespace Namespace where the Flink cluster is deployed
+ * @param clusterId ClusterId of the Flink cluster
+ * @param kubernetesClient Kubernetes client
+ * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+ */
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
-.withName(clusterId)
+.withName(KubernetesUtils.getDeploymentName(clusterId))
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {
+// We need to wait for cluster shutdown otherwise confimaps might 
be recreated

Review comment:
   ```suggestion
   // We need to wait for cluster shutdown otherwise configmaps 
might be recreated
   ```
   
   Typo

##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
+/**
+ * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+ * allows deleting the native kubernetes HA resources as well.
+ *
+ * @param namespace Namespace where the Flink cluster is deployed
+ * @param clusterId ClusterId of the Flink cluster
+ * @param kubernetesClient Kubernetes client
+ * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+ */
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
-.withName(clusterId)
+.withName(KubernetesUtils.getDeploymentName(clusterId))
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {
+// We need to wait for cluster shutdown otherwise confimaps might 
be recreated
+waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+kubernetesClient
+.configMaps()
+.inNamespace(namespace)
+.withLabels(
+KubernetesUtils.getConfigMapLabels(
+clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+.delete();
+}
+}
+
+/** We need this due to the buggy flink kube cluster client behaviour for 
now. */

Review comment:
   I am confused about this comment. @gyfora Do you know what the bug in the 
flink kube cluster client is?

##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +116,109 @@ private s

[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


wangyang0918 commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816388033



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
 .withName(clusterId)
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {

Review comment:
   The most appropriate way to the HA data clean up is 
`HighAvailabilityServices#closeAndCleanupAllData()`. It should work both for 
ZooKeeper and K8s HA. But I agree we could comment the limitation here and do 
the improvement later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] tweise commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


tweise commented on pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054880231


   @gyfora I'm going to run through some of the scenarios manually to verify. 
Hopefully I can add the missing tests in the next days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


tweise commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816376893



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {

Review comment:
   nit: if this applies to session mode, then the `Job` part would be 
misleading. But this can be dealt with later as we will need to iterate on the 
states more. It's already great to have a starting point!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054608306


   I have addressed the comments @wangyang0918 @tweise , let me know if there 
is anything else.
   
   I had to improve the waitForClusterShutdown logic to be able to create a 
safe point where we can delete the configmaps without the jobmanager recreating 
them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[flink] branch master updated (2e627a1 -> 6fc2933)

2022-02-28 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2e627a1  [FLINK-26331] Makes configuration for repeatable cleanups 
follow the configuration pattern for task restarts
 add 6fc2933  [FLINK-26374][table-planner] Support nullability in nested 
JSON_OBJECT values

No new revisions were added by this update.

Summary of changes:
 .../table/planner/codegen/JsonGenerateUtils.scala  | 118 -
 .../planner/functions/JsonFunctionsITCase.java |  22 +++-
 2 files changed, 86 insertions(+), 54 deletions(-)


[flink] branch master updated (e0dd372 -> 2e627a1)

2022-02-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e0dd372  [FLINK-25026] 
UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP
 add 2e627a1  [FLINK-26331] Makes configuration for repeatable cleanups 
follow the configuration pattern for task restarts

No new revisions were added by this update.

Summary of changes:
 docs/content.zh/docs/deployment/config.md  |  14 ++
 docs/content/docs/deployment/config.md |  14 ++
 .../generated/all_jobmanager_section.html  |  12 --
 .../generated/cleanup_configuration.html   |  18 ++
 ...ntial_delay_cleanup_strategy_configuration.html |  30 +++
 ...fixed_delay_cleanup_strategy_configuration.html |  24 +++
 .../generated/job_manager_configuration.html   |  12 --
 .../apache/flink/configuration/CleanupOptions.java | 196 +
 .../flink/configuration/JobManagerOptions.java |  16 --
 .../configuration/RestartStrategyOptions.java  |   7 +-
 .../cleanup/CleanupRetryStrategyFactory.java   | 118 ++
 .../cleanup/DispatcherResourceCleanerFactory.java  |  13 +-
 .../cleanup/CleanupRetryStrategyFactoryTest.java   | 240 +
 13 files changed, 662 insertions(+), 52 deletions(-)
 create mode 100644 docs/layouts/shortcodes/generated/cleanup_configuration.html
 create mode 100644 
docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
 create mode 100644 
docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
 create mode 100644 
flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054395615


   I discovered that we need to wait for cluster shutdown before deleting the 
configmaps otherwise they might be recreated. Adding this now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[flink] branch master updated: [FLINK-25026] UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP

2022-02-28 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new e0dd372  [FLINK-25026] 
UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP
e0dd372 is described below

commit e0dd372337fb43e89988cd53129542bff0e86b14
Author: Dawid Wysakowicz 
AuthorDate: Tue Feb 22 16:27:47 2022 +0100

[FLINK-25026] 
UnalignedCheckpointRescaleITCase.shouldRescaleUnalignedCheckpoint fails on AZP

Tests that extend from UnalignedCheckpointTestBase create a lot of
MiniClusters. E.g. the rescale it case creates 72 tests * 2 clusters
(pre & post rescale). Direct buffers allocated by netty are freed during
the GC.

At the same time Flink uses PooledBufferAllocator, where we return used
buffers earlier and we do not need to wait for GC to kick in. The idea
to make the test more stable is to reuse a single NettyBufferPool for
all clusters that are started in those tests. That way we can reuse
buffers that were previously allocated and we do not need to wait until
they are freed.

Lastly as a note. This should not be an issue in production setups, as
we do not start multiple shuffle environments in a single JVM process
(TM).
---
 .../io/network/NettyShuffleServiceFactory.java |  42 ++---
 .../io/network/netty/NettyConnectionManager.java   |  21 -
 .../SharedPoolNettyShuffleServiceFactory.java  | 104 +
 .../checkpointing/UnalignedCheckpointTestBase.java |  47 +-
 4 files changed, 175 insertions(+), 39 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index e50b8e6..05e3932 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -108,13 +108,41 @@ public class NettyShuffleServiceFactory
 ResultPartitionManager resultPartitionManager,
 MetricGroup metricGroup,
 Executor ioExecutor) {
+NettyConfig nettyConfig = config.nettyConfig();
+ConnectionManager connectionManager =
+nettyConfig != null
+? new NettyConnectionManager(
+resultPartitionManager,
+taskEventPublisher,
+nettyConfig,
+config.getMaxNumberOfConnections(),
+config.isConnectionReuseEnabled())
+: new LocalConnectionManager();
+return createNettyShuffleEnvironment(
+config,
+taskExecutorResourceId,
+taskEventPublisher,
+resultPartitionManager,
+connectionManager,
+metricGroup,
+ioExecutor);
+}
+
+@VisibleForTesting
+public static NettyShuffleEnvironment createNettyShuffleEnvironment(
+NettyShuffleEnvironmentConfiguration config,
+ResourceID taskExecutorResourceId,
+TaskEventPublisher taskEventPublisher,
+ResultPartitionManager resultPartitionManager,
+ConnectionManager connectionManager,
+MetricGroup metricGroup,
+Executor ioExecutor) {
 checkNotNull(config);
 checkNotNull(taskExecutorResourceId);
 checkNotNull(taskEventPublisher);
 checkNotNull(resultPartitionManager);
 checkNotNull(metricGroup);
-
-NettyConfig nettyConfig = config.nettyConfig();
+checkNotNull(connectionManager);
 
 FileChannelManager fileChannelManager =
 new FileChannelManagerImpl(config.getTempDirs(), 
DIR_NAME_PREFIX);
@@ -127,16 +155,6 @@ public class NettyShuffleServiceFactory
 .collect(Collectors.joining("\n\t")));
 }
 
-ConnectionManager connectionManager =
-nettyConfig != null
-? new NettyConnectionManager(
-resultPartitionManager,
-taskEventPublisher,
-nettyConfig,
-config.getMaxNumberOfConnections(),
-config.isConnectionReuseEnabled())
-: new LocalConnectionManager();
-
 NetworkBufferPool networkBufferPool =
 new NetworkBufferPool(
 config.numNetworkBuffers(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
 
b/flin

[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


Aitozi commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815968841



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
 .withName(clusterId)
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {

Review comment:
   Makes sense to me; I'm OK with merging the current shape with some more 
comments for clarification.  
   
   But I have a sense that it may be hard work to do a proper clean-up for 
other HA providers. It's a bit out of scope of the operator's responsibility 
or ability; maybe we should extend Flink to support `deleteAndCleanUpHA`? 
Do you have any input on this? cc @wangyang0918 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


Aitozi commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815963282



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+/** JobManager is running and ready to receive REST API calls. */
+READY,
+
+/** JobManager is running but not ready yet to receive REST API calls. */
+DEPLOYED_NOT_READY,
+
+/** JobManager process is starting up. */
+DEPLOYING,
+
+/** JobManager deployment not found, probably not started or killed by 
user. */
+MISSING;
+
+public UpdateControl toUpdateControl(FlinkDeployment 
flinkDeployment) {
+switch (this) {
+case DEPLOYING:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+case DEPLOYED_NOT_READY:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);

Review comment:
   OK, I have created the 
[ticket](https://issues.apache.org/jira/browse/FLINK-26399) for this 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[flink] branch master updated: [hotfix] Log architecture on startup

2022-02-28 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f95d69b  [hotfix] Log architecture on startup
f95d69b is described below

commit f95d69b83c0649383f0a2bbaca0675a604fc7218
Author: Robert Metzger 
AuthorDate: Wed Feb 23 11:45:51 2022 +0100

[hotfix] Log architecture on startup
---
 .../java/org/apache/flink/runtime/util/EnvironmentInformation.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index b4452fd..ea3a841 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -403,6 +403,8 @@ public class EnvironmentInformation {
 
 String inheritedLogs = System.getenv("FLINK_INHERITED_LOGS");
 
+String arch = System.getProperty("os.arch");
+
 long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;
 
 if (inheritedLogs != null) {
@@ -431,6 +433,7 @@ public class EnvironmentInformation {
 log.info(" OS current user: " + System.getProperty("user.name"));
 log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
 log.info(" JVM: " + jvmVersion);
+log.info(" Arch: " + arch);
 log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
 log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : 
javaHome));
 


[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


Aitozi commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815963282



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+/** JobManager is running and ready to receive REST API calls. */
+READY,
+
+/** JobManager is running but not ready yet to receive REST API calls. */
+DEPLOYED_NOT_READY,
+
+/** JobManager process is starting up. */
+DEPLOYING,
+
+/** JobManager deployment not found, probably not started or killed by 
user. */
+MISSING;
+
+public UpdateControl toUpdateControl(FlinkDeployment 
flinkDeployment) {
+switch (this) {
+case DEPLOYING:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+case DEPLOYED_NOT_READY:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);

Review comment:
   I have created the 
[ticket](https://issues.apache.org/jira/browse/FLINK-26399) for this 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815961573



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##
@@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
 flinkApp.getMetadata().getName(),
 flinkApp.getMetadata().getNamespace());
 
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-if (flinkApp.getStatus().getJobStatus() != null) {
-// pre-existing deployments on operator restart - 
proceed with
-// reconciliation
-return null;
-}
+return JobDeploymentStatus.READY;
 }
 LOG.info(
 "JobManager deployment {} in namespace {} port not 
ready",
 flinkApp.getMetadata().getName(),
 flinkApp.getMetadata().getNamespace());
-return UpdateControl.updateStatus(flinkApp)
-.rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);
+return JobDeploymentStatus.DEPLOYED_NOT_READY;
 }
 LOG.info(
 "JobManager deployment {} in namespace {} not yet 
ready, status {}",
 flinkApp.getMetadata().getName(),
 flinkApp.getMetadata().getNamespace(),
 status);
-// TODO: how frequently do we want here
-return UpdateControl.updateStatus(flinkApp)
-.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+return JobDeploymentStatus.DEPLOYING;
 }
+return JobDeploymentStatus.MISSING;
+}
+return JobDeploymentStatus.READY;
+}
+
+public DeleteControl shutdownAndDelete(

Review comment:
   Sure I will add a comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815887310



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+/** JobManager is running and ready to receive REST API calls. */
+READY,
+
+/** JobManager is running but not ready yet to receive REST API calls. */
+DEPLOYED_NOT_READY,
+
+/** JobManager process is starting up. */
+DEPLOYING,
+
+/** JobManager deployment not found, probably not started or killed by 
user. */
+MISSING;
+
+public UpdateControl toUpdateControl(FlinkDeployment 
flinkDeployment) {
+switch (this) {
+case DEPLOYING:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+case DEPLOYED_NOT_READY:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);

Review comment:
   Sounds good, I was trying to preserve the current behaviour :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815886981



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
 .withName(clusterId)
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {

Review comment:
   You are absolutely correct, this only works for kubernetes HA (using 
configmaps). 
   
   It's a good idea to add comments to the code and highlight in the README.
   
   In the future we could support other HA providers but that is out of scope 
in this ticket.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[flink] branch master updated (c3df4c3 -> 2dc30c9)

2022-02-28 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c3df4c3  [FLINK-26284] Refactors lock node structure to support atomic 
deletion in ZooKeeperStateHandleStore
 add 2dc30c9  [FLINK-26053][sql-parser] Fix LOOKAHEAD warning for EXPLAIN

No new revisions were added by this update.

Summary of changes:
 .../src/main/codegen/data/Parser.tdd   |  1 -
 .../src/main/codegen/includes/parserImpls.ftl  | 20 ++--
 ...lUnParserTest.java => ReservedKeywordTest.java} | 36 +++---
 .../flink/table/api/TableEnvironmentTest.scala | 32 ---
 4 files changed, 39 insertions(+), 50 deletions(-)
 copy 
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/{FlinkSqlUnParserTest.java
 => ReservedKeywordTest.java} (53%)


[flink] branch master updated (145099b -> c3df4c3)

2022-02-28 Thread mapohl
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 145099b  [hotfix] [docs] Typo in filesystem.md (#18788)
 add 57980a1  [hotfix] Removes invalid JavaDoc
 add c3df4c3  [FLINK-26284] Refactors lock node structure to support atomic 
deletion in ZooKeeperStateHandleStore

No new revisions were added by this update.

Summary of changes:
 .../runtime/persistence/StateHandleStore.java  |  11 +-
 .../zookeeper/ZooKeeperStateHandleStore.java   | 141 ++---
 .../ZooKeeperCompletedCheckpointStoreITCase.java   |  18 +-
 .../persistence/TestingLongStateHandleHelper.java  |  23 ++
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 328 -
 5 files changed, 464 insertions(+), 57 deletions(-)


[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


Aitozi commented on a change in pull request #28:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815701617



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static 
org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+/** JobManager is running and ready to receive REST API calls. */
+READY,
+
+/** JobManager is running but not ready yet to receive REST API calls. */
+DEPLOYED_NOT_READY,
+
+/** JobManager process is starting up. */
+DEPLOYING,
+
+/** JobManager deployment not found, probably not started or killed by 
user. */
+MISSING;
+
+public UpdateControl toUpdateControl(FlinkDeployment 
flinkDeployment) {
+switch (this) {
+case DEPLOYING:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+case DEPLOYED_NOT_READY:
+return UpdateControl.updateStatus(flinkDeployment)
+.rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);

Review comment:
   Maybe make this configurable; I will create a ticket for this.

##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode 
fromNode) {
 }
 }
 
-public static void deleteCluster(FlinkDeployment flinkApp, 
KubernetesClient kubernetesClient) {
+public static void deleteCluster(
+FlinkDeployment flinkApp,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 deleteCluster(
 flinkApp.getMetadata().getNamespace(),
 flinkApp.getMetadata().getName(),
-kubernetesClient);
+kubernetesClient,
+deleteHaConfigmaps);
 }
 
 public static void deleteCluster(
-String namespace, String clusterId, KubernetesClient 
kubernetesClient) {
+String namespace,
+String clusterId,
+KubernetesClient kubernetesClient,
+boolean deleteHaConfigmaps) {
 kubernetesClient
 .apps()
 .deployments()
 .inNamespace(namespace)
 .withName(clusterId)
 .cascading(true)
 .delete();
+
+if (deleteHaConfigmaps) {

Review comment:
   This seems to only handle the situation for HA based on ConfigMap. Will 
this also work for the HA based on ZooKeeper?

##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##
@@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
 flinkApp.getMetadata().getName(),
 flinkApp.getMetadata().getNamespace());
 
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-if (flinkApp.getStatus().getJobStatus() != null) {
-// pre-existing deployments on operator restart - 
proceed with
-// reconciliation
-return null;
-}
+return JobDeploymentStatus.READY;
 }
 LOG.info(
 "JobManager deployment {} in namespace {} port not 
ready",
 flinkApp.getMetadata().getNam

[flink] branch master updated: [hotfix] [docs] Typo in filesystem.md (#18788)

2022-02-28 Thread knaufk
This is an automated email from the ASF dual-hosted git repository.

knaufk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 145099b  [hotfix] [docs] Typo in filesystem.md (#18788)
145099b is described below

commit 145099b9446d9b749747c2413923823748369415
Author: oogetyboogety 
AuthorDate: Mon Feb 28 01:47:19 2022 -0700

[hotfix] [docs] Typo in filesystem.md (#18788)

with AvroParquetWriters from ParquetAvroWriters rename
---
 docs/content/docs/connectors/datastream/filesystem.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/datastream/filesystem.md 
b/docs/content/docs/connectors/datastream/filesystem.md
index 85224c1..4fce324 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -405,7 +405,7 @@ Schema schema = ...;
 DataStream input = ...;
 
 final FileSink sink = FileSink
-   .forBulkFormat(outputBasePath, 
ParquetAvroWriters.forGenericRecord(schema))
+   .forBulkFormat(outputBasePath, 
AvroParquetWriters.forGenericRecord(schema))
.build();
 
 input.sinkTo(sink);


[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora opened a new pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28


   Separating the original commit from 
https://github.com/apache/flink-kubernetes-operator/pull/26 to not interfere 
with the refactor discussion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on pull request #26:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1054011692


   > Maybe we could get this PR continued and create a dedicated ticket for 
refactoring/rethinking the controller flow since we need some time to collect 
more feedbacks from the ML.
   
   Yea, I will keep this PR open and rename it to dedicate to the refactor 
discussion. And I will move the first commit to a new PR (once I addressed all 
the comments) which is related to the original ticket


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


wangyang0918 commented on pull request #26:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1054008848


   Maybe we could get this PR continued and create a dedicated ticket for 
refactoring/rethinking the controller flow since we need some time to collect 
more feedbacks from the ML.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #27: [FLINK-26328] Control Logging Behavior in Flink Deployments

2022-02-28 Thread GitBox


morhidi commented on pull request #27:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/27#issuecomment-1053998963


   cc @gyfora @tweise @wangyang0918 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on a change in pull request #26:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/26#discussion_r815659148



##
File path: 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##
@@ -75,29 +83,44 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
 flinkApp.getMetadata().getName(),
 flinkApp.getMetadata().getNamespace());
 
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-if (flinkApp.getStatus().getJobStatus() != null) {
-// pre-existing deployments on operator restart - 
proceed with
-// reconciliation
-return null;
-}
+return JobDeploymentStatus.READY;

Review comment:
   I will try to make sure this case is accounted for with a test case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #26: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

2022-02-28 Thread GitBox


gyfora commented on pull request #26:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/26#issuecomment-1053988753


   For the sake of transparency I have moved this to a discussion thread on the 
dev list: https://lists.apache.org/thread/ydvdxfn8go95gnmmh5k9ggwx9hwxn6tl
   
   Let's try to come to an agreement there together before continuing! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[flink] branch release-1.14 updated (77a7d40 -> e4c99b1)

2022-02-28 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 77a7d40  [FLINK-26285] Fixes an inconsistency which was introduced by 
c3a6b51 as part of changes done for FLINK-19543
 new 7a6c3c1  [FLINK-25819][runtime] Reordered requesting and recycling 
buffers in order to avoid race condition in 
testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
 new 70c8835  [FLINK-25819][runtime] Added new test for 'Insufficient 
number of network buffers' scenario into NetworkBufferPoolTest
 new e4c99b1  [hotfix][runtime] CodeStyle correction for 
NetworkBufferPoolTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/network/buffer/NetworkBufferPoolTest.java   | 91 +-
 1 file changed, 53 insertions(+), 38 deletions(-)


[flink] 02/03: [FLINK-25819][runtime] Added new test for 'Insufficient number of network buffers' scenario into NetworkBufferPoolTest

2022-02-28 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70c8835070e5dd6a44edb33915f0ef8d611c58fb
Author: Anton Kalashnikov 
AuthorDate: Mon Feb 21 16:21:34 2022 +0100

[FLINK-25819][runtime] Added new test for 'Insufficient number of network 
buffers' scenario into NetworkBufferPoolTest
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 42 ++
 1 file changed, 42 insertions(+)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index cc4d49f..13faa93 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -288,6 +289,47 @@ public class NetworkBufferPoolTest extends TestLogger {
 }
 
 /**
+ * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with 
the total number of
+ * allocated buffers for several requests exceeding the capacity of {@link 
NetworkBufferPool}.
+ */
+@Test
+public void testInsufficientNumberOfBuffers() throws Exception {
+final int numberOfSegmentsToRequest = 5;
+
+final NetworkBufferPool globalPool = new 
NetworkBufferPool(numberOfSegmentsToRequest, 128);
+
+try {
+// the global pool should be in available state initially
+assertTrue(globalPool.getAvailableFuture().isDone());
+
+// request 5 segments
+List segments1 =
+
globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+assertFalse(globalPool.getAvailableFuture().isDone());
+assertEquals(numberOfSegmentsToRequest, segments1.size());
+
+// request only 1 segment
+IOException ioException =
+assertThrows(
+IOException.class, () -> 
globalPool.requestUnpooledMemorySegments(1));
+
+assertTrue(ioException.getMessage().contains("Insufficient number 
of network buffers"));
+
+// recycle 5 segments
+CompletableFuture availableFuture = 
globalPool.getAvailableFuture();
+globalPool.recycleUnpooledMemorySegments(segments1);
+assertTrue(availableFuture.isDone());
+
+List segments2 =
+
globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
+assertFalse(globalPool.getAvailableFuture().isDone());
+assertEquals(numberOfSegmentsToRequest, segments2.size());
+} finally {
+globalPool.destroy();
+}
+}
+
+/**
  * Tests {@link NetworkBufferPool#requestUnpooledMemorySegments(int)} with 
the invalid argument
  * to cause exception.
  */


[flink] 03/03: [hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest

2022-02-28 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4c99b19e9fa876157a98d23b5b5bc498da7fac9
Author: Anton Kalashnikov 
AuthorDate: Mon Feb 21 16:37:58 2022 +0100

[hotfix][runtime] CodeStyle correction for NetworkBufferPoolTest
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 23 ++
 1 file changed, 6 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 13faa93..e5f878e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -23,10 +23,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -40,13 +37,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsNot.not;
@@ -55,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -63,10 +59,6 @@ import static org.junit.Assert.fail;
 /** Tests for {@link NetworkBufferPool}. */
 public class NetworkBufferPoolTest extends TestLogger {
 
-@Rule public ExpectedException expectedException = 
ExpectedException.none();
-
-@Rule public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
-
 @Test
 public void testCreatePoolAfterDestroy() {
 try {
@@ -434,10 +426,9 @@ public class NetworkBufferPoolTest extends TestLogger {
 
 segment.free();
 
-expectedException.expect(IllegalStateException.class);
-expectedException.expectMessage("destroyed");
 try {
-asyncRequest.sync();
+Exception ex = assertThrows(IllegalStateException.class, 
asyncRequest::sync);
+assertTrue(ex.getMessage().contains("destroyed"));
 } finally {
 globalPool.destroy();
 }
@@ -518,11 +509,9 @@ public class NetworkBufferPoolTest extends TestLogger {
 
 asyncRequest.start();
 
-expectedException.expect(IOException.class);
-expectedException.expectMessage("Timeout");
-
 try {
-asyncRequest.sync();
+Exception ex = assertThrows(IOException.class, asyncRequest::sync);
+assertTrue(ex.getMessage().contains("Timeout"));
 } finally {
 globalPool.destroy();
 }
@@ -683,7 +672,7 @@ public class NetworkBufferPoolTest extends TestLogger {
 // wait until all available buffers are requested
 while (segmentsRequested.size() + segments.size() + 
exclusiveSegments.size()
 < numBuffers) {
-Thread.sleep(100);
+Thread.sleep(10);
 assertNull(cause.get());
 }
 


[flink] 01/03: [FLINK-25819][runtime] Reordered requesting and recycling buffers in order to avoid race condition in testIsAvailableOrNotAfterRequestAndRecycleMultiSegments

2022-02-28 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a6c3c1b978a75898ff3f09a715e25e7e9e91690
Author: Anton Kalashnikov 
AuthorDate: Mon Feb 21 16:20:48 2022 +0100

[FLINK-25819][runtime] Reordered requesting and recycling buffers in order 
to avoid race condition in 
testIsAvailableOrNotAfterRequestAndRecycleMultiSegments
---
 .../io/network/buffer/NetworkBufferPoolTest.java   | 26 +-
 1 file changed, 5 insertions(+), 21 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 73877c9..cc4d49f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -532,9 +532,8 @@ public class NetworkBufferPoolTest extends TestLogger {
  * NetworkBufferPool#requestUnpooledMemorySegments(int)} and recycled by 
{@link
  * NetworkBufferPool#recycleUnpooledMemorySegments(Collection)}.
  */
-@Test(timeout = 1L)
-public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments()
-throws InterruptedException, IOException {
+@Test
+public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() 
throws Exception {
 final int numberOfSegmentsToRequest = 5;
 final int numBuffers = 2 * numberOfSegmentsToRequest;
 
@@ -556,29 +555,14 @@ public class NetworkBufferPoolTest extends TestLogger {
 assertFalse(globalPool.getAvailableFuture().isDone());
 assertEquals(numberOfSegmentsToRequest, segments2.size());
 
-// request another 5 segments
-final CountDownLatch latch = new CountDownLatch(1);
-final List segments3 = new 
ArrayList<>(numberOfSegmentsToRequest);
-CheckedThread asyncRequest =
-new CheckedThread() {
-@Override
-public void go() throws Exception {
-// this request should be blocked until at least 5 
segments are recycled
-segments3.addAll(
-globalPool.requestUnpooledMemorySegments(
-numberOfSegmentsToRequest));
-latch.countDown();
-}
-};
-asyncRequest.start();
-
 // recycle 5 segments
 CompletableFuture availableFuture = 
globalPool.getAvailableFuture();
 globalPool.recycleUnpooledMemorySegments(segments1);
 assertTrue(availableFuture.isDone());
 
-// wait util the third request is fulfilled
-latch.await();
+// request another 5 segments
+final List segments3 =
+
globalPool.requestUnpooledMemorySegments(numberOfSegmentsToRequest);
 assertFalse(globalPool.getAvailableFuture().isDone());
 assertEquals(numberOfSegmentsToRequest, segments3.size());