[GitHub] [flink] dianfu commented on pull request #19732: [FLINK-18887][python][connector/elasticsearch] Add Elasticsearch DataStream API

2022-05-18 Thread GitBox


dianfu commented on PR #19732:
URL: https://github.com/apache/flink/pull/19732#issuecomment-1131215042

   @deadwind4 Thanks a lot for the PR. It needs to be rebased to fix the 
conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27690) Add Python documentation and examples for Pulsar

2022-05-18 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-27690:

Component/s: Documentation
 Examples

> Add Python documentation and examples for Pulsar
> 
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation, Examples
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Assignee: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27690) Add Python documentation and examples for Pulsar connector

2022-05-18 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-27690:

Summary: Add Python documentation and examples for Pulsar connector  (was: 
Add Python documentation and examples for Pulsar)

> Add Python documentation and examples for Pulsar connector
> --
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation, Examples
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Assignee: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27690) Add Python documentation and examples for Pulsar

2022-05-18 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-27690:

Summary: Add Python documentation and examples for Pulsar  (was: Add Pulsar 
Source connector document and examples)

> Add Python documentation and examples for Pulsar
> 
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Assignee: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27690) Add Pulsar Source connector document and examples

2022-05-18 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539303#comment-17539303
 ] 

Dian Fu commented on FLINK-27690:
-

[~ana4] Thanks. Have assigned it to you~

> Add Pulsar Source connector document and examples
> -
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Assignee: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27690) Add Pulsar Source connector document and examples

2022-05-18 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-27690:
---

Assignee: LuNng Wang

> Add Pulsar Source connector document and examples
> -
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Assignee: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Closed] (FLINK-25795) Support Pulsar sink connector in Python DataStream API.

2022-05-18 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-25795.
---
Fix Version/s: 1.16.0
   Resolution: Fixed

Merged to master via a70e28913a0db4fd7ca7511b0b60b682bd887726

> Support Pulsar sink connector in Python DataStream API.
> ---
>
> Key: FLINK-25795
> URL: https://issues.apache.org/jira/browse/FLINK-25795
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python, Connectors / Pulsar
>Affects Versions: 1.14.3
>Reporter: LuNng Wang
>Assignee: LuNng Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> https://issues.apache.org/jira/browse/FLINK-20732 the PR of this ticket is 
> reviewd, we could develop Python Pulsar sink.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] dianfu closed pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-18 Thread GitBox


dianfu closed pull request #19682: [FLINK-25795][python][connector/pulsar] Add 
pulsar sink DataStream API
URL: https://github.com/apache/flink/pull/19682


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27506) update playgrounds for Flink 1.14

2022-05-18 Thread Shubham Bansal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539291#comment-17539291
 ] 

Shubham Bansal commented on FLINK-27506:


[~danderson] Gentle reminder if you could please take a look at the PRs or 
point me to how to get this PR reviewed would be great. I will start working on 
1.15 version PRs as well after that.

> update playgrounds for Flink 1.14
> -
>
> Key: FLINK-27506
> URL: https://issues.apache.org/jira/browse/FLINK-27506
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation / Training / Exercises
>Affects Versions: 1.14.4
>Reporter: David Anderson
>Priority: Major
>  Labels: starter
>
> All of the flink-playgrounds need to be updated for 1.14.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #223: [hotfix][FLINK-27572] Harden HA meta checking logic

2022-05-18 Thread GitBox


gyfora commented on PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#issuecomment-1131197977

   Thanks @wangyang0918 for the input, I removed the unnecessary redundant HA 
config checks, this made this change much simpler.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #226: [FLINK-27645] Update overview / supported features page for 1.0.0

2022-05-18 Thread GitBox


morhidi commented on PR #226:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/226#issuecomment-1131192981

   There are some missing pieces, but wanted to open it on for feedback, and 
other parts of the documentation is still moving, so links probably need to be 
corrected/added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #226: [FLINK-27645] Update overview / supported features page for 1.0.0

2022-05-18 Thread GitBox


morhidi commented on code in PR #226:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/226#discussion_r876584508


##
docs/content/docs/concepts/overview.md:
##
@@ -30,50 +30,52 @@ Flink Kubernetes Operator acts as a control plane to manage 
the complete deploym
 Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
 
 # Features
-
-| Category   | Feature| Support | Comment  
|
-|||-|--|
-| Kubernetes integration | Flink Native   | full|  
|
-|| Standalone | no  |  
|
-| Deployment Mode| Application Mode   | full|  
|
-|| Session Mode   | limited | no job 
management|
-| Lifecycle Management   | Start Job  | full| empty 
state or from savepoint|
-|| Upgrade Job| full| 
stateless or last-state(chkp/svp)|
-|| Delete Job | full|  
|
-|| Pause/Resume Job   | full|  
|
-|| Savepoint Management   | limited | manual 
savepoint triggering only |
-|| HA | full| via 
flink native k8s HA  |
-|| Validation | full| webhook 
and operator based   |
-| Configuration  | Operator configuration | full| defaults 
and helm values override|
-|| Native Flink properties| full| defaults 
and job level override  |
-|| Environment variables  | full| via pod 
templates|
-|| Native Kubernetes POD settings | full| via pod 
templates|
-| Operations | Installation   | limited | Helm 
based, no public repos used |
-|| UI Access  | limited | domain 
based routing only|
-|| Operator Log Aggregation   | full| k8s 
native and/or custom appender|
-|| Operator Metric Aggregation| limited | basic 
process metrics only   |
-|| Job Logs   | full| k8s 
native and/or custom appender|
-|| Job Metrics| full| k8s 
native and/or custom appender|
-|| K8s Events | limited | 
deployment events only   |
-|| Error Handling and Recovery| limited | 
non-configurable exponential backoff |
-| Pod Augment| Pod Template   | full|  
|
-|| Init containers| full|  
|
-|| Sidecar containers | full|  
|
-|| Layering   | full| jm/tm 
level override |
-| Job Type   | Jar job| full|  
|
-|| SQL Job| no  |  
|
-|| Python Job | no  |  
|
-| CI/CD  | Continuous Integration | full| via 
github actions   |
-|| Public Docker repository   | full| ghcr.io 
/ dockerhub  |
-|| Public Helm repository | full| apache 
release repo  |
-
+## Core
+- Fully-automated [Job Lifecycle 
Management](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/)
+  - Running, suspending and deleting applications
+  - Stateful and stateless application upgrades
+  - Triggering and managing savepoints
+  - Handling errors, rolling-back broken upgrades
+- 

[jira] [Commented] (FLINK-27677) Kubernetes reuse rest.bind-port, but do not support a range of ports

2022-05-18 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539204#comment-17539204
 ] 

Yang Wang commented on FLINK-27677:
---

If you configure rest.bind-port as a port range, the UT in 
KubernetesClusterDescriptorTest will certainly fail. Because we do not support 
this.

I admit that the {{rest.bind-port}} is not fully respected by native K8s 
deployment, as well as {{blob.server.port}}, {{taskmanager.rpc.port}}, etc. But 
it is trivial since users do not need to configure a port range when deploying 
on K8s.

For your suggestions, the #2 is unnecessary and will make the users more 
confused. And #3, we should not select a port from the range in the  
{{KubernetesUtils}}. Instead, it needs to be done in the JobManager side. And 
then the k8s rest service target port also needs to be updated accordingly. 
Then it is an overkill.

If you insist, I agree we could update the description for all the port range 
related config options. Even though, I prefer to believe it is not really 
necessary.

