[jira] [Created] (FLINK-29110) Support to mount a dynamically-created pvc for JM and TM in standalone mode with StatefulSet.
Peng Yuan created FLINK-29110: Summary: Support to mount a dynamically-created pvc for JM and TM in standalone mode with StatefulSet. Key: FLINK-29110 URL: https://issues.apache.org/jira/browse/FLINK-29110 Project: Flink Issue Type: New Feature Components: Deployment / Kubernetes Reporter: Peng Yuan

Use StatefulSet instead of Deployment to deploy the JM and TM, so that a dynamically-created PersistentVolumeClaim can be mounted. Add volumeClaimTemplates to JobManagerSpec and TaskManagerSpec:

JobManagerSpec:
{code:java}
public class JobManagerSpec {
    /** Resource specification for the JobManager pods. */
    private Resource resource;

    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
    private int replicas = 1;

    /** Volume claim templates for the JobManager stateful set. Just for standalone mode. */
    private List<PersistentVolumeClaim> volumeClaimTemplates = new ArrayList<>();

    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
    private Pod podTemplate;
}
{code}

TaskManagerSpec:
{code:java}
public class TaskManagerSpec {
    /** Resource specification for the TaskManager pods. */
    private Resource resource;

    /** Number of TaskManager replicas. If defined, takes precedence over parallelism. */
    @SpecReplicas private Integer replicas;

    /** Volume claim templates for the TaskManager stateful set. Just for standalone mode. */
    private List<PersistentVolumeClaim> volumeClaimTemplates = new ArrayList<>();

    /** TaskManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
    private Pod podTemplate;
}
{code}

volumeClaimTemplates is only available in standalone mode.
CR Example:
{code:java}
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-example
spec:
  image: flink:1.14.3
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
    volumeClaimTemplates:
      - metadata:
          name: log
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "alicloud-local-lvm"
          resources:
            requests:
              storage: 10Gi
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: job-manager-pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: log-volume
                mountPath: /opt/flink/log
  taskManager:
    replicas: 1 # (only needed for standalone clusters)
    resource:
      memory: "2048m"
      cpu: 1
    volumeClaimTemplates:
      - metadata:
          name: log
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "alicloud-local-lvm"
          resources:
            requests:
              storage: 10Gi
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: task-manager-pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: log-volume
                mountPath: /opt/flink/log
  mode: standalone
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode
Thomas Weise created FLINK-29109: Summary: Checkpoint path conflict with stateless upgrade mode Key: FLINK-29109 URL: https://issues.apache.org/jira/browse/FLINK-29109 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.1.0 Reporter: Thomas Weise Assignee: Thomas Weise

A stateful job with stateless upgrade mode (yes, there are such use cases) fails with a checkpoint path conflict due to the constant jobId and FLINK-19358 (applies to Flink < 1.16.x). Since the checkpoint id resets on restart with stateless upgrade mode, the job is going to write to previously used locations and fail. The workaround is to rotate the jobId on every redeploy when the upgrade mode is stateless. While this can be worked around externally, it is best done in the operator itself, because reconciliation resolves when a restart is actually required, while rotating the jobId externally may trigger unnecessary restarts.
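The rotation described above can be sketched as follows. This is an illustrative, self-contained snippet rather than the operator's actual code; the class and method names are hypothetical. It only relies on the fact that a Flink JobID is 16 random bytes, conventionally rendered as 32 lowercase hex characters:

```java
import java.security.SecureRandom;

/** Illustrative sketch: mint a fresh job id on every redeploy for stateless upgrade mode. */
public class JobIdRotation {
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Generates a fresh 16-byte job id, hex-encoded like Flink's JobID string form. */
    public static String freshJobId() {
        byte[] bytes = new byte[16];
        RANDOM.nextBytes(bytes);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Because each redeploy gets a distinct id, checkpoint paths (which embed the job id) no longer collide with locations written by a previous incarnation of the job.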
[jira] [Created] (FLINK-29108) Kubernetes operator: Support queryable state
Ron Crocker created FLINK-29108: Summary: Kubernetes operator: Support queryable state Key: FLINK-29108 URL: https://issues.apache.org/jira/browse/FLINK-29108 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: Ron Crocker

Enable the Kubernetes operator to deploy jobs where queryable state is desired. When queryable state is desired, the operator should:
# Apply {{queryable-state.enabled: true}} to the deployed job.
# Configure the Queryable State proxy and Queryable State server (via the {{queryable-state.proxy}} and {{queryable-state.server}} configuration sections, respectively). If these sections aren't provided, the default configuration is used.

The operator will need to create a Kubernetes service fronting the Task Managers' {{QueryableStateClientProxy}} port (as configured by the above). Tearing down the job also tears down the service.
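A sketch of what the resulting configuration and service might look like. The service name and selector labels below are illustrative, not something the operator defines today; 6125 is Flink's default queryable state proxy port:

```yaml
# flink-conf.yaml fragment applied to the deployed job (proxy port shown at its default):
queryable-state.enabled: true
queryable-state.proxy.ports: "6125"
---
# Illustrative Service fronting the TaskManagers' queryable state client proxy.
apiVersion: v1
kind: Service
metadata:
  name: my-job-queryable-state   # hypothetical name
spec:
  selector:
    component: taskmanager       # illustrative label; must match the TM pods
  ports:
    - name: queryable-state
      port: 6125
      targetPort: 6125
```

The operator would own this Service's lifecycle, creating it alongside the job and deleting it on teardown.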
[jira] [Created] (FLINK-29107) Bump up spotless version to improve efficiency
Yu Chen created FLINK-29107: Summary: Bump up spotless version to improve efficiency Key: FLINK-29107 URL: https://issues.apache.org/jira/browse/FLINK-29107 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.15.2 Reporter: Yu Chen Attachments: image-2022-08-25-22-10-54-453.png

Hi all, I noticed a [discussion|https://github.com/diffplug/spotless/issues/927] in the spotless GitHub repository suggesting that we can significantly improve the efficiency of spotless checks by upgrading the version of spotless and enabling `upToDateChecking`. I made a simple test locally, and the improvement of the spotless check after the upgrade is shown in the figure. !image-2022-08-25-22-10-54-453.png!
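For reference, enabling up-to-date checking in the spotless-maven-plugin is a small configuration change. The version number below is illustrative; the minimum version supporting `upToDateChecking` should be checked against the spotless release notes:

```xml
<plugin>
  <groupId>com.diffplug.spotless</groupId>
  <artifactId>spotless-maven-plugin</artifactId>
  <!-- illustrative; use a release that supports upToDateChecking -->
  <version>2.27.1</version>
  <configuration>
    <!-- Skips files whose formatting state is already known from the last run. -->
    <upToDateChecking>
      <enabled>true</enabled>
    </upToDateChecking>
  </configuration>
</plugin>
```

With this enabled, repeated `mvn spotless:check` runs only re-examine files that changed since the previous invocation.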
Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table
Hi Ran,

what would be the data type of this dynamic metadata column? The planner and many parts of the stack will require a data type.

Personally, I feel connector developers can already get the same functionality by declaring a metadata column as `MAP<STRING, STRING>`. This is what we already expose as `debezium.source.properties`. Whatever Debezium adds will be available through this property and can be accessed via `SELECT col['my-new-property'] FROM x`, including being NULL by default if not present.

Regards,
Timo

On 25.08.22 14:04, Ran Tao wrote:
```
create table test_source(
  __test_metadata__ varchar METADATA,
  f0 varchar,
  f1 varchar,
  f2 bigint,
  ts as CURRENT_TIMESTAMP
) with(
  'connector'='test',
  ...
)
```
If we do not pre-define `__test_metadata__` as a metadata key by implementing listReadableMetadata, running the above SQL causes an exception like this:

org.apache.flink.table.api.ValidationException: Invalid metadata key '__test_metadata__' in column '__test_metadata__' of table 'default_catalog.default_database.test_source'. The DynamicTableSource class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource' supports the following metadata keys for reading: xxx, yyy
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)

This is because a Flink metadata column must currently exist in the results returned by `listReadableMetadata`. When a connector adds new metadata, we cannot use it directly unless we modify the connector code to support it, which can be intolerable in some situations.

Can we support a 'DYNAMIC MetadataColumn'? The basic mechanism is to skip validating such a column against the existing metadata, so users can define it dynamically. If a connector does not provide the metadata, the column value returns NULL; otherwise it returns the concrete value. This has great benefits in some scenarios. Looking forward to your opinions.
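Timo's workaround can be sketched in SQL as follows. The connector name and property key are illustrative; the point is that a map-typed metadata column absorbs keys the connector adds later without any DDL change:

```sql
-- Hypothetical connector 'test' exposing its raw properties as a single
-- map-typed metadata column instead of one column per key.
CREATE TABLE test_source (
  props MAP<STRING, STRING> METADATA,
  f0 VARCHAR
) WITH (
  'connector' = 'test'
);

-- New properties added by the connector become reachable immediately;
-- absent keys simply yield NULL.
SELECT props['my-new-property'] FROM test_source;
```

This keeps the planner happy (the column has a concrete data type) while still giving the open-ended key space the dynamic-column proposal is after.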
[DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table
```
create table test_source(
  __test_metadata__ varchar METADATA,
  f0 varchar,
  f1 varchar,
  f2 bigint,
  ts as CURRENT_TIMESTAMP
) with(
  'connector'='test',
  ...
)
```
If we do not pre-define `__test_metadata__` as a metadata key by implementing listReadableMetadata, running the above SQL causes an exception like this:

org.apache.flink.table.api.ValidationException: Invalid metadata key '__test_metadata__' in column '__test_metadata__' of table 'default_catalog.default_database.test_source'. The DynamicTableSource class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource' supports the following metadata keys for reading: xxx, yyy
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)

This is because a Flink metadata column must currently exist in the results returned by `listReadableMetadata`. When a connector adds new metadata, we cannot use it directly unless we modify the connector code to support it, which can be intolerable in some situations.

Can we support a 'DYNAMIC MetadataColumn'? The basic mechanism is to skip validating such a column against the existing metadata, so users can define it dynamically. If a connector does not provide the metadata, the column value returns NULL; otherwise it returns the concrete value. This has great benefits in some scenarios. Looking forward to your opinions.

--
Best Regards,
Ran Tao
https://github.com/chucheng92
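The proposed relaxation of the validation step could be sketched like this. The class and method names are hypothetical and do not reflect Flink's actual planner code; the sketch only illustrates the intended semantics (unknown keys resolve to null instead of failing validation when the column is dynamic):

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of 'DYNAMIC MetadataColumn' resolution. With dynamic=false,
 * an undeclared key fails validation (today's behavior); with dynamic=true, an
 * undeclared or absent key simply resolves to null.
 */
public class DynamicMetadataResolver {
    /** Resolves a metadata column value from what the connector actually produced. */
    public static Object resolve(
            String key,
            Set<String> declaredKeys,
            Map<String, Object> producedMetadata,
            boolean dynamic) {
        if (!declaredKeys.contains(key) && !dynamic) {
            // Mirrors today's ValidationException for unknown metadata keys.
            throw new IllegalArgumentException("Invalid metadata key '" + key + "'");
        }
        // Dynamic columns: metadata the connector never produced resolves to null.
        return producedMetadata.get(key);
    }
}
```

The open question Timo raises still applies: even under this scheme, the planner needs a declared data type for the dynamic column.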
[jira] [Created] (FLINK-29106) value of metric 'idleTimeMsPerSecond' more than 1000
chenyuzhi created FLINK-29106: Summary: value of metric 'idleTimeMsPerSecond' more than 1000 Key: FLINK-29106 URL: https://issues.apache.org/jira/browse/FLINK-29106 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.11.3 Environment: flink 1.11.3 Reporter: chenyuzhi Attachments: image-2022-08-25-19-18-52-755.png

As the picture below shows, the value of the metric 'idleTimeMsPerSecond' is more than 1000, which is obviously unreasonable. !https://sawiki2.nie.netease.com/media/image/chenyuzhi/20220825181655.png!
Re: [VOTE] Apache Flink Table Store 0.2.0, release candidate #3
+1 (binding) - build and compile the source code locally. OK - checked/verified signatures and hashes, OK - no missing artifacts in the release staging area. OK - reviewed the release PR. OK - ran the quick start with both Flink 1.14 and 1.15 OK Best, Jark On Thu, 25 Aug 2022 at 11:38, Yu Li wrote: > +1 (binding) > > - Checked release notes: *OK* > - Checked sums and signatures: *OK* > - Checked the jars in the staging repo: *OK* > - Checked source distribution doesn't include binaries: *OK* > - Maven clean install from source: *OK* > - Checked version consistency in pom files: *OK* > - Went through the quick start: *OK* > * Verified with both flink 1.14.5 and 1.15.1 > - Checked the website updates: *OK* > > Thanks all for the efforts! > > Best Regards, > Yu > > > On Wed, 24 Aug 2022 at 19:24, Nicholas Jiang > wrote: > > > Hi all! > > > > +1 for the release (non-binding). I've verified the jar with SQL client > > and listed the check items as follows: > > > > * Compiled the sources and built the source distribution - PASSED > > * Ran through Quick Start Guide - PASSED > > * Checked Spark 2.3.4&3.3.0 reader and catalog with table store jar - > > PASSED > > * Checked all NOTICE files - PASSED > > * Checked the website updates - PASSED > > > > Regards, > > Nicholas Jiang > > > > On 2022/08/24 07:46:00 Jingsong Li wrote: > > > Hi everyone, > > > > > > Please review and vote on the release candidate #3 for the version > 0.2.0 > > of > > > Apache Flink Table Store, as follows: > > > > > > [ ] +1, Approve the release > > > [ ] -1, Do not approve the release (please provide specific comments) > > > > > > **Release Overview** > > > > > > As an overview, the release consists of the following: > > > a) Table Store canonical source distribution to be deployed to the > > > release repository at dist.apache.org > > > b) Table Store binary convenience releases to be deployed to the > > > release repository at dist.apache.org > > > c) Maven artifacts to be deployed to the Maven 
Central Repository > > > > > > **Staging Areas to Review** > > > > > > The staging areas containing the above mentioned artifacts are as > > follows, > > > for your review: > > > * All artifacts for a) and b) can be found in the corresponding dev > > > repository at dist.apache.org [2] > > > * All artifacts for c) can be found at the Apache Nexus Repository [3] > > > > > > All artifacts are signed with the key > > > 2C2B6A653B07086B65E4369F7C76245E0A318150 [4] > > > > > > Other links for your review: > > > * JIRA release notes [5] > > > * source code tag "release-0.2.0-rc3" [6] > > > * PR to update the website Downloads page to include Table Store links > > [7] > > > > > > **Vote Duration** > > > > > > The voting time will run for at least 72 hours. > > > It is adopted by majority approval, with at least 3 PMC affirmative > > votes. > > > > > > Best, > > > Jingsong Lee > > > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release > > > [2] > > > https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.2.0-rc3/ > > > [3] > > https://repository.apache.org/content/repositories/orgapacheflink-1526/ > > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS > > > [5] > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351570 > > > [6] https://github.com/apache/flink-table-store/tree/release-0.2.0-rc3 > > > [7] https://github.com/apache/flink-web/pull/562 > > > > > >
[jira] [Created] (FLINK-29105) KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents failed with AssertionFailedError
Xingbo Huang created FLINK-29105: Summary: KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents failed with AssertionFailedError Key: FLINK-29105 URL: https://issues.apache.org/jira/browse/FLINK-29105 Project: Flink Issue Type: Bug Components: Deployment / Kubernetes Affects Versions: 1.16.0 Reporter: Xingbo Huang Fix For: 1.16.0

{code:java}
2022-08-25T04:19:22.1429302Z Aug 25 04:19:22 [ERROR] Tests run: 25, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.827 s <<< FAILURE! - in org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest
2022-08-25T04:19:22.1447098Z Aug 25 04:19:22 [ERROR] org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents Time elapsed: 0.031 s <<< FAILURE!
2022-08-25T04:19:22.1447862Z Aug 25 04:19:22 org.opentest4j.AssertionFailedError:
2022-08-25T04:19:22.1448236Z Aug 25 04:19:22
2022-08-25T04:19:22.1448561Z Aug 25 04:19:22 expected: 2
2022-08-25T04:19:22.1448893Z Aug 25 04:19:22 but was: 0
2022-08-25T04:19:22.1449330Z Aug 25 04:19:22     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-08-25T04:19:22.1450330Z Aug 25 04:19:22     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2022-08-25T04:19:22.1451114Z Aug 25 04:19:22     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2022-08-25T04:19:22.1452006Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.retryWithFirstFailedK8sOperation(KubernetesStateHandleStoreTest.java:1218)
2022-08-25T04:19:22.1452956Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.access$600(KubernetesStateHandleStoreTest.java:59)
2022-08-25T04:19:22.1453863Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest$5.lambda$null$0(KubernetesStateHandleStoreTest.java:245)
2022-08-25T04:19:22.1454744Z Aug 25 04:19:22     at org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient.checkAndUpdateConfigMap(TestingFlinkKubeClient.java:182)
2022-08-25T04:19:22.1455619Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.updateConfigMap(KubernetesStateHandleStore.java:634)
2022-08-25T04:19:22.1456553Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:219)
2022-08-25T04:19:22.1457435Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest$5.lambda$new$1(KubernetesStateHandleStoreTest.java:258)
2022-08-25T04:19:22.1458482Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesHighAvailabilityTestBase$Context.runTest(KubernetesHighAvailabilityTestBase.java:107)
2022-08-25T04:19:22.1459383Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest$5.<init>(KubernetesStateHandleStoreTest.java:237)
2022-08-25T04:19:22.1460391Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents(KubernetesStateHandleStoreTest.java:235)
2022-08-25T04:19:22.1461357Z Aug 25 04:19:22     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-08-25T04:19:22.1461965Z Aug 25 04:19:22     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-08-25T04:19:22.1462653Z Aug 25 04:19:22     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-08-25T04:19:22.1463280Z Aug 25 04:19:22     at java.lang.reflect.Method.invoke(Method.java:498)
2022-08-25T04:19:22.1463903Z Aug 25 04:19:22     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
2022-08-25T04:19:22.1464604Z Aug 25 04:19:22     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-08-25T04:19:22.1465397Z Aug 25 04:19:22     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-08-25T04:19:22.1466211Z Aug 25 04:19:22     at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-08-25T04:19:22.1466943Z Aug 25 04:19:22     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-08-25T04:19:22.1467701Z Aug 25 04:19:22     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
2022-08-25T04:19:22.1468531Z Aug 25 04:19:22     at org.junit.jupiter.engine.executio