[jira] [Created] (FLINK-29110) Support to mount a dynamically-created pvc for JM and TM in standalone mode with StatefulSet.

2022-08-25 Thread Peng Yuan (Jira)
Peng Yuan created FLINK-29110:

 Summary: Support to mount a dynamically-created pvc for JM and TM 
in standalone mode with StatefulSet.
 Key: FLINK-29110
 URL: https://issues.apache.org/jira/browse/FLINK-29110
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / Kubernetes
Reporter: Peng Yuan


Use a StatefulSet instead of a Deployment to deploy the JM and TM so that a 
dynamically created PersistentVolumeClaim can be mounted.

Add volumeClaimTemplates to JobManagerSpec and TaskManagerSpec:

JobManagerSpec:

 
{code:java}
public class JobManagerSpec {
    /** Resource specification for the JobManager pods. */
    private Resource resource;

    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
    private int replicas = 1;

    /** Volume Claim Templates for JobManager stateful set. Just for standalone mode. */
    private List volumeClaimTemplates = new ArrayList<>();

    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
    private Pod podTemplate;
}
 {code}
TaskManagerSpec:

 
{code:java}
public class TaskManagerSpec {
    /** Resource specification for the TaskManager pods. */
    private Resource resource;

    /** Number of TaskManager replicas. If defined, takes precedence over parallelism */
    @SpecReplicas private Integer replicas;

    /** Volume Claim Templates for TaskManager stateful set. Just for standalone mode. */
    private List volumeClaimTemplates = new ArrayList<>();

    /** TaskManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
    private Pod podTemplate;
} {code}
 

volumeClaimTemplates is only available in standalone mode.

 

CR Example:

 
{code:java}
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-example
spec:
  image: flink:1.14.3
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
    volumeClaimTemplates:
      - metadata:
          name: log
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "alicloud-local-lvm"
          resources:
            requests:
              storage: 10Gi
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: job-manager-pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: log-volume
                mountPath: /opt/flink/log
  taskManager:
    replicas: 1 # only needed for standalone clusters
    resource:
      memory: "2048m"
      cpu: 1
    volumeClaimTemplates:
      - metadata:
          name: log
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "alicloud-local-lvm"
          resources:
            requests:
              storage: 10Gi
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: task-manager-pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: log-volume
                mountPath: /opt/flink/log
  mode: standalone {code}
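
For illustration only: with a StatefulSet, Kubernetes creates one PersistentVolumeClaim per pod and per claim template, named <template name>-<StatefulSet name>-<ordinal>. The Java sketch below lists the claims that would result for the `log` template in the CR example. The StatefulSet name `basic-example-taskmanager` is an assumption; the proposal does not say how the operator would name the StatefulSets.

```java
import java.util.ArrayList;
import java.util.List;

final class StatefulSetPvcNames {
    /** Builds the claim names "<template>-<statefulset>-<ordinal>" for each replica. */
    static List<String> pvcNames(String templateName, String statefulSetName, int replicas) {
        List<String> names = new ArrayList<>();
        for (int ordinal = 0; ordinal < replicas; ordinal++) {
            names.add(templateName + "-" + statefulSetName + "-" + ordinal);
        }
        return names;
    }

    public static void main(String[] args) {
        // "basic-example-taskmanager" is a hypothetical StatefulSet name.
        System.out.println(pvcNames("log", "basic-example-taskmanager", 2));
        // prints [log-basic-example-taskmanager-0, log-basic-example-taskmanager-1]
    }
}
```

Because each pod keeps its own claim across restarts, the log directory of a given JM or TM replica would survive pod rescheduling, which a Deployment cannot offer.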
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29109) Checkpoint path conflict with stateless upgrade mode

2022-08-25 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-29109:


 Summary: Checkpoint path conflict with stateless upgrade mode
 Key: FLINK-29109
 URL: https://issues.apache.org/jira/browse/FLINK-29109
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Thomas Weise
Assignee: Thomas Weise


A stateful job with stateless upgrade mode (yes, there are such use cases) 
fails with a checkpoint path conflict due to the constant jobId and FLINK-19358 
(applies to Flink < 1.16). Because the checkpoint ID resets on restart in 
stateless upgrade mode, the job writes to previously used locations and 
fails. The workaround is to rotate the jobId on every redeploy when the upgrade 
mode is stateless. While this can be done externally, it is best done 
in the operator itself: reconciliation determines when a restart is 
actually required, whereas rotating the jobId externally may trigger unnecessary 
restarts.
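
A minimal sketch of the rotation rule described above, not the operator's actual code. `UpgradeMode`, `jobIdForDeploy` and the `restartRequired` flag are hypothetical names; the sketch only assumes that a job ID is rendered as 32 hexadecimal characters.

```java
import java.util.UUID;

final class JobIdRotation {
    /** Hypothetical stand-in for the operator's upgrade modes. */
    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    /** Returns a fresh random ID as 32 lowercase hex characters. */
    static String newJobId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Keeps the current job ID unless a restart is actually required and the
     * upgrade mode is stateless; only then is a new ID generated.
     */
    static String jobIdForDeploy(String currentJobId, UpgradeMode mode, boolean restartRequired) {
        if (currentJobId == null) {
            return newJobId(); // first deployment: nothing to keep
        }
        if (restartRequired && mode == UpgradeMode.STATELESS) {
            return newJobId(); // avoids reusing checkpoint paths of the old job ID
        }
        return currentJobId;
    }

    public static void main(String[] args) {
        String current = "0123456789abcdef0123456789abcdef";
        System.out.println(jobIdForDeploy(current, UpgradeMode.STATELESS, true));
        System.out.println(jobIdForDeploy(current, UpgradeMode.SAVEPOINT, true));
    }
}
```

Tying the rotation to `restartRequired` reflects the argument above: the ID changes only when reconciliation has already decided to restart the job.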





[jira] [Created] (FLINK-29108) Kubernetes operator: Support queryable state

2022-08-25 Thread Ron Crocker (Jira)
Ron Crocker created FLINK-29108:

 Summary: Kubernetes operator: Support queryable state
 Key: FLINK-29108
 URL: https://issues.apache.org/jira/browse/FLINK-29108
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Ron Crocker


Enable the Kubernetes operator to deploy jobs where queryable state is desired.

When queryable state is desired, the operator should:
 # Apply {{queryable-state.enable: true}} to the deployed job.
 # Configure the Queryable State proxy and Queryable State server (via the 
{{queryable-state.proxy}} and {{queryable-state.server}} configuration sections 
respectively). If these sections aren't provided, the default 
configuration is used.

The operator will need to create a Kubernetes service fronting the 
TaskManagers' {{QueryableStateClientProxy}} port (as configured by the above).

Tearing down the job also tears down the service.
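
As a rough sketch of the configuration step, the operator could start from defaults and overlay whatever the user supplied under the two sections. The class and method names are hypothetical. The key `queryable-state.enable` and the default ports (9069 for the proxy, 9067 for the server) are assumptions based on Flink's documented configuration options; verify them against the Flink version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class QueryableStateConfig {
    /** Merges user-supplied proxy/server settings over assumed defaults. */
    static Map<String, String> effectiveConfig(Map<String, String> userConfig) {
        Map<String, String> effective = new LinkedHashMap<>();
        effective.put("queryable-state.enable", "true");
        // Assumed defaults; used only when the user provides nothing.
        effective.put("queryable-state.proxy.ports", "9069");
        effective.put("queryable-state.server.ports", "9067");
        for (Map.Entry<String, String> entry : userConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith("queryable-state.proxy.") || key.startsWith("queryable-state.server.")) {
                effective.put(key, entry.getValue());
            }
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("queryable-state.proxy.ports", "9100");
        System.out.println(effectiveConfig(user));
    }
}
```

The proxy port in the resulting map is the one the Kubernetes service would need to expose.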





[jira] [Created] (FLINK-29107) Bump up spotless version to improve efficiently

2022-08-25 Thread Yu Chen (Jira)
Yu Chen created FLINK-29107:

 Summary: Bump up spotless version to improve efficiently
 Key: FLINK-29107
 URL: https://issues.apache.org/jira/browse/FLINK-29107
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.15.2
Reporter: Yu Chen
 Attachments: image-2022-08-25-22-10-54-453.png

Hi all, I noticed a 
[discussion|https://github.com/diffplug/spotless/issues/927] in the spotless 
GitHub repository indicating that we can significantly speed up spotless checks 
by upgrading spotless and enabling 
`upToDateChecking`.

I ran a simple test locally; the improvement in the spotless check 
after the upgrade is shown in the figure.
!image-2022-08-25-22-10-54-453.png!





Re: [DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

2022-08-25 Thread Timo Walther

Hi Ran,

What would be the data type of this dynamic metadata column? The planner 
and many parts of the stack will require a data type.


Personally, I feel connector developers can already get the same 
functionality by declaring a metadata column as `MAP`. 
This is what we already expose as `debezium.source.properties`. Whatever 
Debezium adds will be available through this property and can be 
accessed via `SELECT col['my-new-property'] FROM x`; the value is NULL 
by default if the property is not present.
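
To illustrate the lookup semantics (a sketch only, assuming the map has string keys and string values, which is not stated above): reading a key that the connector did not provide yields null rather than an error. `MapMetadataLookup` and `property` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class MapMetadataLookup {
    /** Returns the property value, or null when the key (or the whole map) is absent. */
    static String property(Map<String, String> properties, String key) {
        return properties == null ? null : properties.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector", "mysql");
        System.out.println(property(props, "connector"));       // prints "mysql"
        System.out.println(property(props, "my-new-property")); // prints "null"
    }
}
```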


Regards,
Timo


On 25.08.22 14:04, Ran Tao wrote:

```
create table test_source(
  __test_metadata__ varchar METADATA,
  f0 varchar,
  f1 varchar,
  f2 bigint,
  ts as CURRENT_TIMESTAMP
) with(
  'connector'='test',
   ...
)
```

If we do not predefine `__test_metadata__` as a metadata key by implementing
`listReadableMetadata`, running the above SQL causes an exception like this:

org.apache.flink.table.api.ValidationException: Invalid metadata key
'__test_metadata__' in column '__test_metadata__' of table
'default_catalog.default_database.test_source'. The DynamicTableSource
class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
supports the following metadata keys for reading:
xxx, yyy

at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)

This happens because a Flink metadata column must currently exist in the keys
returned by `listReadableMetadata`. When a connector adds new metadata,
we cannot use it directly unless we modify the connector code to support
it. In some situations this is not acceptable. Can we support a 'DYNAMIC
MetadataColumn'? The basic mechanism is to skip checking the column against
the existing metadata keys so that users can define it dynamically. If a
connector does not provide this metadata, the column value is null;
otherwise it is the concrete value. This would be very useful in some
scenarios.

Looking forward to your opinions.






[DISCUSS] Support 'DYNAMIC MetadataColumn' in flink table

2022-08-25 Thread Ran Tao
```
create table test_source(
 __test_metadata__ varchar METADATA,
 f0 varchar,
 f1 varchar,
 f2 bigint,
 ts as CURRENT_TIMESTAMP
) with(
 'connector'='test',
  ...
)
```

If we do not predefine `__test_metadata__` as a metadata key by implementing
`listReadableMetadata`, running the above SQL causes an exception like this:

org.apache.flink.table.api.ValidationException: Invalid metadata key
'__test_metadata__' in column '__test_metadata__' of table
'default_catalog.default_database.test_source'. The DynamicTableSource
class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
supports the following metadata keys for reading:
xxx, yyy

at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)

This happens because a Flink metadata column must currently exist in the keys
returned by `listReadableMetadata`. When a connector adds new metadata,
we cannot use it directly unless we modify the connector code to support
it. In some situations this is not acceptable. Can we support a 'DYNAMIC
MetadataColumn'? The basic mechanism is to skip checking the column against
the existing metadata keys so that users can define it dynamically. If a
connector does not provide this metadata, the column value is null;
otherwise it is the concrete value. This would be very useful in some
scenarios.

Looking forward to your opinions.
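
The difference between today's check and the proposed behaviour can be sketched in plain Java. This is not Flink's planner code: `validateStrict` and `resolveDynamic` are hypothetical helpers, and `readableKeys` stands in for the keys returned by `listReadableMetadata`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MetadataKeyValidation {
    /** Current behaviour: every declared key must be one the source lists as readable. */
    static void validateStrict(Set<String> readableKeys, List<String> declaredKeys) {
        for (String key : declaredKeys) {
            if (!readableKeys.contains(key)) {
                throw new IllegalArgumentException("Invalid metadata key '" + key + "'");
            }
        }
    }

    /** Proposed behaviour: no check; keys the connector does not provide resolve to null. */
    static Map<String, String> resolveDynamic(Map<String, String> available, List<String> declaredKeys) {
        Map<String, String> row = new LinkedHashMap<>();
        for (String key : declaredKeys) {
            row.put(key, available.get(key));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> available = Map.of("xxx", "1", "yyy", "2");
        List<String> declared = List.of("xxx", "__test_metadata__");
        System.out.println(resolveDynamic(available, declared)); // prints {xxx=1, __test_metadata__=null}
        validateStrict(available.keySet(), declared);            // throws IllegalArgumentException
    }
}
```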


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92


[jira] [Created] (FLINK-29106) value of metric 'idleTimeMsPerSecond' more than 1000

2022-08-25 Thread chenyuzhi (Jira)
chenyuzhi created FLINK-29106:

 Summary: value of metric 'idleTimeMsPerSecond'  more than 1000
 Key: FLINK-29106
 URL: https://issues.apache.org/jira/browse/FLINK-29106
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.11.3
 Environment: flink 1.11.3
Reporter: chenyuzhi
 Attachments: image-2022-08-25-19-18-52-755.png

 As shown in the picture below, the value of the metric 'idleTimeMsPerSecond' 
exceeds 1000. That is clearly wrong, since a second has only 1000 ms.

!https://sawiki2.nie.netease.com/media/image/chenyuzhi/20220825181655.png!





Re: [VOTE] Apache Flink Table Store 0.2.0, release candidate #3

2022-08-25 Thread Jark Wu
+1 (binding)

- Built and compiled the source code locally: OK
- Checked/verified signatures and hashes: OK
- No missing artifacts in the release staging area: OK
- Reviewed the release PR: OK
- Ran the quick start with both Flink 1.14 and 1.15: OK

Best,
Jark

On Thu, 25 Aug 2022 at 11:38, Yu Li  wrote:

> +1 (binding)
>
> - Checked release notes: *OK*
> - Checked sums and signatures: *OK*
> - Checked the jars in the staging repo: *OK*
> - Checked source distribution doesn't include binaries: *OK*
> - Maven clean install from source: *OK*
> - Checked version consistency in pom files: *OK*
> - Went through the quick start: *OK*
>   * Verified with both flink 1.14.5 and 1.15.1
> - Checked the website updates: *OK*
>
> Thanks all for the efforts!
>
> Best Regards,
> Yu
>
>
> On Wed, 24 Aug 2022 at 19:24, Nicholas Jiang 
> wrote:
>
> > Hi all!
> >
> > +1 for the release (non-binding). I've verified the jar with SQL client
> > and listed the check items as follows:
> >
> > * Compiled the sources and built the source distribution - PASSED
> > * Ran through Quick Start Guide - PASSED
> > * Checked Spark 2.3.4 & 3.3.0 reader and catalog with table store jar - PASSED
> > * Checked all NOTICE files - PASSED
> > * Checked the website updates - PASSED
> >
> > Regards,
> > Nicholas Jiang
> >
> > On 2022/08/24 07:46:00 Jingsong Li wrote:
> > > Hi everyone,
> > >
> > > Please review and vote on the release candidate #3 for the version 0.2.0 of
> > > Apache Flink Table Store, as follows:
> > >
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > > **Release Overview**
> > >
> > > As an overview, the release consists of the following:
> > > a) Table Store canonical source distribution to be deployed to the
> > > release repository at dist.apache.org
> > > b) Table Store binary convenience releases to be deployed to the
> > > release repository at dist.apache.org
> > > c) Maven artifacts to be deployed to the Maven Central Repository
> > >
> > > **Staging Areas to Review**
> > >
> > > The staging areas containing the above mentioned artifacts are as follows,
> > > for your review:
> > > * All artifacts for a) and b) can be found in the corresponding dev
> > > repository at dist.apache.org [2]
> > > * All artifacts for c) can be found at the Apache Nexus Repository [3]
> > >
> > > All artifacts are signed with the key
> > > 2C2B6A653B07086B65E4369F7C76245E0A318150 [4]
> > >
> > > Other links for your review:
> > > * JIRA release notes [5]
> > > * source code tag "release-0.2.0-rc3" [6]
> > > * PR to update the website Downloads page to include Table Store links [7]
> > >
> > > **Vote Duration**
> > >
> > > The voting time will run for at least 72 hours.
> > > It is adopted by majority approval, with at least 3 PMC affirmative votes.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Table+Store+Release
> > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-table-store-0.2.0-rc3/
> > > [3] https://repository.apache.org/content/repositories/orgapacheflink-1526/
> > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [5] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351570
> > > [6] https://github.com/apache/flink-table-store/tree/release-0.2.0-rc3
> > > [7] https://github.com/apache/flink-web/pull/562
> > >
> >
>


[jira] [Created] (FLINK-29105) KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents failed with AssertionFailedError

2022-08-25 Thread Xingbo Huang (Jira)
Xingbo Huang created FLINK-29105:


 Summary: 
KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents
 failed with AssertionFailedError
 Key: FLINK-29105
 URL: https://issues.apache.org/jira/browse/FLINK-29105
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.16.0
Reporter: Xingbo Huang
 Fix For: 1.16.0


{code:java}
2022-08-25T04:19:22.1429302Z Aug 25 04:19:22 [ERROR] Tests run: 25, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.827 s <<< FAILURE! - in org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest
2022-08-25T04:19:22.1447098Z Aug 25 04:19:22 [ERROR] org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents  Time elapsed: 0.031 s  <<< FAILURE!
2022-08-25T04:19:22.1447862Z Aug 25 04:19:22 org.opentest4j.AssertionFailedError: 
2022-08-25T04:19:22.1448236Z Aug 25 04:19:22 
2022-08-25T04:19:22.1448561Z Aug 25 04:19:22 expected: 2
2022-08-25T04:19:22.1448893Z Aug 25 04:19:22  but was: 0
2022-08-25T04:19:22.1449330Z Aug 25 04:19:22     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-08-25T04:19:22.1450330Z Aug 25 04:19:22     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2022-08-25T04:19:22.1451114Z Aug 25 04:19:22     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2022-08-25T04:19:22.1452006Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.retryWithFirstFailedK8sOperation(KubernetesStateHandleStoreTest.java:1218)
2022-08-25T04:19:22.1452956Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.access$600(KubernetesStateHandleStoreTest.java:59)
2022-08-25T04:19:22.1453863Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest$5.lambda$null$0(KubernetesStateHandleStoreTest.java:245)
2022-08-25T04:19:22.1454744Z Aug 25 04:19:22     at org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient.checkAndUpdateConfigMap(TestingFlinkKubeClient.java:182)
2022-08-25T04:19:22.1455619Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.updateConfigMap(KubernetesStateHandleStore.java:634)
2022-08-25T04:19:22.1456553Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:219)
2022-08-25T04:19:22.1457435Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest$5.lambda$new$1(KubernetesStateHandleStoreTest.java:258)
2022-08-25T04:19:22.1458482Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesHighAvailabilityTestBase$Context.runTest(KubernetesHighAvailabilityTestBase.java:107)
2022-08-25T04:19:22.1459383Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest$5.<init>(KubernetesStateHandleStoreTest.java:237)
2022-08-25T04:19:22.1460391Z Aug 25 04:19:22     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStoreTest.testAddAndLockShouldNotThrowAlreadyExistExceptionWithSameContents(KubernetesStateHandleStoreTest.java:235)
2022-08-25T04:19:22.1461357Z Aug 25 04:19:22     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-08-25T04:19:22.1461965Z Aug 25 04:19:22     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-08-25T04:19:22.1462653Z Aug 25 04:19:22     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-08-25T04:19:22.1463280Z Aug 25 04:19:22     at java.lang.reflect.Method.invoke(Method.java:498)
2022-08-25T04:19:22.1463903Z Aug 25 04:19:22     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
2022-08-25T04:19:22.1464604Z Aug 25 04:19:22     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-08-25T04:19:22.1465397Z Aug 25 04:19:22     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-08-25T04:19:22.1466211Z Aug 25 04:19:22     at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-08-25T04:19:22.1466943Z Aug 25 04:19:22     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-08-25T04:19:22.1467701Z Aug 25 04:19:22     at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
2022-08-25T04:19:22.1468531Z Aug 25 04:19:22     at org.junit.jupiter.engine.executio