> Kubernetes reuse rest.bind-port, but do not support a range of ports
> 
>
> Key: FLINK-27677
> URL: https://issues.apache.org/jira/browse/FLINK-27677
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.15.0
>Reporter: tartarus
>Priority: Major
>
> k8s module reuse the rest options {color:#DE350B}rest.bind-port{color},
> but do not support a range of ports
> {code:java}
>/**
>  * Parse a valid port for the config option. A fixed port is expected, 
> and do not support a
>  * range of ports.
>  *
>  * @param flinkConfig flink config
>  * @param port port config option
>  * @return valid port
>  */
> public static Integer parsePort(Configuration flinkConfig, 
> ConfigOption port) {
> checkNotNull(flinkConfig.get(port), port.key() + " should not be 
> null.");
> try {
> return Integer.parseInt(flinkConfig.get(port));
> } catch (NumberFormatException ex) {
> throw new FlinkRuntimeException(
> port.key()
> + " should be specified to a fixed port. Do not 
> support a range of ports.",
> ex);
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #223: [hotfix][FLINK-27572] Harden HA meta checking logic

2022-05-18 Thread GitBox


gyfora commented on code in PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#discussion_r876581661


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##
@@ -100,46 +103,49 @@ public void reconcile(FlinkDeployment flinkApp, Context 
context) throws Exceptio
 return;
 }
 
+Configuration observeConfig = configManager.getObserveConfig(flinkApp);
 boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
 if (specChanged) {
 if (newSpecIsAlreadyDeployed(flinkApp)) {
 return;
 }
 LOG.debug("Detected spec change, starting upgrade process.");
-Optional availableUpgradeMode = 
getAvailableUpgradeMode(flinkApp);
-if (availableUpgradeMode.isEmpty()) {
-return;
-}
-UpgradeMode upgradeMode = availableUpgradeMode.get();
-
 JobState currentJobState = lastReconciledSpec.getJob().getState();
 JobState desiredJobState = desiredJobSpec.getState();
 JobState stateAfterReconcile = currentJobState;
 if (currentJobState == JobState.RUNNING) {
 if (desiredJobState == JobState.RUNNING) {
 LOG.info("Upgrading/Restarting running job, suspending 
first...");
 }
-flinkService.cancelJob(flinkApp, upgradeMode);
+Optional availableUpgradeMode =
+getAvailableUpgradeMode(flinkApp, deployConfig);
+if (availableUpgradeMode.isEmpty()) {
+return;
+}
+// We must record the upgrade mode used to the status later
+desiredJobSpec.setUpgradeMode(availableUpgradeMode.get());

Review Comment:
   @wangyang0918 this is the part I was referring to. It is very important to 
know what upgradeMode we used during suspend to be able to verify/not-verify HA 
metadata. 
   
   If you are switching from without HA (from savepoint/stateless) the 
available upgrade mode is savepoint. In this case we record savepoint in the 
lastReconciledSpec upgrademode field so on restore we know not to enforce HA 
metadata validation because it cannot possibly be there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #223: [hotfix][FLINK-27572] Harden HA meta checking logic

2022-05-18 Thread GitBox


gyfora commented on code in PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#discussion_r876580526


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##
@@ -245,9 +245,19 @@ public static void deleteJobGraphInKubernetesHA(
 }
 
 public static boolean isHaMetadataAvailable(
-Configuration conf, KubernetesClient kubernetesClient) {
-String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+Configuration observeConfig,
+Configuration deployConfig,
+KubernetesClient kubernetesClient) {
+
+if (!FlinkUtils.isKubernetesHAActivated(observeConfig)

Review Comment:
   you are right, I will try to clean this up and keep it in the 
getAvailableUpgrade mode where it covers a special cornercase for the savpeoint 
upgrade mode



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27504) State compaction not happening with sliding window and incremental RocksDB backend

2022-05-18 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539200#comment-17539200
 ] 

Yun Tang commented on FLINK-27504:
--

[~asardaes] I have already repiled to you in the mail.
I will close this ticket if no more questions.

> State compaction not happening with sliding window and incremental RocksDB 
> backend
> --
>
> Key: FLINK-27504
> URL: https://issues.apache.org/jira/browse/FLINK-27504
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.4
> Environment: Local Flink cluster on Arch Linux.
>Reporter: Alexis Sarda-Espinosa
>Priority: Major
> Attachments: duration_trend_52ca77c.png, duration_trend_67c76bb.png, 
> duration_trend_c5dd5d2.png, image-2022-05-06-10-34-35-007.png, 
> size_growth_52ca77c.png, size_growth_67c76bb.png, size_growth_c5dd5d2.png
>
>
> Hello,
> I'm trying to estimate an upper bound for RocksDB's state size in my 
> application. For that purpose, I have created a small job with faster timings 
> whose code you can find on GitHub: 
> [https://github.com/asardaes/flink-rocksdb-ttl-test]. You can see some of the 
> results there, but I summarize here as well:
>  * Approximately 20 events per second, 10 unique keys for partitioning are 
> pre-specified.
>  * Sliding window of 11 seconds with a 1-second slide.
>  * Allowed lateness of 11 seconds.
>  * State TTL configured to 1 minute and compaction after 1000 entries.
>  * Both window-specific and window-global state used.
>  * Checkpoints every 2 seconds.
>  * Parallelism of 4 in stateful tasks.
> The goal is to let the job run and analyze state compaction behavior with 
> RocksDB. I should note that global state is cleaned manually inside the 
> functions, TTL for those is in case some keys are no longer seen in the 
> actual production environment.
> I have been running the job on a local cluster (outside IDE), the 
> configuration YAML is also available in the repository. After running for 
> approximately 1.6 days, state size is currently 2.3 GiB (see attachments). I 
> understand state can retain expired data for a while, but since TTL is 1 
> minute, this seems excessive to me.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-table-store] openinx commented on a diff in pull request #126: [FLINK-27678] Support append-only table for file store.

2022-05-18 Thread GitBox


openinx commented on code in PR #126:
URL: https://github.com/apache/flink-table-store/pull/126#discussion_r876574729


##
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReader.java:
##
@@ -203,10 +209,21 @@ private class FileStoreRecordIterator implements 
BulkFormat.RecordIterator

[GitHub] [flink] JesseAtSZ commented on pull request #19767: [hotfix] [docs]: Fix docunment mongodb.md

2022-05-18 Thread GitBox


JesseAtSZ commented on PR #19767:
URL: https://github.com/apache/flink/pull/19767#issuecomment-1131175192

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wsry commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-05-18 Thread GitBox


wsry commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r876572087


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##
@@ -84,6 +86,33 @@ CompletableFuture registerPartitionWithProducer(
 PartitionDescriptor partitionDescriptor,
 ProducerDescriptor producerDescriptor);
 
+/**
+ * Returns all the shuffle descriptors for the partitions in the 
intermediate data set with the
+ * given id.
+ *
+ * @param intermediateDataSetID The id of hte intermediate data set.
+ * @return all the shuffle descriptors for the partitions in the 
intermediate data set. Null if
+ * not exist.
+ */
+default Collection getClusterPartitionShuffleDescriptors(

Review Comment:
   Thanks for pinging me. About the ShuffleMaster interface change, I have left 
some comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wsry commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-05-18 Thread GitBox


wsry commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r876570257


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##
@@ -84,6 +86,33 @@ CompletableFuture registerPartitionWithProducer(
 PartitionDescriptor partitionDescriptor,
 ProducerDescriptor producerDescriptor);
 
+/**
+ * Returns all the shuffle descriptors for the partitions in the 
intermediate data set with the
+ * given id.
+ *
+ * @param intermediateDataSetID The id of the intermediate data set.
+ * @return all the shuffle descriptors for the partitions in the 
intermediate data set. Null if
+ * not exist.
+ */
+default Collection getClusterPartitionShuffleDescriptors(
+IntermediateDataSetID intermediateDataSetID) {
+return Collections.emptyList();
+}
+
+/**
+ * Promote the given partition to cluster partition.
+ *
+ * @param shuffleDescriptor The shuffle descriptors of the partition to 
promote.
+ */
+default void promotePartition(ShuffleDescriptor shuffleDescriptor) {}
+
+/**
+ * Remove the given partition from cluster partition.
+ *
+ * @param shuffleDescriptor The shuffle descriptors of the cluster 
partition to be removed.
+ */
+default void removeClusterPartition(ShuffleDescriptor shuffleDescriptor) {}

Review Comment:
   What's the difference of this method and the below one 
(releasePartitionExternally)? Can we call releasePartitionExternally instead of 
this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wsry commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-05-18 Thread GitBox


wsry commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r876570703


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##
@@ -84,6 +86,33 @@ CompletableFuture registerPartitionWithProducer(
 PartitionDescriptor partitionDescriptor,
 ProducerDescriptor producerDescriptor);
 
+/**
+ * Returns all the shuffle descriptors for the partitions in the 
intermediate data set with the
+ * given id.
+ *
+ * @param intermediateDataSetID The id of the intermediate data set.
+ * @return all the shuffle descriptors for the partitions in the 
intermediate data set. Null if
+ * not exist.
+ */
+default Collection getClusterPartitionShuffleDescriptors(

Review Comment:
   What's the use case of this method? Why not get cluster partitions from the 
partition tracker directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wsry commented on a diff in pull request #19653: [FLINK-27523] Runtime supports producing and consuming cached intermediate results

2022-05-18 Thread GitBox


wsry commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r876570257


##
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##
@@ -84,6 +86,33 @@ CompletableFuture registerPartitionWithProducer(
 PartitionDescriptor partitionDescriptor,
 ProducerDescriptor producerDescriptor);
 
+/**
+ * Returns all the shuffle descriptors for the partitions in the 
intermediate data set with the
+ * given id.
+ *
+ * @param intermediateDataSetID The id of the intermediate data set.
+ * @return all the shuffle descriptors for the partitions in the 
intermediate data set. Null if
+ * not exist.
+ */
+default Collection getClusterPartitionShuffleDescriptors(
+IntermediateDataSetID intermediateDataSetID) {
+return Collections.emptyList();
+}
+
+/**
+ * Promote the given partition to cluster partition.
+ *
+ * @param shuffleDescriptor The shuffle descriptors of the partition to 
promote.
+ */
+default void promotePartition(ShuffleDescriptor shuffleDescriptor) {}
+
+/**
+ * Remove the given partition from cluster partition.
+ *
+ * @param shuffleDescriptor The shuffle descriptors of the cluster 
partition to be removed.
+ */
+default void removeClusterPartition(ShuffleDescriptor shuffleDescriptor) {}

Review Comment:
   What's the difference of this method and the below one 
(releasePartitionExternally)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27690) Add Pulsar Source connector document and examples

2022-05-18 Thread LuNng Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LuNng Wang updated FLINK-27690:
---
Summary: Add Pulsar Source connector document and examples  (was: Add 
Pulsar Source connector document)

> Add Pulsar Source connector document and examples
> -
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27690) Add Pulsar Source connector document

2022-05-18 Thread LuNng Wang (Jira)
LuNng Wang created FLINK-27690:
--

 Summary: Add Pulsar Source connector document
 Key: FLINK-27690
 URL: https://issues.apache.org/jira/browse/FLINK-27690
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.0
Reporter: LuNng Wang






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27690) Add Pulsar Source connector document

2022-05-18 Thread LuNng Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539193#comment-17539193
 ] 

LuNng Wang commented on FLINK-27690:


I would like to take this. [~dianfu] 

> Add Pulsar Source connector document
> 
>
> Key: FLINK-27690
> URL: https://issues.apache.org/jira/browse/FLINK-27690
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27689) Pulsar Connector support PulsarSchema

2022-05-18 Thread LuNng Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LuNng Wang updated FLINK-27689:
---
Description: 
Currently, Python Pulsar Connector only supports Flink Schema, we also need to 
support Pulsar Schema. 

Note: We need to support the enableSchemaEvolution method as we support 
PulsarSchema.

The following is detail.

[https://github.com/apache/flink/pull/19682#discussion_r872131355]

  was:
Currently, Python Pulsar Connector only supports Flink Schema, we also need to 
support Pulsar Schema. 

Note: We need to support the enableSchemaEvolution method since we support 
PulsarSchema.

The following is detail.

[https://github.com/apache/flink/pull/19682#discussion_r872131355]


> Pulsar Connector support PulsarSchema
> -
>
> Key: FLINK-27689
> URL: https://issues.apache.org/jira/browse/FLINK-27689
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Priority: Major
>
> Currently, Python Pulsar Connector only supports Flink Schema, we also need 
> to support Pulsar Schema. 
> Note: We need to support the enableSchemaEvolution method as we support 
> PulsarSchema.
> The following is detail.
> [https://github.com/apache/flink/pull/19682#discussion_r872131355]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27689) Pulsar Connector support PulsarSchema

2022-05-18 Thread LuNng Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LuNng Wang updated FLINK-27689:
---
Description: 
Currently, Python Pulsar Connector only supports Flink Schema, we also need to 
support Pulsar Schema. 

Note: We need to support the enableSchemaEvolution method since we support 
PulsarSchema.

The following is detail.

[https://github.com/apache/flink/pull/19682#discussion_r872131355]

  was:
Currently, Python Pulsar Connector only supports Flink Schema, we also need to 
support Pulsar Schema.

The following is detail.

https://github.com/apache/flink/pull/19682#discussion_r872131355


> Pulsar Connector support PulsarSchema
> -
>
> Key: FLINK-27689
> URL: https://issues.apache.org/jira/browse/FLINK-27689
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Affects Versions: 1.15.0
>Reporter: LuNng Wang
>Priority: Major
>
> Currently, Python Pulsar Connector only supports Flink Schema, we also need 
> to support Pulsar Schema. 
> Note: We need to support the enableSchemaEvolution method since we support 
> PulsarSchema.
> The following is detail.
> [https://github.com/apache/flink/pull/19682#discussion_r872131355]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #226: [FLINK-27645] Update overview / supported features page for 1.0.0

2022-05-18 Thread GitBox


wangyang0918 commented on code in PR #226:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/226#discussion_r876567135


##
docs/content/docs/concepts/overview.md:
##
@@ -30,50 +30,52 @@ Flink Kubernetes Operator acts as a control plane to manage 
the complete deploym
 Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
 
 # Features
-
-| Category   | Feature| Support | Comment  
|
-|||-|--|
-| Kubernetes integration | Flink Native   | full|  
|
-|| Standalone | no  |  
|
-| Deployment Mode| Application Mode   | full|  
|
-|| Session Mode   | limited | no job 
management|
-| Lifecycle Management   | Start Job  | full| empty 
state or from savepoint|
-|| Upgrade Job| full| 
stateless or last-state(chkp/svp)|
-|| Delete Job | full|  
|
-|| Pause/Resume Job   | full|  
|
-|| Savepoint Management   | limited | manual 
savepoint triggering only |
-|| HA | full| via 
flink native k8s HA  |
-|| Validation | full| webhook 
and operator based   |
-| Configuration  | Operator configuration | full| defaults 
and helm values override|
-|| Native Flink properties| full| defaults 
and job level override  |
-|| Environment variables  | full| via pod 
templates|
-|| Native Kubernetes POD settings | full| via pod 
templates|
-| Operations | Installation   | limited | Helm 
based, no public repos used |
-|| UI Access  | limited | domain 
based routing only|
-|| Operator Log Aggregation   | full| k8s 
native and/or custom appender|
-|| Operator Metric Aggregation| limited | basic 
process metrics only   |
-|| Job Logs   | full| k8s 
native and/or custom appender|
-|| Job Metrics| full| k8s 
native and/or custom appender|
-|| K8s Events | limited | 
deployment events only   |
-|| Error Handling and Recovery| limited | 
non-configurable exponential backoff |
-| Pod Augment| Pod Template   | full|  
|
-|| Init containers| full|  
|
-|| Sidecar containers | full|  
|
-|| Layering   | full| jm/tm 
level override |
-| Job Type   | Jar job| full|  
|
-|| SQL Job| no  |  
|
-|| Python Job | no  |  
|
-| CI/CD  | Continuous Integration | full| via 
github actions   |
-|| Public Docker repository   | full| ghcr.io 
/ dockerhub  |
-|| Public Helm repository | full| apache 
release repo  |
-
+## Core
+- Fully-automated [Job Lifecycle 
Management](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/)
+  - Running, suspending and deleting applications
+  - Stateful and stateless application upgrades
+  - Triggering and managing savepoints
+  - Handling errors, rolling-back broken 

[jira] [Created] (FLINK-27689) Pulsar Connector support PulsarSchema

2022-05-18 Thread LuNng Wang (Jira)
LuNng Wang created FLINK-27689:
--

 Summary: Pulsar Connector support PulsarSchema
 Key: FLINK-27689
 URL: https://issues.apache.org/jira/browse/FLINK-27689
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.0
Reporter: LuNng Wang


Currently, Python Pulsar Connector only supports Flink Schema, we also need to 
support Pulsar Schema.

The following is detail.

https://github.com/apache/flink/pull/19682#discussion_r872131355



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #223: [hotfix][FLINK-27572] Harden HA meta checking logic

2022-05-18 Thread GitBox


wangyang0918 commented on code in PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#discussion_r876561304


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##
@@ -245,9 +245,19 @@ public static void deleteJobGraphInKubernetesHA(
 }
 
 public static boolean isHaMetadataAvailable(
-Configuration conf, KubernetesClient kubernetesClient) {
-String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+Configuration observeConfig,
+Configuration deployConfig,
+KubernetesClient kubernetesClient) {
+
+if (!FlinkUtils.isKubernetesHAActivated(observeConfig)

Review Comment:
   It is a hard time for me to understand why we are verifying the HA enabled 
again here. AFAIK, we should already done this in the validator.
   
   After more consideration, I find it is useful in 
`ApplicationReconciler#getAvailableUpgradeMode`. Maybe we should not mix the HA 
enabled verification and HA metadata existence. It will also make some 
logging/exception confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #223: [hotfix][FLINK-27572] Harden HA meta checking logic

2022-05-18 Thread GitBox


wangyang0918 commented on PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#issuecomment-1131159361

   > 1. When switching from savepoint -> last-state + ha, we record savpeoint 
upgrade mode which eliminates HA data validation on restore
   
   I think you mean we need to record `savepoint` upgrade mode when switching 
from stateless + non-ha -> last-state + ha here. For savepoint -> last-state + 
ha, we already record `savepoint` upgrade mode in the status. Right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dianfu commented on a diff in pull request #19682: [FLINK-25795][python][connector/pulsar] Add pulsar sink DataStream API

2022-05-18 Thread GitBox


dianfu commented on code in PR #19682:
URL: https://github.com/apache/flink/pull/19682#discussion_r876564037


##
flink-python/pyflink/datastream/connectors.py:
##
@@ -1449,6 +1455,314 @@ def build(self) -> 'PulsarSource':
 return PulsarSource(self._j_pulsar_source_builder.build())
 
 
+class DeliveryGuarantee(Enum):
+"""
+DeliverGuarantees that can be chosen. In general your pipeline can only 
offer the lowest
+delivery guarantee which is supported by your sources and sinks.
+
+:data: `EXACTLY_ONCE`:
+Records are only delivered exactly-once also under failover scenarios. To 
build a complete
+exactly-once pipeline is required that the source and sink support 
exactly-once and are
+properly configured.
+
+:data: `AT_LEAST_ONCE`:
+Records are ensured to be delivered but it may happen that the same record 
is delivered
+multiple times. Usually, this guarantee is faster than the exactly-once 
delivery.
+
+:data: `NONE`:
+Records are delivered on a best effort basis. It is often the fastest way 
to process records
+but it may happen that records are lost or duplicated.
+"""
+
+EXACTLY_ONCE = 0,
+AT_LEAST_ONCE = 1,
+NONE = 2
+
+def _to_j_delivery_guarantee(self):
+JDeliveryGuarantee = get_gateway().jvm \
+.org.apache.flink.connector.base.DeliveryGuarantee
+return getattr(JDeliveryGuarantee, self.name)
+
+
+class PulsarSerializationSchema(object):
+"""
+The serialization schema for how to serialize records into Pulsar.
+"""
+
+def __init__(self, _j_pulsar_serialization_schema):
+self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+@staticmethod
+def flink_schema(serialization_schema: SerializationSchema) \

Review Comment:
   Thanks for the explanation. Could we create a ticket for this? 



##
flink-python/pyflink/datastream/connectors/pulsar.py:
##
@@ -387,3 +409,297 @@ def build(self) -> 'PulsarSource':
 Build the PulsarSource.
 """
 return PulsarSource(self._j_pulsar_source_builder.build())
+
+
+#  PulsarSink 
+
+
+class PulsarSerializationSchema(object):
+"""
+The serialization schema for how to serialize records into Pulsar.
+"""
+
+def __init__(self, _j_pulsar_serialization_schema):
+self._j_pulsar_serialization_schema = _j_pulsar_serialization_schema
+
+@staticmethod
+def flink_schema(serialization_schema: SerializationSchema) \
+-> 'PulsarSerializationSchema':
+"""
+Create a PulsarSerializationSchema by using the flink's 
SerializationSchema. It would
+serialize the message into byte array and send it to Pulsar with 
Schema#BYTES.
+"""
+JPulsarSerializationSchema = get_gateway().jvm.org.apache.flink \
+.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema
+_j_pulsar_serialization_schema = 
JPulsarSerializationSchema.flinkSchema(
+serialization_schema._j_serialization_schema)
+return PulsarSerializationSchema(_j_pulsar_serialization_schema)
+
+
+class TopicRoutingMode(Enum):
+"""
+The routing policy for choosing the desired topic by the given message.
+
+:data: `ROUND_ROBIN`:
+
+The producer will publish messages across all partitions in a round-robin 
fashion to achieve
+maximum throughput. Please note that round-robin is not done per 
individual message but
+rather it's set to the same boundary of batching delay, to ensure batching 
is effective.
+
+:data: `MESSAGE_KEY_HASH`:
+
+If no key is provided, The partitioned producer will randomly pick one 
single topic partition
+and publish all the messages into that partition. If a key is provided on 
the message, the
+partitioned producer will hash the key and assign the message to a 
particular partition.
+
+:data: `CUSTOM`:
+
+Use custom topic router implementation that will be called to determine 
the partition for a
+particular message.
+"""
+
+ROUND_ROBIN = 0
+MESSAGE_KEY_HASH = 1
+CUSTOM = 2
+
+def _to_j_topic_routing_mode(self):
+JTopicRoutingMode = get_gateway().jvm \
+
.org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode
+return getattr(JTopicRoutingMode, self.name)
+
+
+class MessageDelayer(object):
+"""
+A delayer for Pulsar broker passing the sent message to the downstream 
consumer. This is only
+works in :data:`SubscriptionType.Shared` subscription.
+
+Read delayed message delivery
+
https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery
 for better
+understanding this feature.
+"""
+def __init__(self, _j_message_delayer):
+self._j_message_delayer = _j_message_delayer
+
+@staticmethod
+def never() -> 'MessageDelayer':
+"""
+All the messages should be consumed immediately.
+   

[GitHub] [flink] flinkbot commented on pull request #19767: [hotfix] [docs]: Fix docunment mongodb.md

2022-05-18 Thread GitBox


flinkbot commented on PR #19767:
URL: https://github.com/apache/flink/pull/19767#issuecomment-1131154933

   
   ## CI report:
   
   * 6875c9766d9758f69aaf9247e8387d1aa95a658b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] JesseAtSZ opened a new pull request, #19767: Fix: Update docunment mongodb.md

2022-05-18 Thread GitBox


JesseAtSZ opened a new pull request, #19767:
URL: https://github.com/apache/flink/pull/19767

   In the official website of MongoDB, DB is capitalized
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #223: [hotfix][FLINK-27572] Harden HA meta checking logic

2022-05-18 Thread GitBox


wangyang0918 commented on code in PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#discussion_r876561303


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##
@@ -245,9 +245,19 @@ public static void deleteJobGraphInKubernetesHA(
 }
 
 public static boolean isHaMetadataAvailable(
-Configuration conf, KubernetesClient kubernetesClient) {
-String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+Configuration observeConfig,
+Configuration deployConfig,
+KubernetesClient kubernetesClient) {
+
+if (!FlinkUtils.isKubernetesHAActivated(observeConfig)

Review Comment:
   It is a hard time for me to understand why we are verifying the HA enabled 
again here. AFAIK, we should already done this in the validator.
   
   After more consideration, I find it is useful in 
`ApplicationReconciler#getAvailableUpgradeMode`. Maybe we should not mix the HA 
enabled verification and HA metadata existence. It will also make some 
logging/exception confusing.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##
@@ -245,9 +245,19 @@ public static void deleteJobGraphInKubernetesHA(
 }
 
 public static boolean isHaMetadataAvailable(
-Configuration conf, KubernetesClient kubernetesClient) {
-String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+Configuration observeConfig,
+Configuration deployConfig,
+KubernetesClient kubernetesClient) {
+
+if (!FlinkUtils.isKubernetesHAActivated(observeConfig)

Review Comment:
   It is a hard time for me to understand why we are verifying the HA enabled 
again here. AFAIK, we should already done this in the validator.
   
   After more consideration, I find it is useful in 
`ApplicationReconciler#getAvailableUpgradeMode`. Maybe we should not mix the HA 
enabled verification and HA metadata existence. It will also make some 
logging/exception confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

2022-05-18 Thread Zhiwen Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhiwen Sun closed FLINK-27663.
--
Resolution: Not A Problem

> upsert-kafka can't process delete message from upsert-kafka sink
> 
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Zhiwen Sun
>Priority: Major
>
> upsert-kafka write DELETE data as Kafka messages with null values (indicate 
> tombstone for the key).
> But when use upsert-kafka as a source table to consumer kafka messages write 
> by upsert-kafka sink, DELETE messages will be ignored.
>  
> related sql :
>  
>  
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>  
>  
> The problem may be produced by DeserializationSchema#deserialize,
> this method does not collect data while subclass's deserialize return null.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27677) Kubernetes reuse rest.bind-port, but do not support a range of ports

2022-05-18 Thread tartarus (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539180#comment-17539180
 ] 

tartarus commented on FLINK-27677:
--

[~wangyang0918]  thank you for your reply!

The scenarios of k8s and yarn are different. If we configure rest.bind-port as 
a port range, the UT in KubernetesClusterDescriptorTest will fail.

So my suggestion is:
1. Modify the description information of rest.bind-port and add that only fixed 
ports are currently supported in the k8s environment;
2. Add a configuration item for k8s, such as kubernetes.rest.bind-port;
3. KubernetesUtils adds the ability to handle port ranges and select a port 
from them;

Do you have any other suggestions?

> Kubernetes reuse rest.bind-port, but do not support a range of ports
> 
>
> Key: FLINK-27677
> URL: https://issues.apache.org/jira/browse/FLINK-27677
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.15.0
>Reporter: tartarus
>Priority: Major
>
> k8s module reuse the rest options {color:#DE350B}rest.bind-port{color},
> but do not support a range of ports
> {code:java}
>/**
>  * Parse a valid port for the config option. A fixed port is expected, 
> and do not support a
>  * range of ports.
>  *
>  * @param flinkConfig flink config
>  * @param port port config option
>  * @return valid port
>  */
> public static Integer parsePort(Configuration flinkConfig, 
> ConfigOption port) {
> checkNotNull(flinkConfig.get(port), port.key() + " should not be 
> null.");
> try {
> return Integer.parseInt(flinkConfig.get(port));
> } catch (NumberFormatException ex) {
> throw new FlinkRuntimeException(
> port.key()
> + " should be specified to a fixed port. Do not 
> support a range of ports.",
> ex);
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] Myasuka commented on pull request #19702: [FLINK-27031][runtime] Assign even empty old state to the task if the…

2022-05-18 Thread GitBox


Myasuka commented on PR #19702:
URL: https://github.com/apache/flink/pull/19702#issuecomment-1131086668

   @akalash since you did not reply in the original ticket, would you please 
explain why this PR could resolve this problem?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode

2022-05-18 Thread Yufei Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539178#comment-17539178
 ] 

Yufei Zhang commented on FLINK-26541:
-

[~martijnvisser] I think he says "This feature is absolutely necessary, as 
FlinkSQL is becoming more and more populsar, and we saw the strength in 
application mode. It would feel so much better when we combine these two"

> SQL Client should support submitting SQL jobs in application mode
> -
>
> Key: FLINK-26541
> URL: https://issues.apache.org/jira/browse/FLINK-26541
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Deployment / YARN, Table SQL / 
> Client
>Reporter: Jark Wu
>Priority: Major
>
> Currently, the SQL Client only supports submitting jobs in session mode and 
> per-job mode. As the community going to drop the per-job mode (FLINK-26000), 
> SQL Client should support application mode as well. Otherwise, SQL Client can 
> only submit SQL in session mode then, but streaming jobs should be submitted 
> in per-job or application mode to have bettter resource isolation.
> Disucssions: https://lists.apache.org/thread/2yq351nb721x23rz1q8qlyf2tqrk147r



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-26541) SQL Client should support submitting SQL jobs in application mode

2022-05-18 Thread Yufei Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539178#comment-17539178
 ] 

Yufei Zhang edited comment on FLINK-26541 at 5/19/22 3:02 AM:
--

[~martijnvisser] I think he/she says "This feature is absolutely necessary, as 
FlinkSQL is becoming more and more populsar, and we saw the strength in 
application mode. It would feel so much better when we combine these two"


was (Author: affe):
[~martijnvisser] I think he says "This feature is absolutely necessary, as 
FlinkSQL is becoming more and more populsar, and we saw the strength in 
application mode. It would feel so much better when we combine these two"

> SQL Client should support submitting SQL jobs in application mode
> -
>
> Key: FLINK-26541
> URL: https://issues.apache.org/jira/browse/FLINK-26541
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Deployment / YARN, Table SQL / 
> Client
>Reporter: Jark Wu
>Priority: Major
>
> Currently, the SQL Client only supports submitting jobs in session mode and 
> per-job mode. As the community going to drop the per-job mode (FLINK-26000), 
> SQL Client should support application mode as well. Otherwise, SQL Client can 
> only submit SQL in session mode then, but streaming jobs should be submitted 
> in per-job or application mode to have bettter resource isolation.
> Disucssions: https://lists.apache.org/thread/2yq351nb721x23rz1q8qlyf2tqrk147r



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27663) upsert-kafka can't process delete message from upsert-kafka sink

2022-05-18 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539173#comment-17539173
 ] 

Qingsheng Ren commented on FLINK-27663:
---

[~pensz] 
[KIP-810|https://cwiki.apache.org/confluence/display/KAFKA/KIP-810%3A+Allow+producing+records+with+null+values+in+Kafka+Console+Producer]
 introduces new property in Kafka console producer to support producing null 
values, but it's only available from Kafka 3.2, so I'm afraid you have to write 
Java code to create a {{ProducerRecord}} with null value and produce into Kafka 
with {{KafkaProducer}} if you are using Kafka <3.2.

> upsert-kafka can't process delete message from upsert-kafka sink
> 
>
> Key: FLINK-27663
> URL: https://issues.apache.org/jira/browse/FLINK-27663
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.13.6, 1.14.4
>Reporter: Zhiwen Sun
>Priority: Major
>
> upsert-kafka write DELETE data as Kafka messages with null values (indicate 
> tombstone for the key).
> But when use upsert-kafka as a source table to consumer kafka messages write 
> by upsert-kafka sink, DELETE messages will be ignored.
>  
> related sql :
>  
>  
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
> *
> from
> order_system_log
> ;
> {code}
>  
>  
> The problem may be produced by DeserializationSchema#deserialize,
> this method does not collect data while subclass's deserialize return null.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876530596


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##
@@ -335,6 +379,49 @@ public void checkpointState(
 }
 }
 
+private void registerAlignmentTimer(
+long checkpointId,
+OperatorChain operatorChain,
+CheckpointBarrier checkpointBarrier) {
+synchronized (lock) {
+resetAlignmentTimer(checkpointId);
+if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) {
+return;
+}
+
+long timerDelay = BarrierAlignmentUtil.getTimerDelay(clock, 
checkpointBarrier);
+
+Cancellable currentAlignmentTimer =
+registerTimer.apply(
+() -> {
+try {
+
operatorChain.alignedBarrierTimeout(checkpointId);
+} catch (Exception e) {
+ExceptionUtils.rethrowIOException(e);
+}
+synchronized (lock) {
+alignmentTimers.remove(checkpointId);
+}
+return null;
+},
+Duration.ofMillis(timerDelay));

Review Comment:
   Same as why the `NavigableMap alignmentTimers` need to be 
guarded by lock?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876530327


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##
@@ -103,6 +112,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
 @GuardedBy("lock")
 private boolean closed;
 
+private final BiFunction, Duration, Cancellable> registerTimer;
+
+private final Clock clock;
+
+/** Hold the AlignmentTimer for each checkpointId. */
+@GuardedBy("lock")

Review Comment:
   The close method will call the resetAlignmentTimer, it's a async thread, 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27683) Insert into (column1, column2) Values(.....) can't work with sql Hints

2022-05-18 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-27683:
--
Component/s: Table SQL / Planner
 (was: Table SQL / Runtime)

> Insert into (column1, column2) Values(.) can't work with sql Hints
> --
>
> Key: FLINK-27683
> URL: https://issues.apache.org/jira/browse/FLINK-27683
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: Xin Yang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> When I try to use statement `Insert into (column1, column2) Values(.)` 
> with SQL hints, it throw some exception, which is certainly a bug.
>  
>  * Sql 1
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` /*+ OPTIONS('tidb.sink.update-columns'='c2, 
> c13')*/   (c2, c13) values(1, 12.12) {code}
>  * 
>  ** result 1
> !screenshot-1.png!
>  * Sql 2
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13) /*+ 
> OPTIONS('tidb.sink.update-columns'='c2, c13')*/  values(1, 12.12)
> {code}
>  * 
>  ** result 2
> !screenshot-2.png!
>  * Sql 3
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13)  values(1, 12.12)
> {code}
>  * 
>  ** result3 : success



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27683) Insert into (column1, column2) Values(.....) can't work with sql Hints

2022-05-18 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-27683:
--
Affects Version/s: 1.14.4

> Insert into (column1, column2) Values(.) can't work with sql Hints
> --
>
> Key: FLINK-27683
> URL: https://issues.apache.org/jira/browse/FLINK-27683
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0, 1.14.4
>Reporter: Xin Yang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> When I try to use statement `Insert into (column1, column2) Values(.)` 
> with SQL hints, it throw some exception, which is certainly a bug.
>  
>  * Sql 1
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` /*+ OPTIONS('tidb.sink.update-columns'='c2, 
> c13')*/   (c2, c13) values(1, 12.12) {code}
>  * 
>  ** result 1
> !screenshot-1.png!
>  * Sql 2
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13) /*+ 
> OPTIONS('tidb.sink.update-columns'='c2, c13')*/  values(1, 12.12)
> {code}
>  * 
>  ** result 2
> !screenshot-2.png!
>  * Sql 3
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13)  values(1, 12.12)
> {code}
>  * 
>  ** result3 : success



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27683) Insert into (column1, column2) Values(.....) can't work with sql Hints

2022-05-18 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-27683:
--
Affects Version/s: 1.16.0
   1.15.1
   (was: 1.14.4)

> Insert into (column1, column2) Values(.) can't work with sql Hints
> --
>
> Key: FLINK-27683
> URL: https://issues.apache.org/jira/browse/FLINK-27683
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0, 1.16.0, 1.15.1
>Reporter: Xin Yang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> When I try to use statement `Insert into (column1, column2) Values(.)` 
> with SQL hints, it throw some exception, which is certainly a bug.
>  
>  * Sql 1
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` /*+ OPTIONS('tidb.sink.update-columns'='c2, 
> c13')*/   (c2, c13) values(1, 12.12) {code}
>  * 
>  ** result 1
> !screenshot-1.png!
>  * Sql 2
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13) /*+ 
> OPTIONS('tidb.sink.update-columns'='c2, c13')*/  values(1, 12.12)
> {code}
>  * 
>  ** result 2
> !screenshot-2.png!
>  * Sql 3
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13)  values(1, 12.12)
> {code}
>  * 
>  ** result3 : success



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27683) Insert into (column1, column2) Values(.....) can't work with sql Hints

2022-05-18 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539167#comment-17539167
 ] 

Shengkai Fang commented on FLINK-27683:
---

The main reason is that it tries to cast the SqlTabelRef to SqlIdentifier. 

> Insert into (column1, column2) Values(.) can't work with sql Hints
> --
>
> Key: FLINK-27683
> URL: https://issues.apache.org/jira/browse/FLINK-27683
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Xin Yang
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> When I try to use statement `Insert into (column1, column2) Values(.)` 
> with SQL hints, it throw some exception, which is certainly a bug.
>  
>  * Sql 1
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` /*+ OPTIONS('tidb.sink.update-columns'='c2, 
> c13')*/   (c2, c13) values(1, 12.12) {code}
>  * 
>  ** result 1
> !screenshot-1.png!
>  * Sql 2
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13) /*+ 
> OPTIONS('tidb.sink.update-columns'='c2, c13')*/  values(1, 12.12)
> {code}
>  * 
>  ** result 2
> !screenshot-2.png!
>  * Sql 3
> {code:java}
> INSERT INTO `tidb`.`%s`.`%s` (c2, c13)  values(1, 12.12)
> {code}
>  * 
>  ** result3 : success



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876527397


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##
@@ -103,6 +112,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
 @GuardedBy("lock")
 private boolean closed;
 
+private final BiFunction, Duration, Cancellable> registerTimer;
+
+private final Clock clock;
+
+/** Hold the AlignmentTimer for each checkpointId. */
+@GuardedBy("lock")
+private final NavigableMap alignmentTimers;

Review Comment:
   Same as above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

2022-05-18 Thread GitBox


tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r876524803


##
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {
+
+@Override
+protected List ddl() {
+return Collections.singletonList(
+"CREATE TABLE IF NOT EXISTS T3 ( "
++ " a STRING, "
++ " b INT, "
++ " c INT, "
++ " PRIMARY KEY (a) NOT ENFORCED )"
++ " WITH ("
++ " 'merge-engine'='aggregation' ,"
++ " 'b.aggregate-function'='sum' ,"
++ " 'c.aggregate-function'='sum' "

Review Comment:
   If users don't care about the results of some columns they shouldn't include 
them into the table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876523317


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
 inflightBuffers.toArray(new Buffer[0]));
 }
 }
-return numPriorityElements == 1
-&& !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-// notifications
+return needNotifyPriorityEvent();
+}
+
+// It just be called after add priorityEvent.
+private boolean needNotifyPriorityEvent() {
+assert Thread.holdsLock(buffers);
+// if subpartition is blocked then downstream doesn't expect any 
notifications
+return buffers.getNumPriorityElements() == 1 && !isBlocked;
+}
+
+private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+checkState(
+!channelStateFutures.containsKey(barrier.getId()),
+"%s has received the checkpoint barrier %d, it maybe a bug.",
+toString(),
+barrier.getId());
+
+checkChannelStateFutures(barrier.getId());
+CompletableFuture> dataFuture = new CompletableFuture<>();
+channelStateFutures.put(barrier.getId(), dataFuture);
+channelStateWriter.addOutputDataFuture(
+barrier.getId(),
+subpartitionInfo,
+ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+dataFuture);
+}
+
+private void checkChannelStateFutures(long currentCheckpointId) {
+assert Thread.holdsLock(buffers);
+
+while (!channelStateFutures.isEmpty()) {
+Long checkpointId = channelStateFutures.firstKey();
+if (checkpointId >= currentCheckpointId) {
+break;
+}
+String exceptionMessage =
+String.format(
+"Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
++ "future by exception due to currently 
does not support "
++ "concurrent unaligned checkpoints.",
+currentCheckpointId, checkpointId);
+channelStateFutures
+.pollFirstEntry()
+.getValue()
+.completeExceptionally(new 
IllegalStateException(exceptionMessage));
+LOG.info(exceptionMessage);
+}
+}
+
+private void completeTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+if (channelStateFutures.isEmpty()) {
+return;
+}
+CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+
+CompletableFuture> channelStateFuture =
+channelStateFutures.remove(barrier.getId());
+if (channelStateFuture == null) {
+return;
+}
+channelStateFuture.complete(null);
+}
+
+private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier(
+BufferConsumer bufferConsumer) {
+CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+checkArgument(barrier != null, "Parse the timeoutable Checkpoint 
Barrier failed.");
+checkState(
+barrier.getCheckpointOptions().isTimeoutable()
+&& 
Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+== bufferConsumer.getDataType());
+return barrier;
+}
+
+@Override
+public void alignedBarrierTimeout(long checkpointId) throws IOException {
+int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
+synchronized (buffers) {
+CompletableFuture> channelStateFuture =
+channelStateFutures.remove(checkpointId);
+// The checkpoint barrier has sent to downstream, so nothing to do.
+if (channelStateFuture == null) {
+return;
+}
+
+// 1. find inflightBuffers and timeout the aligned barrier to 
unaligned barrier
+List inflightBuffers = new ArrayList<>();
+try {
+if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, 
inflightBuffers)) {
+prioritySequenceNumber = sequenceNumber;
+}
+} catch (IOException e) {
+channelStateFuture.completeExceptionally(
+new IllegalStateException(

Review Comment:
   I wrapped some messages and e to the IllegalStateException. Do you mean that 
`completeExceptionally(e)` directly?



-- 
This is an automated 

[jira] [Closed] (FLINK-27652) CompactManager.Rewriter cannot handle different partition keys invoked compaction

2022-05-18 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-27652.

Resolution: Fixed

master: bc46116bef59be033db8b85655e53e479113f462

> CompactManager.Rewriter cannot handle different partition keys invoked 
> compaction
> -
>
> Key: FLINK-27652
> URL: https://issues.apache.org/jira/browse/FLINK-27652
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> h3. Issue Description
> When enabling {{commit.force-compact}} for the partitioned managed table, 
> there had a chance that the successive synchronized
> writes got failure.  The current impl of {{CompactManager.Rewriter}} is an 
> anonymous class in {{FileStoreWriteImpl}}. However, the {{partition}} and 
> {{bucket}} are referenced as local variables, and the dataFileReader is 
> initiliazed when rewrite is called; and this may lead to the {{rewrite}} 
> method messing up with the wrong data file with the {{partition}} and 
> {{bucket}}.
> h3. Root Cause
> {code:java}
> Caused by: java.io.IOException: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: java.io.FileNotFoundException: File 
> file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0
>  does not exist or the user running Flink ('jane.cjm') has insufficient 
> permissions to access it. at 
> org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
> {code}
> However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to 
> partition Autumn. It seems like the rewriter found the wrong partition/bucket 
> with the wrong file.
> h3. How to Reproduce
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.flink.table.store.connector;
> import org.junit.Test;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.ExecutionException;
> /** A reproducible case. */
> public class ForceCompactionITCase extends FileStoreTableITCase {
> @Override
> protected List ddl() {
> return Collections.singletonList(
> "CREATE TABLE IF NOT EXISTS T1 ("
> + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY 
> (f1)");
> }
> @Test
> public void test() throws ExecutionException, InterruptedException {
> bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
> bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 
> 'true')");
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is 
> Coming')"
> + ",(2, 'Winter', 'The First Snowflake'), "
> + "(2, 'Spring', 'The First Rose in Spring'), 
> "
> + "(7, 'Summer', 'Summertime Sadness')")
> .await();
> bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last 
> Christmas')").await();
> bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is 
> Coming')").await();
> bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 
> 'Refrain')").await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon 
> Sugar'), "
> + "(4, 'Spring', 'Spring Water')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September 
> Ends')")
> .await();
> bEnv.executeSql(
> "INSERT 

[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876521519


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer 
bufferConsumer, int partial
 inflightBuffers.toArray(new Buffer[0]));
 }
 }
-return numPriorityElements == 1
-&& !isBlocked; // if subpartition is blocked then downstream 
doesn't expect any
-// notifications
+return needNotifyPriorityEvent();
+}
+
+// It just be called after add priorityEvent.
+private boolean needNotifyPriorityEvent() {
+assert Thread.holdsLock(buffers);
+// if subpartition is blocked then downstream doesn't expect any 
notifications
+return buffers.getNumPriorityElements() == 1 && !isBlocked;
+}
+
+private void receiveTimeoutableCheckpointBarrier(BufferConsumer 
bufferConsumer) {
+CheckpointBarrier barrier = 
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+checkState(
+!channelStateFutures.containsKey(barrier.getId()),
+"%s has received the checkpoint barrier %d, it maybe a bug.",
+toString(),
+barrier.getId());
+
+checkChannelStateFutures(barrier.getId());
+CompletableFuture> dataFuture = new CompletableFuture<>();
+channelStateFutures.put(barrier.getId(), dataFuture);
+channelStateWriter.addOutputDataFuture(
+barrier.getId(),
+subpartitionInfo,
+ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+dataFuture);
+}
+
+private void checkChannelStateFutures(long currentCheckpointId) {
+assert Thread.holdsLock(buffers);
+
+while (!channelStateFutures.isEmpty()) {
+Long checkpointId = channelStateFutures.firstKey();
+if (checkpointId >= currentCheckpointId) {
+break;
+}
+String exceptionMessage =
+String.format(
+"Received the barrier of checkpointId=%d, complete 
checkpointId=%d "
++ "future by exception due to currently 
does not support "
++ "concurrent unaligned checkpoints.",
+currentCheckpointId, checkpointId);
+channelStateFutures
+.pollFirstEntry()
+.getValue()
+.completeExceptionally(new 
IllegalStateException(exceptionMessage));
+LOG.info(exceptionMessage);
+}
+}

Review Comment:
   You are right. I use `TreeMap` due to 
`ChannelStateWriteRequestDispatcherImpl` uses `Map writers;`. Currently, Flink does not support 
concurrent unaligned checkpoints. I guess  it might be supported in the future.
   
   Please help to double check, if you think it doesn't need to be considered, 
I can simplify TreeMap to single CompletableFuture<...> channelStateFuture.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27652) CompactManager.Rewriter cannot handle different partition keys invoked compaction

2022-05-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27652:
---
Labels: pull-request-available  (was: )

> CompactManager.Rewriter cannot handle different partition keys invoked 
> compaction
> -
>
> Key: FLINK-27652
> URL: https://issues.apache.org/jira/browse/FLINK-27652
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.2.0
>
>
> h3. Issue Description
> When enabling {{commit.force-compact}} for the partitioned managed table, 
> there had a chance that the successive synchronized
> writes got failure.  The current impl of {{CompactManager.Rewriter}} is an 
> anonymous class in {{FileStoreWriteImpl}}. However, the {{partition}} and 
> {{bucket}} are referenced as local variables, and the dataFileReader is 
> initiliazed when rewrite is called; and this may lead to the {{rewrite}} 
> method messing up with the wrong data file with the {{partition}} and 
> {{bucket}}.
> h3. Root Cause
> {code:java}
> Caused by: java.io.IOException: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: java.io.FileNotFoundException: File 
> file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0
>  does not exist or the user running Flink ('jane.cjm') has insufficient 
> permissions to access it. at 
> org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172)
> {code}
> However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to 
> partition Autumn. It seems like the rewriter found the wrong partition/bucket 
> with the wrong file.
> h3. How to Reproduce
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.flink.table.store.connector;
> import org.junit.Test;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.ExecutionException;
> /** A reproducible case. */
> public class ForceCompactionITCase extends FileStoreTableITCase {
> @Override
> protected List ddl() {
> return Collections.singletonList(
> "CREATE TABLE IF NOT EXISTS T1 ("
> + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY 
> (f1)");
> }
> @Test
> public void test() throws ExecutionException, InterruptedException {
> bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')");
> bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 
> 'true')");
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is 
> Coming')"
> + ",(2, 'Winter', 'The First Snowflake'), "
> + "(2, 'Spring', 'The First Rose in Spring'), 
> "
> + "(7, 'Summer', 'Summertime Sadness')")
> .await();
> bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last 
> Christmas')").await();
> bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is 
> Coming')").await();
> bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 
> 'Refrain')").await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon 
> Sugar'), "
> + "(4, 'Spring', 'Spring Water')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
> + " (9, 'Autumn', 'Wake Me Up When September 
> Ends')")
> .await();
> bEnv.executeSql(
> "INSERT INTO T1 VALUES(666, 

[GitHub] [flink-table-store] JingsongLi merged pull request #127: [FLINK-27652] Fix CompactManager.Rewriter cannot handle different partition keys invoked compaction

2022-05-18 Thread GitBox


JingsongLi merged PR #127:
URL: https://github.com/apache/flink-table-store/pull/127


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs commented on pull request #19746: [FLINK-27630][table-planner] Add maven-source-plugin for table planne…

2022-05-18 Thread GitBox


liuyongvs commented on PR #19746:
URL: https://github.com/apache/flink/pull/19746#issuecomment-113076

   hi @wuchong , ci passed, could you please review it ,thanks so much 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876518129


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##
@@ -191,9 +205,7 @@ private int add(BufferConsumer bufferConsumer, int 
partialRecordLength, boolean
 newBufferSize = bufferSize;
 }
 
-if (prioritySequenceNumber != -1) {
-notifyPriorityEvent(prioritySequenceNumber);
-}
+notifyPriorityEvent(prioritySequenceNumber);

Review Comment:
   Thanks for your reminder, I will do better in the future contribution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876516180


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##
@@ -72,6 +74,51 @@ static ChannelStateWriteRequest write(
 (writer, buffer) -> writer.writeOutput(info, buffer));
 }
 
+static ChannelStateWriteRequest write(
+long checkpointId,
+ResultSubpartitionInfo info,
+CompletableFuture> dataFuture) {
+return buildFutureWriteRequest(
+checkpointId,
+"writeOutputFuture",
+dataFuture,
+(writer, buffer) -> writer.writeOutput(info, buffer));
+}
+
+static ChannelStateWriteRequest buildFutureWriteRequest(
+long checkpointId,
+String name,
+CompletableFuture> dataFuture,
+BiConsumer bufferConsumer) {
+return new CheckpointInProgressRequest(
+name,
+checkpointId,
+writer -> {
+try {
+List buffers = dataFuture.get();
+if (buffers == null || buffers.isEmpty()) {
+return;
+}
+for (Buffer buffer : buffers) {
+checkBufferIsBuffer(buffer);
+bufferConsumer.accept(writer, buffer);
+}
+} catch (Throwable e) {
+writer.fail(e);
+}
+},
+throwable -> {
+List buffers = dataFuture.get();
+if (buffers == null || buffers.isEmpty()) {
+return;
+}

Review Comment:
   Same as the previous one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #19723: [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers

2022-05-18 Thread GitBox


1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876515484


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##
@@ -72,6 +74,51 @@ static ChannelStateWriteRequest write(
 (writer, buffer) -> writer.writeOutput(info, buffer));
 }
 
+static ChannelStateWriteRequest write(
+long checkpointId,
+ResultSubpartitionInfo info,
+CompletableFuture> dataFuture) {
+return buildFutureWriteRequest(
+checkpointId,
+"writeOutputFuture",
+dataFuture,
+(writer, buffer) -> writer.writeOutput(info, buffer));
+}
+
+static ChannelStateWriteRequest buildFutureWriteRequest(
+long checkpointId,
+String name,
+CompletableFuture> dataFuture,
+BiConsumer bufferConsumer) {
+return new CheckpointInProgressRequest(
+name,
+checkpointId,
+writer -> {
+try {
+List buffers = dataFuture.get();
+if (buffers == null || buffers.isEmpty()) {

Review Comment:
   The barrier will be send to downstream quickly when the back pressure isn't 
severe. For this case, I execute `channelStateFuture.complete(null)` in 
`PipelinedSubpartition#completeTimeoutableCheckpointBarrier` to reduce create 
some useless Objects(EmptyLists). Do you think it should be changed to 
`channelStateFuture.complete(EmptyList)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lincoln-lil commented on pull request #19759: [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions

2022-05-18 Thread GitBox


lincoln-lil commented on PR #19759:
URL: https://github.com/apache/flink/pull/19759#issuecomment-1130907243

   @flinkbot  run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-18 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r876452544


##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##
@@ -1137,8 +1138,9 @@ public void emitWatermark(Watermark mark) {
 expectedResults.add(new Watermark(-4));
 // verify watermark
 awaitRecordCount(results, expectedResults.size());
-assertThat(results, 
org.hamcrest.Matchers.contains(expectedResults.toArray()));
-assertEquals(0, TestWatermarkTracker.WATERMARK.get());
+assertThat(results)
+
.satisfies(matching(org.hamcrest.Matchers.contains(expectedResults.toArray(;

Review Comment:
   Use native assertj contains?



##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##
@@ -1213,7 +1216,8 @@ private static class OpenCheckingStringSchema extends 
SimpleStringSchema {
 
 @Override
 public void open(DeserializationSchema.InitializationContext context) 
throws Exception {
-assertThat(context.getMetricGroup(), 
notNullValue(MetricGroup.class));
+assertThat(context.getMetricGroup())
+.satisfies(matching(notNullValue(MetricGroup.class)));

Review Comment:
   Is there anything that prevents a direct assertj not null check?



##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java:
##
@@ -984,9 +981,13 @@ public void markAsTemporarilyIdle() {}
 sourceFunc.cancel();
 testHarness.close();
 
-assertEquals("record count", recordCount, 
testHarness.getOutput().size());
-assertThat(watermarks, org.hamcrest.Matchers.contains(new 
Watermark(-3), new Watermark(5)));
-assertEquals("watermark count", watermarkCount, watermarks.size());
+assertThat(testHarness.getOutput()).as("record 
count").hasSize(recordCount);
+assertThat(watermarks)
+.satisfies(
+matching(

Review Comment:
   Use native assertj contains?



##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java:
##
@@ -175,15 +171,14 @@ public void testAsyncErrorRethrownOnCheckpoint() throws 
Throwable {
 testHarness.snapshot(123L, 123L);
 } catch (Exception e) {
 // the next checkpoint should rethrow the async exception
-Assert.assertTrue(
-ExceptionUtils.findThrowableWithMessage(e, "artificial 
async exception")
-.isPresent());
+assertThat(ExceptionUtils.findThrowableWithMessage(e, "artificial 
async exception"))

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java:
##
@@ -210,11 +208,12 @@ public void testGetShardList() throws Exception {
 expectedStreamShard.add(shardHandle);
 }
 
-Assert.assertThat(
-actualShardList,
-containsInAnyOrder(
-expectedStreamShard.toArray(
-new 
StreamShardHandle[actualShardList.size()])));
+assertThat(actualShardList)
+.satisfies(

Review Comment:
   Use native assertj checks?



##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java:
##
@@ -120,15 +122,17 @@ public void testRetainMinAfterReachingLimit() throws 
Exception {
 while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
 Thread.sleep(10);
 }
-Assert.assertThat(emitter.results, Matchers.contains(one, two, 
three, ten));
+assertThat(emitter.results)
+.satisfies(matching(Matchers.contains(one, two, three, 
ten)));
 
 // advance watermark, emits remaining record from queue0
 emitter.setCurrentWatermark(10);
 dl = Deadline.fromNow(Duration.ofSeconds(10));
 while (emitter.results.size() != 5 && dl.hasTimeLeft()) {
 Thread.sleep(10);
 }
-Assert.assertThat(emitter.results, Matchers.contains(one, two, 
three, ten, eleven));
+assertThat(emitter.results)

Review Comment:
   Use native assertj checks?



##
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java:
##
@@ -235,16 +230,16 @@ public void go() throws Exception {
 snapshotThread.sync();
 } catch (Exception e) {
 // after the flush, the async 

[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-18 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r876439486


##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##
@@ -247,12 +249,18 @@ public void 
testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throw
 DeliveryGuarantee.EXACTLY_ONCE,
 2,
 (records) ->
-assertThat(
-records,
-contains(
-LongStream.range(1, 
lastCheckpointedRecord.get().get() + 1)
-.boxed()
-.toArray(;
+assertThat(records)
+.satisfies(

Review Comment:
   Use contains directly?



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##
@@ -286,12 +293,13 @@ public void 
testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
 new FailingCheckpointMapper(failed, lastCheckpointedRecord), 
config, "newPrefix");
 final List> collectedRecords =
 drainAllRecordsFromTopic(topic, true);
-assertThat(
-deserializeValues(collectedRecords),
-contains(
-LongStream.range(1, lastCheckpointedRecord.get().get() 
+ 1)
-.boxed()
-.toArray()));
+assertThat(deserializeValues(collectedRecords))
+.satisfies(

Review Comment:
   Use contains directly?



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##
@@ -165,13 +167,13 @@ public void testAsyncErrorRethrownOnInvoke() throws 
Throwable {
 testHarness.processElement(new StreamRecord<>("msg-2"));
 } catch (Exception e) {
 // the next invoke should rethrow the async exception
-Assert.assertTrue(e.getCause().getMessage().contains("artificial 
async exception"));
+assertThat(e.getCause().getMessage()).contains("artificial async 
exception");

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java:
##
@@ -679,10 +666,9 @@ private void testFailingConsumerLifecycle(
 "Exception should have been thrown from open / run method 
of FlinkKafkaConsumerBase.");
 } catch (Exception e) {
 assertThat(
-ExceptionUtils.findThrowable(
-e, throwable -> 
throwable.equals(expectedException))
-.isPresent(),
-is(true));
+ExceptionUtils.findThrowable(

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##
@@ -227,68 +225,4 @@ public TypeInformation getProducedType() {
 return Types.STRING;
 }
 }
-

Review Comment:
   Why are those not needed anymore?



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##
@@ -395,11 +393,14 @@ void testAssigningEmptySplits() throws Exception {
 new TestingReaderOutput<>(),
 () -> reader.getNumAliveFetchers() == 0,
 "The split fetcher did not exit before timeout.");
-MatcherAssert.assertThat(
-finishedSplits,
-Matchers.containsInAnyOrder(
-
KafkaPartitionSplit.toSplitId(normalSplit.getTopicPartition()),
-
KafkaPartitionSplit.toSplitId(emptySplit.getTopicPartition(;
+assertThat(finishedSplits)
+.satisfies(

Review Comment:
   Use assertj contains* directly?



##
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##
@@ -408,10 +416,11 @@ private void writeRecordsToKafka(
 drainAllRecordsFromTopic(
 topic, deliveryGuarantee == 
DeliveryGuarantee.EXACTLY_ONCE);
 final long recordsCount = expectedRecords.get().get();
-assertEquals(collectedRecords.size(), recordsCount);
-assertThat(
-deserializeValues(collectedRecords),
-contains(LongStream.range(1, recordsCount + 
1).boxed().toArray()));
+assertThat(recordsCount).isEqualTo(collectedRecords.size());

Review Comment:

[GitHub] [flink] afedulov commented on a diff in pull request #19660: [FLINK-27185][connectors] Convert connector modules to assertj

2022-05-18 Thread GitBox


afedulov commented on code in PR #19660:
URL: https://github.com/apache/flink/pull/19660#discussion_r876433275


##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java:
##
@@ -116,10 +116,10 @@ public void testBatchNumTooLarge() {
 }
 
 private void check(long[][] expected, Serializable[][] actual) {
-assertEquals(expected.length, actual.length);
+assertThat(actual).hasDimensions(expected.length, 2);

Review Comment:
   Could that be `hasDimensions(expected.length, expected[0].length);` ?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -282,15 +281,15 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -367,11 +366,11 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -381,11 +380,11 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -264,12 +263,12 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(
+ExceptionUtils.findThrowableWithMessage(
 t,
 "Either all or none of the following 
options should be provided:\n"
-+ "username\npassword")
-.isPresent());
++ "username\npassword"))

Review Comment:
   assertThatThrownBy ?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -304,12 +303,12 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(
+ExceptionUtils.findThrowableWithMessage(
 t,
 "'scan.partition.lower-bound'='100' must 
not be larger than "
-+ 
"'scan.partition.upper-bound'='-10'.")
-.isPresent());
++ 
"'scan.partition.upper-bound'='-10'."))

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -337,13 +336,13 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(
-ExceptionUtils.findThrowableWithMessage(
+assertThat(

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -320,13 +319,13 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, properties);
 fail("exception expected");
 } catch (Throwable t) {
-assertTrue(

Review Comment:
   assertThatThrownBy?



##
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java:
##
@@ -304,12 +303,12 @@ public void testJdbcValidation() {
 createTableSource(SCHEMA, 

[GitHub] [flink] snuyanzin commented on a diff in pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-18 Thread GitBox


snuyanzin commented on code in PR #19753:
URL: https://github.com/apache/flink/pull/19753#discussion_r876422680


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/ComparableInputTypeStrategyTest.java:
##
@@ -33,24 +33,30 @@
 import org.apache.flink.table.types.logical.StructuredType;
 import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
 
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nonnull;
 
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
-import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 
 /** Tests for {@link ComparableTypeStrategy}. */
-public class ComparableInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
+class ComparableInputTypeStrategyTest extends InputTypeStrategiesTestBase {
 
-@Parameterized.Parameters(name = "{index}: {0}")
-public static List testData() {
-return asList(
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource("testData")
+protected void testStrategy(TestSpec testSpec) {
+super.testStrategy(testSpec);
+}

Review Comment:
   This allows to run parameterized tests with parameters generated in successor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-18 Thread GitBox


snuyanzin commented on code in PR #19753:
URL: https://github.com/apache/flink/pull/19753#discussion_r876419257


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTestBase.java:
##
@@ -42,26 +40,36 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base class for tests of {@link TypeStrategies}. */
-@RunWith(Parameterized.class)
 public abstract class TypeStrategiesTestBase {
 
-@Parameter public TestSpec testSpec;
-
-@Test
-public void testTypeStrategy() {
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource(
+value = {
+
"org.apache.flink.table.types.inference.strategies.ArrayTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.CurrentWatermarkTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.DecimalTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.GetTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.MapTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.MappingTypeStrategiesTest#testData",
+
"org.apache.flink.table.types.inference.strategies.RowTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.StringConcatTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.TypeStrategiesTest#testData"
+})

Review Comment:
   seems resolved with test overriding



##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java:
##
@@ -45,39 +44,49 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base class for testing {@link InputTypeStrategy}. */
-@RunWith(Parameterized.class)
 public abstract class InputTypeStrategiesTestBase {
 
-@Parameterized.Parameter public TestSpec testSpec;
-
-@Test
-public void testStrategy() {
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource(
+value = {
+
"org.apache.flink.table.types.inference.ComparableInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.CurrentWatermarkInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.InputTypeStrategiesTest#testData",
+
"org.apache.flink.table.types.inference.strategies.RepeatingSequenceInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.SubsequenceInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.SymbolArgumentTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.TypeLiteralArgumentTypeStrategyTest#testData"
+})

Review Comment:
   seems resolved with test overriding



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-3154) Update Kryo version from 2.24.0 to 5.2.0

2022-05-18 Thread Himanshu Shah (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-3154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539097#comment-17539097
 ] 

Himanshu Shah commented on FLINK-3154:
--

An open vulnerability due to no class whitelisting in versions prior to kryo 
5.0.0 is flagged up a a security risk for flink.

[https://github.com/EsotericSoftware/kryo/issues/398]

Is there a plan to upgrade to a newer version of kryo to remediate?

> Update Kryo version from 2.24.0 to 5.2.0
> 
>
> Key: FLINK-3154
> URL: https://issues.apache.org/jira/browse/FLINK-3154
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Priority: Not a Priority
>
> Flink's Kryo version is outdated and could be updated to a newer version, 
> e.g. kryo-3.0.3.
> From ML: we cannot bumping the Kryo version easily - the serialization format 
> changed (that's why they have a new major version), which would render all 
> Flink savepoints and checkpoints incompatible.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] snuyanzin commented on pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-18 Thread GitBox


snuyanzin commented on PR #19753:
URL: https://github.com/apache/flink/pull/19753#issuecomment-1130513388

   I converted since to draft since I noticed an issue with tests 
`InputTypeStrategiesTestBase` and `TypeStrategiesTestBase` mentioned above


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-18 Thread GitBox


snuyanzin commented on code in PR #19753:
URL: https://github.com/apache/flink/pull/19753#discussion_r876347664


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTestBase.java:
##
@@ -42,26 +40,36 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base class for tests of {@link TypeStrategies}. */
-@RunWith(Parameterized.class)
 public abstract class TypeStrategiesTestBase {
 
-@Parameter public TestSpec testSpec;
-
-@Test
-public void testTypeStrategy() {
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource(
+value = {
+
"org.apache.flink.table.types.inference.strategies.ArrayTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.CurrentWatermarkTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.DecimalTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.GetTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.MapTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.MappingTypeStrategiesTest#testData",
+
"org.apache.flink.table.types.inference.strategies.RowTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.StringConcatTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.TypeStrategiesTest#testData"
+})

Review Comment:
   It leads to cartesian of tests and input
   need to think how to cope with this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on a diff in pull request #19753: [FLINK-27672][tests][table] Migrate flink-table-common to JUnit5

2022-05-18 Thread GitBox


snuyanzin commented on code in PR #19753:
URL: https://github.com/apache/flink/pull/19753#discussion_r876347233


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java:
##
@@ -45,39 +44,49 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base class for testing {@link InputTypeStrategy}. */
-@RunWith(Parameterized.class)
 public abstract class InputTypeStrategiesTestBase {
 
-@Parameterized.Parameter public TestSpec testSpec;
-
-@Test
-public void testStrategy() {
+@ParameterizedTest(name = "{index}: {0}")
+@MethodSource(
+value = {
+
"org.apache.flink.table.types.inference.ComparableInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.CurrentWatermarkInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.InputTypeStrategiesTest#testData",
+
"org.apache.flink.table.types.inference.strategies.RepeatingSequenceInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.SubsequenceInputTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.SymbolArgumentTypeStrategyTest#testData",
+
"org.apache.flink.table.types.inference.strategies.TypeLiteralArgumentTypeStrategyTest#testData"
+})

Review Comment:
   It leads to cartesian of tests and input
   need to think how to cope with this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-26914) Notice files improvements in flink-kubernetes-operator

2022-05-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-26914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-26914.
--
Resolution: Fixed

Merged to main via 
[{{d66077d}}|https://github.com/apache/flink-kubernetes-operator/commit/d66077dcbba2911bdd22b325f5ada7e92c71b016]
 and to release-1.0 via 
[f7eed3a|https://github.com/apache/flink-kubernetes-operator/commit/f7eed3a1029e74bc12edb0bcf41cd7bada823dbe].

> Notice files improvements in flink-kubernetes-operator
> --
>
> Key: FLINK-26914
> URL: https://issues.apache.org/jira/browse/FLINK-26914
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Yang Wang
>Assignee: Yang Wang
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: kubernetes-operator-1.0.0
>
>
> See the discussion in this PR[1].
> Having the jars contain the NOTICE files is insufficient as discoverability 
> within the image is basically 0. We should extract & merge the NOTICE files 
> for all jars, similarly to what we do for {{{}flink-dist{}}}.
>  
> Another small improvement is flink dependencies could be removed from 
> {{{}flink-kubernetes-operator/resources/META-INF/NOTICE{}}}.
>  
> [1]. https://github.com/apache/flink-kubernetes-operator/pull/127



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] mbalassi merged pull request #221: [FLINK-26914] Various improvements for notice and license files

2022-05-18 Thread GitBox


mbalassi merged PR #221:
URL: https://github.com/apache/flink-kubernetes-operator/pull/221


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27645) Update overview / supported features page for 1.0.0

2022-05-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27645:
---
Labels: pull-request-available  (was: )

> Update overview / supported features page for 1.0.0
> ---
>
> Key: FLINK-27645
> URL: https://issues.apache.org/jira/browse/FLINK-27645
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> A lot of new features have been implemented and Flink 1.15 support also 
> brings a lot of valuable additions.
> We should update the overview page with the supported features to reflect the 
> new developments.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #226: [FLINK-27645] Update overview / supported features page for 1.0.0

2022-05-18 Thread GitBox


morhidi commented on PR #226:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/226#issuecomment-1130479111

   cc @wangyang0918 @mbalassi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #226: [FLINK-27645] Update overview / supported features page for 1.0.0

2022-05-18 Thread GitBox


morhidi opened a new pull request, #226:
URL: https://github.com/apache/flink-kubernetes-operator/pull/226

   - Restructured the feature summary
   - Added links for quick access
   - Added Known issues and limitations


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27688) Add FlinkOperatorEventListener interface

2022-05-18 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-27688:
---
Summary: Add FlinkOperatorEventListener interface  (was: Pluggable backend 
for EventUtils)

> Add FlinkOperatorEventListener interface
> 
>
> Key: FLINK-27688
> URL: https://issues.apache.org/jira/browse/FLINK-27688
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> Currently the 
> [EventUtils|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java]
>  utility that we use to publish events for the operator has an implementation 
> that is tightly coupled with the [Kubernetes 
> Events|https://www.containiq.com/post/kubernetes-events] mechanism.
> I suggest to enhance this with a pluggable event interface, which could be 
> implemented by our users to support their event messaging system of choice. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27688) Pluggable backend for EventUtils

2022-05-18 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539047#comment-17539047
 ] 

Gyula Fora commented on FLINK-27688:


I think we should keep the Kubernetes event mechanism and add an 
FlinkOperatorEventListener interface which could expose callbacks on status 
changes and event triggers.

> Pluggable backend for EventUtils
> 
>
> Key: FLINK-27688
> URL: https://issues.apache.org/jira/browse/FLINK-27688
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.1.0
>
>
> Currently the 
> [EventUtils|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java]
>  utility that we use to publish events for the operator has an implementation 
> that is tightly coupled with the [Kubernetes 
> Events|https://www.containiq.com/post/kubernetes-events] mechanism.
> I suggest to enhance this with a pluggable event interface, which could be 
> implemented by our users to support their event messaging system of choice. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27688) Pluggable backend for EventUtils

2022-05-18 Thread Jira
Márton Balassi created FLINK-27688:
--

 Summary: Pluggable backend for EventUtils
 Key: FLINK-27688
 URL: https://issues.apache.org/jira/browse/FLINK-27688
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Márton Balassi
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.1.0


Currently the 
[EventUtils|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java]
 utility that we use to publish events for the operator has an implementation 
that is tightly coupled with the [Kubernetes 
Events|https://www.containiq.com/post/kubernetes-events] mechanism.

I suggest to enhance this with a pluggable event interface, which could be 
implemented by our users to support their event messaging system of choice. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #225: [FLINK-27675] Improve manual savepoint tracking

2022-05-18 Thread GitBox


morhidi commented on PR #225:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/225#issuecomment-1130465588

   cc @wangyang0918 @SteNicholas 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27675) Improve manual savepoint tracking

2022-05-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-27675:
---
Labels: pull-request-available  (was: )

> Improve manual savepoint tracking
> -
>
> Key: FLINK-27675
> URL: https://issues.apache.org/jira/browse/FLINK-27675
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Matyas Orhidi
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.0.0
>
>
> There are 2 problems with the manual savpeoint result observing logic that 
> can cause the reconciler to not make progress with the deployment 
> (recoveries, upgrades etc).
>  # Whenever the jobmanager deployment is not in READY state or the job itself 
> is not RUNNING, the trigger info must be reset and we should not try to query 
> it anymore. Flink will not retry the savepoint if the job fails, restarted 
> anyways.
>  # If there is a sensible error when fetching the savepoint status (such as: 
> There is no savepoint operation with triggerId=xxx for job ) we should simply 
> reset the trigger. These errors will never go away on their own and will 
> simply cause the deployment to get stuck in observing/waiting for a savepoint 
> to complete



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #225: [FLINK-27675] Improve manual savepoint tracking

2022-05-18 Thread GitBox


morhidi opened a new pull request, #225:
URL: https://github.com/apache/flink-kubernetes-operator/pull/225

   Cancelling savepoint operation on application failures
   Cancelling savepoint operation on savepoint fetching failures
   Generating events for failed savepoint operations


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] LadyForest commented on pull request #119: POC for ALTER TABLE ... COMPACT

2022-05-18 Thread GitBox


LadyForest commented on PR #119:
URL: 
https://github.com/apache/flink-table-store/pull/119#issuecomment-1130282820

   When I rebased the origin/master to resolve conflicts, I found the 
`AlterTableCompactITCase` failed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-27687) Flink shouldn't assume temp folders keep existing when unused

2022-05-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-27687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaël Renoux updated FLINK-27687:

Summary: Flink shouldn't assume temp folders keep existing when unused  
(was: SpanningWrapper shouldn't assume temp folder exists)

> Flink shouldn't assume temp folders keep existing when unused
> -
>
> Key: FLINK-27687
> URL: https://issues.apache.org/jira/browse/FLINK-27687
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Network
>Affects Versions: 1.14.4
>Reporter: Gaël Renoux
>Priority: Major
>
> In SpanningWrapper.createSpillingChannel, it assumes that the folder in which 
> we create the file exists. However, this is not the case in the following 
> scenario (which actually happened to us today):
>  * The temp folders were created a while ago (I assume on startup of the 
> task-manager) in the /tmp folder. They weren't used for a while, probably 
> because we didn't have any record big enough to trigger it.
>  * The cleanup cron for /tmp did its job and deleted those old folders in 
> /tmp.
>  * We deployed a new version of the job that actually needed the folders, and 
> it crashed.
> => Not sure if it should be SpanningWrapper's responsability to create the 
> folder if it doesn't exist anymore, though, but I'm not familiar enough with 
> Flink's internal to make a guess as to what class should do it. The problem 
> occurred to us on SpanningWrapper, but it can probably happen in other places 
> as well.
> More generally, assuming that folders and files in /tmp won't get deleted at 
> some point doesn't seem correct to me. The [documentation for 
> io.tmp.dirs|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/]
>  recommands that it shouldn't be purged, but we do need to clean up at some 
> point. If that is not the case, then the documentation should be updated to 
> indicate that this is not a recommendation but mandatory, and that purges 
> will break the jobs (not just trigger a recovery).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27687) SpanningWrapper shouldn't assume temp folder exists

2022-05-18 Thread Jira
Gaël Renoux created FLINK-27687:
---

 Summary: SpanningWrapper shouldn't assume temp folder exists
 Key: FLINK-27687
 URL: https://issues.apache.org/jira/browse/FLINK-27687
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Network
Affects Versions: 1.14.4
Reporter: Gaël Renoux


In SpanningWrapper.createSpillingChannel, it assumes that the folder in which 
we create the file exists. However, this is not the case in the following 
scenario (which actually happened to us today):
 * The temp folders were created a while ago (I assume on startup of the 
task-manager) in the /tmp folder. They weren't used for a while, probably 
because we didn't have any record big enough to trigger it.
 * The cleanup cron for /tmp did its job and deleted those old folders in /tmp.
 * We deployed a new version of the job that actually needed the folders, and 
it crashed.

=> Not sure if it should be SpanningWrapper's responsability to create the 
folder if it doesn't exist anymore, though, but I'm not familiar enough with 
Flink's internal to make a guess as to what class should do it. The problem 
occurred to us on SpanningWrapper, but it can probably happen in other places 
as well.

More generally, assuming that folders and files in /tmp won't get deleted at 
some point doesn't seem correct to me. The [documentation for 
io.tmp.dirs|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/]
 recommands that it shouldn't be purged, but we do need to clean up at some 
point. If that is not the case, then the documentation should be updated to 
indicate that this is not a recommendation but mandatory, and that purges will 
break the jobs (not just trigger a recovery).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27162) RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState failed on azure

2022-05-18 Thread Anton Kalashnikov (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Kalashnikov reassigned FLINK-27162:
-

Assignee: Anton Kalashnikov

> RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState failed on 
> azure
> ---
>
> Key: FLINK-27162
> URL: https://issues.apache.org/jira/browse/FLINK-27162
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Yun Gao
>Assignee: Anton Kalashnikov
>Priority: Major
>  Labels: test-stability
>
> {code:java}
> 2022-04-08T16:21:37.7382295Z Apr 08 16:21:37 [ERROR] Tests run: 2, Failures: 
> 0, Errors: 1, Skipped: 0, Time elapsed: 19.21 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase
> 2022-04-08T16:21:37.7383825Z Apr 08 16:21:37 [ERROR] 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState
>   Time elapsed: 9.642 s  <<< ERROR!
> 2022-04-08T16:21:37.7385362Z Apr 08 16:21:37 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint was 
> canceled because a barrier from newer checkpoint was received.
> 2022-04-08T16:21:37.7386479Z Apr 08 16:21:37  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 2022-04-08T16:21:37.7387206Z Apr 08 16:21:37  at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2022-04-08T16:21:37.7388026Z Apr 08 16:21:37  at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.runJobAndGetCheckpoint(RescaleCheckpointManuallyITCase.java:196)
> 2022-04-08T16:21:37.7389054Z Apr 08 16:21:37  at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingKeyedState(RescaleCheckpointManuallyITCase.java:137)
> 2022-04-08T16:21:37.7390072Z Apr 08 16:21:37  at 
> org.apache.flink.test.checkpointing.RescaleCheckpointManuallyITCase.testCheckpointRescalingInKeyedState(RescaleCheckpointManuallyITCase.java:115)
> 2022-04-08T16:21:37.7391320Z Apr 08 16:21:37  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-04-08T16:21:37.7392401Z Apr 08 16:21:37  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-04-08T16:21:37.7393916Z Apr 08 16:21:37  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-04-08T16:21:37.7394662Z Apr 08 16:21:37  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-04-08T16:21:37.7395293Z Apr 08 16:21:37  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-04-08T16:21:37.7396038Z Apr 08 16:21:37  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-04-08T16:21:37.7396749Z Apr 08 16:21:37  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-04-08T16:21:37.7397458Z Apr 08 16:21:37  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-04-08T16:21:37.7398164Z Apr 08 16:21:37  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-04-08T16:21:37.7398844Z Apr 08 16:21:37  at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2022-04-08T16:21:37.7399505Z Apr 08 16:21:37  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-04-08T16:21:37.7400182Z Apr 08 16:21:37  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-04-08T16:21:37.7400804Z Apr 08 16:21:37  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-04-08T16:21:37.7401492Z Apr 08 16:21:37  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-04-08T16:21:37.7402605Z Apr 08 16:21:37  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-04-08T16:21:37.7403783Z Apr 08 16:21:37  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-04-08T16:21:37.7404514Z Apr 08 16:21:37  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-04-08T16:21:37.7405180Z Apr 08 16:21:37  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-04-08T16:21:37.7405784Z Apr 08 16:21:37  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-04-08T16:21:37.7406537Z Apr 08 16:21:37  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-04-08T16:21:37.7407256Z Apr 08 16:21:37  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 

[jira] [Commented] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than it's upstream task

2022-05-18 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538931#comment-17538931
 ] 

zlzhang0122 commented on FLINK-27608:
-

[~Thesharing] First, thanks for your quickly response and really detailed 
explanation. And yes, I agree with you, there is only one scenario here because 
it is a distributed environment. The reason why it takes such a long time to 
deploy the upstream tasks is the upstream tasks has a large state to restore. 
And sometimes this may be happen very frequently. So the problem comes back to 
the beginning that  the config of taskmanager.network.request-backoff.max is 
not easy to decide and can we have some better solution to deal with it? Thanks 
again!!

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than it's upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw PartitionNotFound Exception if the 
> downstream task reached Running state earlier than its upstream task and 
> after maximum backoff for partition requests passed.But the config of 
> taskmanager.network.request-backoff.max is not eay to decide. Can we use a 
> loop awaiting the upstream task partition be ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] fredia commented on a diff in pull request #19448: [FLINK-25872][state] Support restore from non-changelog checkpoint with changelog enabled in CLAIM mode

2022-05-18 Thread GitBox


fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r876082210


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java:
##
@@ -227,6 +229,14 @@ private static void registerSharedState(
 for (KeyedStateHandle stateHandle : stateHandles) {
 if (stateHandle != null) {
 stateHandle.registerSharedStates(sharedStateRegistry, 
checkpointID);
+// Registering state handle to the given sharedStateRegistry 
serves two purposes:
+// 1. let sharedStateRegistry be responsible for cleaning the 
state handle,

Review Comment:
   You are right, I added the ID to the wrapper class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xinbinhuang commented on pull request #17873: [FLINK-25009][CLI] Output slotSharingGroup as part of JsonGraph

2022-05-18 Thread GitBox


xinbinhuang commented on PR #17873:
URL: https://github.com/apache/flink/pull/17873#issuecomment-1130206793

   @KarmaGYZ @SteNicholas CI green. ✅  PTAL if you have time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27686) Only patch status when the status actually changed

2022-05-18 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538918#comment-17538918
 ] 

Gyula Fora commented on FLINK-27686:


cc [~wangyang0918] 

> Only patch status when the status actually changed
> --
>
> Key: FLINK-27686
> URL: https://issues.apache.org/jira/browse/FLINK-27686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Critical
> Fix For: kubernetes-operator-1.0.0
>
>
> The StatusHelper class currently always patches the status regardless if it 
> changed or not.
> We should use an ObjectMapper and simply compare the ObjectNode 
> representations and only patch if there is any change.
>  
> (I think we cannot directly compare the status objects because some of the 
> content comes from getters and are not part of the equals implementation)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27686) Only patch status when the status actually changed

2022-05-18 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-27686:
--

 Summary: Only patch status when the status actually changed
 Key: FLINK-27686
 URL: https://issues.apache.org/jira/browse/FLINK-27686
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.0.0


The StatusHelper class currently always patches the status regardless if it 
changed or not.

We should use an ObjectMapper and simply compare the ObjectNode representations 
and only patch if there is any change.

 

(I think we cannot directly compare the status objects because some of the 
content comes from getters and are not part of the equals implementation)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] twalthr commented on pull request #19734: [FLINK-27492][table] Introduce flink-table-api-scala-uber for convenience

2022-05-18 Thread GitBox


twalthr commented on PR #19734:
URL: https://github.com/apache/flink/pull/19734#issuecomment-1130128858

   `flink-table-api-scala` does not rely on `flink-scala`. In theory, we don't 
need to pull the entire DataStream API for Table API users. But since we bundle 
`flink-table-api-scala-bridge` in there as well. Yes, we can offer an entire 
`flink-dist-scala`. Would we then also include gelly and cep? Both are 
currently also available in the `/opt` folder.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27497) Track terminal job states in the observer

2022-05-18 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538892#comment-17538892
 ] 

Gyula Fora commented on FLINK-27497:


Sorry for the late response. The simple reason is that Flink itself does not 
allow you to enable  {{execution.submit-failed-job-on-application-error if HA 
is disabled.}}

> Track terminal job states in the observer
> -
>
> Key: FLINK-27497
> URL: https://issues.apache.org/jira/browse/FLINK-27497
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.0.0
>Reporter: Gyula Fora
>Assignee: Aitozi
>Priority: Critical
>
> With the improvements in FLINK-27468 Flink 1.15 app clusters will not be shut 
> down in case of terminal job states (failed, finished) etc.
> It is important to properly handle these states and let the user know about 
> it.
> We should always trigger events, and for terminally failed jobs record the 
> error information in the FlinkDeployment status.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26960) Make it possible to drop an old unused registered Kryo serializer

2022-05-18 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-26960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538877#comment-17538877
 ] 

Gaël Renoux commented on FLINK-26960:
-

Also: I tried doing it in multiple states, first removing the registering of 
the serializer (no call to registerTypeWithKryoSerializer) but keeping the 
class, then removing the class: first deployment succeeded, second one failed 
with the same issue. It seems like a serializer, once added, will stay forever 
in the state (and require its target class to still exist).

> Make it possible to drop an old unused registered Kryo serializer
> -
>
> Key: FLINK-26960
> URL: https://issues.apache.org/jira/browse/FLINK-26960
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.0, 1.12.7, 1.13.6, 1.14.4
>Reporter: Dawid Wysakowicz
>Priority: Critical
> Attachments: stack-trace.log
>
>
> If users register a Kryo serializer e.g. via:
> {code}
> env.registerTypeWithKryoSerializer(ClassA. ClassASerializer.class);
> {code}
> and then use a Kryo serializer for serializing state objects, the registered 
> serializer is written into the KryoSerializer snapshot. Even if Kryo is used 
> for serializing classes other than ClassA. This makes it impossible to remove 
> {{ClassASerializer}} from the classpath, because it is required for reading 
> the savepoint.
> This problem has been reported by users before e.g.
> https://lists.apache.org/thread/989mfrxqznvzpmhm0315kv23bxh1ln8y



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-26960) Make it possible to drop an old unused registered Kryo serializer

2022-05-18 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-26960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538874#comment-17538874
 ] 

Gaël Renoux edited comment on FLINK-26960 at 5/18/22 2:30 PM:
--

We hit this issue today. I had a serializer for a custom class in the past. 
Following some refactoring, the class was no longer needed, so we removed it 
(and its serializer) from the job. When we deployed the new version of the job 
from a savepoint produced by the old version (because there was a lot of state 
we wanted to keep), we got the error (joined the stack-trace).

[^stack-trace.log]


was (Author: gael):
We hit thaht issue today. I had a serializer for a custom class in the past. 
Following some refactoring, the class was no longer needed, so we removed it 
(and its serializer) from the job. When we deployed the new version of the job 
from a savepoint produced by the old version (because there was a lot of state 
we wanted to keep), we got the error (joined the stack-trace).

[^stack-trace.log]

> Make it possible to drop an old unused registered Kryo serializer
> -
>
> Key: FLINK-26960
> URL: https://issues.apache.org/jira/browse/FLINK-26960
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.0, 1.12.7, 1.13.6, 1.14.4
>Reporter: Dawid Wysakowicz
>Priority: Critical
> Attachments: stack-trace.log
>
>
> If users register a Kryo serializer e.g. via:
> {code}
> env.registerTypeWithKryoSerializer(ClassA. ClassASerializer.class);
> {code}
> and then use a Kryo serializer for serializing state objects, the registered 
> serializer is written into the KryoSerializer snapshot. Even if Kryo is used 
> for serializing classes other than ClassA. This makes it impossible to remove 
> {{ClassASerializer}} from the classpath, because it is required for reading 
> the savepoint.
> This problem has been reported by users before e.g.
> https://lists.apache.org/thread/989mfrxqznvzpmhm0315kv23bxh1ln8y



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26960) Make it possible to drop an old unused registered Kryo serializer

2022-05-18 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-26960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538874#comment-17538874
 ] 

Gaël Renoux commented on FLINK-26960:
-

We hit thaht issue today. I had a serializer for a custom class in the past. 
Following some refactoring, the class was no longer needed, so we removed it 
(and its serializer) from the job. When we deployed the new version of the job 
from a savepoint produced by the old version (because there was a lot of state 
we wanted to keep), we got the error (joined the stack-trace).

[^stack-trace.log]

> Make it possible to drop an old unused registered Kryo serializer
> -
>
> Key: FLINK-26960
> URL: https://issues.apache.org/jira/browse/FLINK-26960
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.0, 1.12.7, 1.13.6, 1.14.4
>Reporter: Dawid Wysakowicz
>Priority: Critical
> Attachments: stack-trace.log
>
>
> If users register a Kryo serializer e.g. via:
> {code}
> env.registerTypeWithKryoSerializer(ClassA. ClassASerializer.class);
> {code}
> and then use a Kryo serializer for serializing state objects, the registered 
> serializer is written into the KryoSerializer snapshot. Even if Kryo is used 
> for serializing classes other than ClassA. This makes it impossible to remove 
> {{ClassASerializer}} from the classpath, because it is required for reading 
> the savepoint.
> This problem has been reported by users before e.g.
> https://lists.apache.org/thread/989mfrxqznvzpmhm0315kv23bxh1ln8y



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-26960) Make it possible to drop an old unused registered Kryo serializer

2022-05-18 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-26960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaël Renoux updated FLINK-26960:

Attachment: stack-trace.log

> Make it possible to drop an old unused registered Kryo serializer
> -
>
> Key: FLINK-26960
> URL: https://issues.apache.org/jira/browse/FLINK-26960
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.15.0, 1.12.7, 1.13.6, 1.14.4
>Reporter: Dawid Wysakowicz
>Priority: Critical
> Attachments: stack-trace.log
>
>
> If users register a Kryo serializer e.g. via:
> {code}
> env.registerTypeWithKryoSerializer(ClassA. ClassASerializer.class);
> {code}
> and then use a Kryo serializer for serializing state objects, the registered 
> serializer is written into the KryoSerializer snapshot. Even if Kryo is used 
> for serializing classes other than ClassA. This makes it impossible to remove 
> {{ClassASerializer}} from the classpath, because it is required for reading 
> the savepoint.
> This problem has been reported by users before e.g.
> https://lists.apache.org/thread/989mfrxqznvzpmhm0315kv23bxh1ln8y



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [flink] MartijnVisser merged pull request #19760: [FLINK-27185][connectors] Convert connector-base module to assertj

2022-05-18 Thread GitBox


MartijnVisser merged PR #19760:
URL: https://github.com/apache/flink/pull/19760


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser merged pull request #19763: [FLINK-27185][connectors] Convert connector-hive module to assertj

2022-05-18 Thread GitBox


MartijnVisser merged PR #19763:
URL: https://github.com/apache/flink/pull/19763


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] MartijnVisser merged pull request #19762: [FLINK-27185][connectors] Convert connector-elasticsearch module to assertj

2022-05-18 Thread GitBox


MartijnVisser merged PR #19762:
URL: https://github.com/apache/flink/pull/19762


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-27661) [Metric]Flink-Metrics PrometheusPushGatewayReporter support authentication

2022-05-18 Thread dennis.cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538868#comment-17538868
 ] 

dennis.cao commented on FLINK-27661:


Well done

> [Metric]Flink-Metrics PrometheusPushGatewayReporter support authentication
> --
>
> Key: FLINK-27661
> URL: https://issues.apache.org/jira/browse/FLINK-27661
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
> Environment: Flink:1.13.0
>Reporter: jiangchunyang
>Priority: Major
>  Labels: pull-request-available
>
> We found that the native PushGateway does not support authentication. As a 
> result, the metrics data in on YARN mode cannot be reported to pushGateway 
> with authentication.  
> Although we have some other solutions, such as landing files and others, we 
> think pushGateway is the best solution.  
> So I decided to do some implementation on my own, and will submit pr to the 
> community later.
> At present I only submit pr to the branch of Flink-1.13. If necessary, I 
> think I can submit it to the master branch.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-27615) Document how to define namespaceSelector for k8s operator's webhook for different k8s versions

2022-05-18 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang reassigned FLINK-27615:
-

Assignee: Biao Geng

> Document how to define namespaceSelector for k8s operator's webhook for 
> different k8s versions
> --
>
> Key: FLINK-27615
> URL: https://issues.apache.org/jira/browse/FLINK-27615
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Biao Geng
>Priority: Major
>
> In our webhook, to support {{{}watchNamespaces{}}}, we rely on the 
> {{kubernetes.io/metadata.name}} to filter the validation requests. However, 
> this label will be automatically added to a namespace only since k8s 1.21.1 
> due to k8s' [release 
> notes|https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG/CHANGELOG-1.21.md#v1211]
>  .  If users run the flink k8s operator on older k8s versions, they have to 
> add such label by themselevs to support the feature of namespaceSelector in 
> our webhook.
> As a result, if we want to support the feature defaultly, we may need to 
> emphasize that users should use k8s >= v1.21.1 to run the flink k8s operator.
>  
> Due to aitozi's advice, it may be better to add document of how to support 
> the nameSelector based validation instead of limiting the k8s version to be 
> >= 1.21.1 to support more users.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (FLINK-25950) Delete retry mechanism from ZooKeeperUtils.deleteZNode

2022-05-18 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-25950:
-

Assignee: jackwangcs

> Delete retry mechanism from ZooKeeperUtils.deleteZNode
> --
>
> Key: FLINK-25950
> URL: https://issues.apache.org/jira/browse/FLINK-25950
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Runtime / Coordination
>Affects Versions: 1.15.0
>Reporter: Matthias Pohl
>Assignee: jackwangcs
>Priority: Major
>  Labels: pull-request-available
>
> {{ZooKeeperUtils.deleteZNode}} implements a retry loop that is not necessary 
> for curator version 4.0.1+. This code can be cleaned up



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


  1   2   3   >