Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


ramkrish86 commented on code in PR #24896:
URL: https://github.com/apache/flink/pull/24896#discussion_r1634237585


##
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java:
##
@@ -59,7 +59,7 @@ public void write(byte[] b, int off, int len) throws IOException {
 
     @Override
     public void flush() throws IOException {
-        out.hsync();
+        out.hflush();

Review Comment:
   I checked this change. In the ABFS case, hflush and hsync do the same thing 
internally, I believe, so that functionality should not break. In the HDFS case, 
yes, calling hsync can make things costly. Thanks for fixing this.
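
   For context, a minimal sketch of the two Hadoop `Syncable` calls being compared (illustrative only, not the PR change itself): `hflush()` pushes client buffers to the datanodes so the data becomes visible to readers, while `hsync()` additionally asks the datanodes to persist to disk, which is the costlier guarantee on HDFS.

```java
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;

class FlushSketch {
    // Cheaper: data becomes visible to readers, but may not be on disk yet.
    static void flushForVisibility(FSDataOutputStream out) throws IOException {
        out.hflush();
    }

    // Costlier on HDFS: also forces the datanodes to persist to disk.
    static void flushForDurability(FSDataOutputStream out) throws IOException {
        out.hsync();
    }
}
```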



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


cxzl25 commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159856148

   > The commit message is not yet updated.
   
   Sorry, it's updated now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-06-10 Thread via GitHub


superdiaodiao commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2159830832

   > hi @snuyanzin @superdiaodiao do we need to support an encoding argument?
   > db2: https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode
   > max compute: https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode
   
   Calcite and Spark also take only one argument; that is enough to handle these 
cases, and UTF-8 can cover them all.
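
   For reference, a small sketch of the single-argument, UTF-8-only semantics being argued for (using the JDK's `URLEncoder`/`URLDecoder`; the built-in function's exact escaping rules may differ):

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class UrlCodecSketch {
    public static void main(String[] args) {
        // One argument, UTF-8 fixed -- no separate encoding parameter.
        String encoded = URLEncoder.encode("a b+c", StandardCharsets.UTF_8);
        System.out.println(encoded); // a+b%2Bc
        System.out.println(URLDecoder.decode(encoded, StandardCharsets.UTF_8)); // a b+c
    }
}
```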
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-26951) Add HASH supported in SQL & Table API

2024-06-10 Thread Kartikey Pant (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853901#comment-17853901
 ] 

Kartikey Pant commented on FLINK-26951:
---

Hey [~lincoln.86xy], can you please assign this issue to me?

> Add HASH supported in SQL & Table API
> -
>
> Key: FLINK-26951
> URL: https://issues.apache.org/jira/browse/FLINK-26951
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> Returns a hash value of the arguments.
> Syntax:
> {code:java}
> hash(expr1, ...) {code}
> Arguments:
>  * {{exprN}}: An expression of any type.
> Returns:
> An INTEGER.
> Examples:
> {code:java}
> > SELECT hash('Flink', array(123), 2);
>  -1321691492 {code}
> See more:
>  * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#hash]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]
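
A rough illustration of the "many arguments in, one INTEGER out" contract, using java.util.Objects#hash; note Spark and Hive actually use a Murmur3-based algorithm, so the values differ:

{code:java}
import java.util.Objects;

class HashSketch {
    public static void main(String[] args) {
        // Arbitrary arguments reduced to a single int hash value.
        // Algorithm here is Java's Objects.hash, not Spark's Murmur3 variant.
        int h = Objects.hash("Flink", 123, 2);
        System.out.println(h);
    }
}
{code}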



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-06-10 Thread Kartikey Pant (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853900#comment-17853900
 ] 

Kartikey Pant commented on FLINK-12450:
---

Hey [~taoran]/[~jark], could you please confirm if this issue is still open for 
contribution? If so, can you please assign this ticket to me?

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Ran Tao
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right
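
The proposed functions correspond to Java's shift operators on long values; a quick illustration of the assumed semantics (arithmetic right shift):

{code:java}
class BitShiftSketch {
    public static void main(String[] args) {
        long v = 12L;
        System.out.println(v << 2); // BIT_LSHIFT(12, 2) -> 48
        System.out.println(v >> 2); // BIT_RSHIFT(12, 2) -> 3
    }
}
{code}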



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


xintongsong commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159816793

   > Thanks, I've updated.
   
   The commit message is not yet updated. Did you forget to push the changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-06-10 Thread via GitHub


liuyongvs commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2159808362

   hi @snuyanzin @superdiaodiao do we need to support an encoding argument?
   db2: https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode
   max compute: https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2159778519

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634155084


##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +71,38 @@ public TrustManager[] trustManagers() {
                             .fingerprints(sslCertFingerprints)
                             .build();
 
-            trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword));
+            trustManagerFactory.init(
+                    loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType));
             return trustManagerFactory.getTrustManagers();
-        } catch (GeneralSecurityException e) {
+        } catch (GeneralSecurityException | IOException e) {
             // replicate exception handling from SSLEngineProvider
             throw new RemoteTransportException(
                     "Server SSL connection could not be established because SSL context could not be constructed",
                     e);
         }
     }
+
+    @Override
+    public KeyStore loadKeystore(String filename, String password) {
+        try {
+            return loadKeystore(filename, password, sslKeyStoreType);
+        } catch (IOException
+                | CertificateException
+                | NoSuchAlgorithmException
+                | KeyStoreException e) {
+            throw new RemoteTransportException(

Review Comment:
   I simplified it a bit to catch GeneralSecurityException, but I think that's 
the most we can do. The Scala base class doesn't declare any exceptions, so we 
have to catch them in the Java override.
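
   For readers following along, a minimal sketch (assumed shape, not the actual Flink/Pekko code, which differs in its exception wrapping) of what a three-argument `loadKeystore` with a configurable store type does:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

class KeystoreLoadSketch {
    static KeyStore loadKeystore(String filename, String password, String type)
            throws IOException, GeneralSecurityException {
        // The store type ("JKS", "PKCS12", ...) comes from configuration
        // instead of being hard-coded.
        KeyStore keyStore = KeyStore.getInstance(type);
        try (InputStream in = Files.newInputStream(Paths.get(filename))) {
            keyStore.load(in, password.toCharArray());
        }
        return keyStore;
    }
}
```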



##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   updated documentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634154163


##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -148,6 +148,17 @@ void testRESTClientSSLWrongPassword(String sslProvider) {
                 .isInstanceOf(Exception.class);
     }
 
+    /** Tests that REST Client SSL Engine creation fails with bad SSL configuration. */
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testRESTClientSSLBadTruststoreType(String sslProvider) {
+        Configuration clientConfig = createRestSslConfigWithTrustStore(sslProvider);
+        clientConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_TYPE, "BKS");

Review Comment:
   Changed the tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634153986


##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) {
                 .isInstanceOf(Exception.class);
     }
 
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testInternalSSLWrongKeystoreType(String sslProvider) {
+        final Configuration config = createInternalSslConfigWithKeyAndTrustStores(sslProvider);
+        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JCEKS");
+
+        assertThatThrownBy(() -> SSLUtils.createInternalServerSSLEngineFactory(config))
+                .isInstanceOf(java.io.IOException.class);

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-06-10 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853894#comment-17853894
 ] 

Weijie Guo commented on FLINK-18476:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60177=logs=298e20ef-7951-5965-0e79-ea664ddc435e=d4c90338-c843-57b0-3232-10ae74f00347=23446

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got Python on Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.
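
A sketch of how the interpreter could be resolved from the PATH instead of being hard-coded to /usr/bin/python (illustrative only, not the test's actual code):

{code:java}
import java.io.IOException;

class PythonResolverSketch {
    static String resolvePython() throws IOException, InterruptedException {
        // Prefer python3, fall back to python; both looked up on the PATH.
        for (String candidate : new String[] {"python3", "python"}) {
            try {
                Process p = new ProcessBuilder(candidate, "--version").start();
                if (p.waitFor() == 0) {
                    return candidate;
                }
            } catch (IOException ignored) {
                // candidate not on PATH; try the next one
            }
        }
        throw new IOException("No Python interpreter found on PATH");
    }
}
{code}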



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34914) FLIP-436: Introduce Catalog-related Syntax

2024-06-10 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853893#comment-17853893
 ] 

Weijie Guo commented on FLINK-34914:


Hi [~liyubin117] and [~qingyue].

FYI: we expect the feature freeze to be June 15, so there is a significant risk 
that this feature will not make it into 1.20. I noticed from the 1.20 release wiki 
page that only half of the work is done; would you be willing to move it to the 
next release if you don't have enough time?

> FLIP-436: Introduce Catalog-related Syntax
> --
>
> Key: FLINK-34914
> URL: https://issues.apache.org/jira/browse/FLINK-34914
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>
> Umbrella issue for: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-436%3A+Introduce+Catalog-related+Syntax



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]

2024-06-10 Thread via GitHub


yuxiqian commented on code in PR #3398:
URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1634142791


##
flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh:
##
@@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then
   exit 1
 fi
 
-# Setup Flink related configurations
-# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
-_FLINK_HOME_DETERMINED=1

Review Comment:
   Should the `_FLINK_HOME_DETERMINED` env var be preserved here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35563) 'Run kubernetes application test' failed on AZP

2024-06-10 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35563:
---
Description: 
{code:java}
Jun 08 01:52:28 deployment.apps "flink-native-k8s-application-1" deleted
Jun 08 01:52:29 clusterrolebinding.rbac.authorization.k8s.io 
"flink-role-binding-default" deleted
Jun 08 01:52:35 pod/flink-native-k8s-application-1-7c58596d75-blw5k condition 
met
Jun 08 01:52:35 Stopping minikube ...
Jun 08 01:52:35 * Stopping node "minikube"  ...
Jun 08 01:52:40 * 1 node stopped.
Jun 08 01:52:40 [FAIL] Test script contains errors.
Jun 08 01:52:40 Checking for errors...
Jun 08 01:52:40 No errors in log files.
Jun 08 01:52:40 Checking for exceptions...
Jun 08 01:52:40 No exceptions in log files.
Jun 08 01:52:40 Checking for non-empty .out files...
grep: /home/vsts/work/_temp/debug_files/flink-logs/*.out: No such file or 
directory
Jun 08 01:52:40 No non-empty .out files.
Jun 08 01:52:40 
Jun 08 01:52:40 [FAIL] 'Run kubernetes application test' failed after 3 minutes 
and 2 seconds! Test exited with exit code 1
{code}

Unfortunately, I haven't found the root cause yet (:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60162=logs=e9d3d34f-3d15-59f4-0e3e-35067d100dfe=5d91035e-8022-55f2-2d4f-ab121508bf7e=10046

> 'Run kubernetes application test' failed on AZP
> ---
>
> Key: FLINK-35563
> URL: https://issues.apache.org/jira/browse/FLINK-35563
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> {code:java}
> Jun 08 01:52:28 deployment.apps "flink-native-k8s-application-1" deleted
> Jun 08 01:52:29 clusterrolebinding.rbac.authorization.k8s.io 
> "flink-role-binding-default" deleted
> Jun 08 01:52:35 pod/flink-native-k8s-application-1-7c58596d75-blw5k condition 
> met
> Jun 08 01:52:35 Stopping minikube ...
> Jun 08 01:52:35 * Stopping node "minikube"  ...
> Jun 08 01:52:40 * 1 node stopped.
> Jun 08 01:52:40 [FAIL] Test script contains errors.
> Jun 08 01:52:40 Checking for errors...
> Jun 08 01:52:40 No errors in log files.
> Jun 08 01:52:40 Checking for exceptions...
> Jun 08 01:52:40 No exceptions in log files.
> Jun 08 01:52:40 Checking for non-empty .out files...
> grep: /home/vsts/work/_temp/debug_files/flink-logs/*.out: No such file or 
> directory
> Jun 08 01:52:40 No non-empty .out files.
> Jun 08 01:52:40 
> Jun 08 01:52:40 [FAIL] 'Run kubernetes application test' failed after 3 
> minutes and 2 seconds! Test exited with exit code 1
> {code}
> Unfortunately, I haven't found the root cause yet (:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60162=logs=e9d3d34f-3d15-59f4-0e3e-35067d100dfe=5d91035e-8022-55f2-2d4f-ab121508bf7e=10046



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35563) 'Run kubernetes application test' failed on AZP

2024-06-10 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35563:
--

 Summary: 'Run kubernetes application test' failed on AZP
 Key: FLINK-35563
 URL: https://issues.apache.org/jira/browse/FLINK-35563
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-06-10 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853888#comment-17853888
 ] 

Weijie Guo commented on FLINK-18476:


test_cron_jdk21 misc

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60172=logs=59a2b95a-736b-5c46-b3e0-cee6e587fd86=c301da75-e699-5c06-735f-778207c16f50=23053

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got Python on Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35562) WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds

2024-06-10 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35562:
--

 Summary: WindowTableFunctionProcTimeRestoreTest produced no output 
for 900 seconds
 Key: FLINK-35562
 URL: https://issues.apache.org/jira/browse/FLINK-35562
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35562) WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds

2024-06-10 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35562:
---
Description: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60172=logs=32715a4c-21b8-59a3-4171-744e5ab107eb=ff64056b-5320-5afe-c22c-6fa339e59586=11885

> WindowTableFunctionProcTimeRestoreTest produced no output for 900 seconds
> -
>
> Key: FLINK-35562
> URL: https://issues.apache.org/jira/browse/FLINK-35562
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Major
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60172=logs=32715a4c-21b8-59a3-4171-744e5ab107eb=ff64056b-5320-5afe-c22c-6fa339e59586=11885



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]

2024-06-10 Thread via GitHub


ljz2051 commented on code in PR #24873:
URL: https://github.com/apache/flink/pull/24873#discussion_r1634123958


##
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.execution.RestoreMode;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * FileMerging Compatibility IT case which tests recovery from a checkpoint created in different
+ * fileMerging mode (i.e. fileMerging enabled/disabled).
+ */
+public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
+
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(
+                new Object[][] {
+                    {RestoreMode.CLAIM, true},
+                    {RestoreMode.CLAIM, false},
+                    {RestoreMode.NO_CLAIM, true},
+                    {RestoreMode.NO_CLAIM, false}
+                });
+    }
+
+    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
+    @MethodSource("parameters")
+    public void testSwitchFromDisablingToEnablingFileMerging(
+            RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir Path checkpointDir)
+            throws Exception {
+        testSwitchingFileMerging(
+                checkpointDir, false, true, restoreMode, fileMergingAcrossBoundary);
+    }
+
+    @ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
+    @MethodSource("parameters")
+    public void testSwitchFromEnablingToDisablingFileMerging(
+            RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir Path checkpointDir)
+            throws Exception {
+        testSwitchingFileMerging(
+                checkpointDir, true, false, restoreMode, fileMergingAcrossBoundary);
+    }
+
+    private void testSwitchingFileMerging(
+            Path checkpointDir,
+            boolean firstFileMergingSwitch,
+            boolean secondFileMergingSwitch,
+            RestoreMode restoreMode,
+            boolean fileMergingAcrossBoundary)
+            throws Exception {
+        final Configuration config = new Configuration();
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toUri().toString());
+        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
+        config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, fileMergingAcrossBoundary);
+        config.set(CheckpointingOptions.FILE_MERGING_ENABLED, firstFileMergingSwitch);
+        MiniClusterWithClientResource firstCluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        EmbeddedRocksDBStateBackend stateBackend1 = new EmbeddedRocksDBStateBackend();
+        stateBackend1.configure(config, Thread.currentThread().getContextClassLoader());
+        firstCluster.before();
+        String externalCheckpoint;
+        try {
+            externalCheckpoint =
+                    runJobAndGetExternalizedCheckpoint(
+                            stateBackend1, null, firstCluster,

Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


wenbingshen commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634118107


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
-    private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
-            throws Exception {
+    private void updateKafkaPulsarSourceMaxParallelisms(
+            Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
-            var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+            Pattern partitionRegex =
+                    Pattern.compile(
+                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                                    + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
                     var sourceVertex = vertexInfo.getId();
                     var numPartitions =
                             queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
-                                    .filter(partitionRegex.asMatchPredicate())
-                                    .count();
+                                    .map(
+                                            v -> {
+                                                Matcher matcher = partitionRegex.matcher(v);
+                                                if (matcher.matches()) {
+                                                    String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaId = matcher.group("kafkaId");
+                                                    String pulsarTopic = matcher.group("pulsarTopic");
+                                                    String pulsarId = matcher.group("pulsarId");
+                                                    return kafkaTopic != null
+                                                            ? kafkaTopic + "-" + kafkaId
+                                                            : pulsarTopic + "-" + pulsarId;

Review Comment:
   @gyfora @1996fanrui 
   I think the existing matching logic is not too complicated, and the PR is not 
too large. If maximum-parallelism support is extended to other source connectors 
in the future, it will not be too late to split it then.
   
   If we split the matching logic for Kafka and Pulsar, we would need to run two 
regex passes over the source metrics to obtain the partition counts of the two 
systems separately, which does not seem necessary.
   
   It seems there is no simpler way than the "or" matching: the current regular 
expression is an alternation, and the two cases are then distinguished in the map 
function. If you have a better approach, please tell me, thank you.
   
   I suggest keeping the logic of the existing PR.
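
   To make the discussion concrete, a self-contained illustration of the single-alternation, named-group approach used above (the metric name is hypothetical):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PartitionRegexSketch {
    public static void main(String[] args) {
        Pattern p = Pattern.compile(
                "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                        + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
        // One pass over each metric name; the branch that matched is
        // disambiguated afterwards via the named groups.
        Matcher m = p.matcher("op.KafkaSourceReader.topic.orders.partition.3.currentOffset");
        if (m.matches()) {
            System.out.println(m.group("kafkaTopic") + "-" + m.group("kafkaId")); // orders-3
        }
    }
}
```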



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]

2024-06-10 Thread via GitHub


cxzl25 commented on PR #24896:
URL: https://github.com/apache/flink/pull/24896#issuecomment-2159691770

   Gentle ping @xintongsong 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


wenbingshen commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634091321


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
-    private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
-            throws Exception {
+    private void updateKafkaPulsarSourceMaxParallelisms(
+            Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
-            var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+            Pattern partitionRegex =
+                    Pattern.compile(
+                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                                    + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
                     var sourceVertex = vertexInfo.getId();
                     var numPartitions =
                             queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
-                                    .filter(partitionRegex.asMatchPredicate())
-                                    .count();
+                                    .map(
+                                            v -> {
+                                                Matcher matcher = partitionRegex.matcher(v);
+                                                if (matcher.matches()) {
+                                                    String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaId = matcher.group("kafkaId");
+                                                    String pulsarTopic = matcher.group("pulsarTopic");
+                                                    String pulsarId = matcher.group("pulsarId");
+                                                    return kafkaTopic != null
+                                                            ? kafkaTopic + "-" + kafkaId
+                                                            : pulsarTopic + "-" + pulsarId;

Review Comment:
   Sorry, I will update this PR today.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-10 Thread via GitHub


gong commented on PR #3407:
URL: https://github.com/apache/flink-cdc/pull/3407#issuecomment-2159661569

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-10 Thread via GitHub


gong commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2159656572

   > @gong would you like to open a PR for release-3.1 branch?
   
   ok, I will open the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-10 Thread via GitHub


reswqa commented on code in PR #24845:
URL: https://github.com/apache/flink/pull/24845#discussion_r1634076649


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the
+ * serialization of the set's elements.
+ *
+ * <p>The serialization format for the set is as follows: four bytes for the length of the set,
+ * followed by the serialized representation of each element.
+ *
+ * @param <T> The type of element in the set.
+ */
+@Internal
+public final class SetSerializer<T> extends TypeSerializer<Set<T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The serializer for the elements of the set. */
+    private final TypeSerializer<T> elementSerializer;
+
+    /**
+     * Creates a set serializer that uses the given serializer to serialize the set's elements.
+     *
+     * @param elementSerializer The serializer for the elements of the set
+     */
+    public SetSerializer(TypeSerializer<T> elementSerializer) {
+        this.elementSerializer = checkNotNull(elementSerializer);
+    }
+
+    // ------------------------------------------------------------------------
+    //  SetSerializer specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the serializer for the elements of the set.
+     *
+     * @return The serializer for the elements of the set
+     */
+    public TypeSerializer<T> getElementSerializer() {
+        return elementSerializer;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Type Serializer implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<Set<T>> duplicate() {
+        TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+        return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement);
+    }
+
+    @Override
+    public Set<T> createInstance() {
+        return new HashSet<>(0);
+    }
+
+    @Override
+    public Set<T> copy(Set<T> from) {
+        Set<T> newSet = new HashSet<>(from.size());
+        for (T element : from) {
+            newSet.add(elementSerializer.copy(element));
+        }
+        return newSet;
+    }
+
+    @Override
+    public Set<T> copy(Set<T> from, Set<T> reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return -1; // var length
+    }
+
+    @Override
+    public void serialize(Set<T> set, DataOutputView target) throws IOException {
+        final int size = set.size();
+        target.writeInt(size);
+        for (T element : set) {
+            elementSerializer.serialize(element, target);

Review Comment:
   > I also found that serializing a list with null values will throw an NPE, as 
we reused the existing ListSerializer impl, which does not allow null values (it 
was previously only used for serializing ListState, where null values are 
explicitly forbidden by the contract). Directly adding a null marker for it would 
break state compatibility, so I plan to introduce a new list serializer that 
accepts null values for serializing user objects in a new JIRA. WDYT?
   
   Makes sense.
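
   A hypothetical sketch of the null-marker encoding mentioned in the quote (one boolean per element; not the planned Flink serializer, which would also need snapshot and compatibility handling):

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

class NullableListWriteSketch {
    static void serialize(List<String> list, DataOutputStream out) throws IOException {
        out.writeInt(list.size());
        for (String element : list) {
            out.writeBoolean(element != null); // null marker per element
            if (element != null) {
                out.writeUTF(element);
            }
        }
    }
}
```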



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-10 Thread via GitHub


reswqa commented on code in PR #24845:
URL: https://github.com/apache/flink/pull/24845#discussion_r1634061063


##
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the
+ * serialization of the set's elements.
+ *
+ * <p>The serialization format for the set is as follows: four bytes for the length of the set,
+ * followed by the serialized representation of each element.
+ *
+ * @param <T> The type of element in the set.
+ */
+@Internal
+public final class SetSerializer<T> extends TypeSerializer<Set<T>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The serializer for the elements of the set. */
+    private final TypeSerializer<T> elementSerializer;
+
+    /**
+     * Creates a set serializer that uses the given serializer to serialize the set's elements.
+     *
+     * @param elementSerializer The serializer for the elements of the set
+     */
+    public SetSerializer(TypeSerializer<T> elementSerializer) {
+        this.elementSerializer = checkNotNull(elementSerializer);
+    }
+
+    // ------------------------------------------------------------------------
+    //  SetSerializer specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the serializer for the elements of the set.
+     *
+     * @return The serializer for the elements of the set
+     */
+    public TypeSerializer<T> getElementSerializer() {
+        return elementSerializer;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Type Serializer implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<Set<T>> duplicate() {
+        TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+        return duplicateElement == elementSerializer ? this : new SetSerializer<>(duplicateElement);
+    }
+
+    @Override
+    public Set<T> createInstance() {
+        return new HashSet<>(0);
+    }
+
+    @Override
+    public Set<T> copy(Set<T> from) {
+        Set<T> newSet = new HashSet<>(from.size());
+        for (T element : from) {
+            newSet.add(elementSerializer.copy(element));
+        }
+        return newSet;
+    }
+
+    @Override
+    public Set<T> copy(Set<T> from, Set<T> reuse) {
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return -1; // var length
+    }
+
+    @Override
+    public void serialize(Set<T> set, DataOutputView target) throws IOException {
+        final int size = set.size();
+        target.writeInt(size);
+        for (T element : set) {
+            elementSerializer.serialize(element, target);

Review Comment:
   > I also found that serializing a list with null values will throw an NPE, as 
we reused the existing ListSerializer impl, which does not allow null values (it 
was previously only used for serializing ListState, where null values are 
explicitly forbidden by the contract). Directly adding a null marker for it would 
break state compatibility, so I plan to introduce a new list serializer that 
accepts null values for serializing user objects in a new JIRA. WDYT?
   
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


1996fanrui commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634051534


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
-    private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
-            throws Exception {
+    private void updateKafkaPulsarSourceMaxParallelisms(
+            Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
-            var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+            Pattern partitionRegex =
+                    Pattern.compile(
+                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                                    + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
                     var sourceVertex = vertexInfo.getId();
                     var numPartitions =
                             queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
-                                    .filter(partitionRegex.asMatchPredicate())
-                                    .count();
+                                    .map(
+                                            v -> {
+                                                Matcher matcher = partitionRegex.matcher(v);
+                                                if (matcher.matches()) {
+                                                    String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaId = matcher.group("kafkaId");
+                                                    String pulsarTopic = matcher.group("pulsarTopic");
+                                                    String pulsarId = matcher.group("pulsarId");
+                                                    return kafkaTopic != null
+                                                            ? kafkaTopic + "-" + kafkaId
+                                                            : pulsarTopic + "-" + pulsarId;

Review Comment:
   This PR has not been updated for almost a month, and the author has not 
responded.
   
   The Flink Kubernetes operator will be released this week, so I will take over 
this PR if it is still not updated or responded to within 24 hours.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting

2024-06-10 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853858#comment-17853858
 ] 

Rui Fan commented on FLINK-35489:
-

Merged to main(1.9.0) via: c3e94aec0c6f081a99b7467bd0bcee551b841600

> Metaspace size can be too little after autotuning change memory setting
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Nicolas Fraison
>Assignee: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> We have enabled the autotuning feature on one of our Flink jobs with the 
> below config
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down, the autotuning decided to give all the memory to the JVM 
> (heap being scaled by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was computed by the autotuning for a TM running on a 
> 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to some issues starting the TM, because we rely on a 
> javaagent that performs some memory allocation outside of the JVM (relying on 
> some C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could 
> have helped for that particular event, but this can lead to other issues, as it 
> could lead to too small a heap size being computed.
> It would be interesting to be able to set a minimum memory.managed.size to be 
> taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config 
> should have been applied to avoid this issue?
>  
> Edit: see this comment that leads to the metaspace issue: 
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting

2024-06-10 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-35489:
---

Assignee: Nicolas Fraison

> Metaspace size can be too little after autotuning change memory setting
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Nicolas Fraison
>Assignee: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> We have enabled the autotuning feature on one of our Flink jobs with the 
> below config
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down, the autotuning decided to give all the memory to the JVM 
> (heap being scaled by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was computed by the autotuning for a TM running on a 
> 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to some issues starting the TM, because we rely on a 
> javaagent that performs some memory allocation outside of the JVM (relying on 
> some C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could 
> have helped for that particular event, but this can lead to other issues, as it 
> could lead to too small a heap size being computed.
> It would be interesting to be able to set a minimum memory.managed.size to be 
> taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config 
> should have been applied to avoid this issue?
>  
> Edit: see this comment that leads to the metaspace issue: 
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting

2024-06-10 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-35489.
-
Fix Version/s: kubernetes-operator-1.9.0
   Resolution: Fixed

> Metaspace size can be too little after autotuning change memory setting
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Nicolas Fraison
>Assignee: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> We have enabled the autotuning feature on one of our Flink jobs with the 
> below config
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down, the autotuning decided to give all the memory to the JVM 
> (heap being scaled by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was computed by the autotuning for a TM running on a 
> 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to some issues starting the TM, because we rely on a 
> javaagent that performs some memory allocation outside of the JVM (relying on 
> some C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could 
> have helped for that particular event, but this can lead to other issues, as it 
> could lead to too small a heap size being computed.
> It would be interesting to be able to set a minimum memory.managed.size to be 
> taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config 
> should have been applied to avoid this issue?
>  
> Edit: see this comment that leads to the metaspace issue: 
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


1996fanrui merged PR #833:
URL: https://github.com/apache/flink-kubernetes-operator/pull/833


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


1996fanrui commented on PR #833:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2159610745

   Thanks @ashangit for the fix and everyone for the review!
   
   Merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35557) MemoryManager only reserves memory per consumer type once

2024-06-10 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853850#comment-17853850
 ] 

Xintong Song edited comment on FLINK-35557 at 6/11/24 1:26 AM:
---

I think this is by design. As suggested by the name, the memory is expected to 
be shared within the slot. Take RocksDBStateBackend as an example, when 
there're multiple states in one slot, there will only be one cache area and one 
write buffer area shared by all the states. Given that states are initialized 
concurrently, there's no guarantee which state will be initialized first, thus 
we introduced shared resources to only allocate the memory when the first state 
is initialized.

If the memory consumer does not want this sharing behavior, it can simply call 
allocatePages / reserveMemory instead of 
getSharedMemoryResourceForManagedMemory.


was (Author: xintongsong):
I think this is by design. As suggested by the name, the memory is expected to 
be shared within the slot. Take RocksDBStateBackend as an example, when 
there're multiple states in one slot, there will only be one cache area and one 
write buffer area share by all the states. Given that states are initialized 
concurrently, there's no guarantee which state will be initialized first, thus 
we introduced shared resources to only allocate the memory when the first state 
is initialized.

If the memory consumer does not want this sharing behaviors, it can simply call 
allocatePages / reserveMemory instead of 
getSharedMemoryResourceForManagedMemory.

> MemoryManager only reserves memory per consumer type once
> -
>
> Key: FLINK-35557
> URL: https://issues.apache.org/jira/browse/FLINK-35557
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Runtime / Task
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.20.0, 1.19.2
>
>
> # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we 
> [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526]
>  a reserve function
>  # The function 
> [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61]
>  the available Slot memory and fails if there's not enough memory
>  # We pass it to {{SharedResources.getOrAllocateSharedResource}}
>  # In {{SharedResources.getOrAllocateSharedResource}} , we check if the 
> resource (memory) was already reserved by some key (e.g. 
> {{{}state-rocks-managed-memory{}}})
>  # If not, we create a new one and call the reserve function
>  # If the resource was already reserved (not null), we do NOT reserve the 
> memory again: 
> [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71]
> So there will be only one (first) memory reservation for rocksdb, for 
> example, no matter how many state backends are created in a slot, meaning 
> that managed memory limits are not applied.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35557) MemoryManager only reserves memory per consumer type once

2024-06-10 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853850#comment-17853850
 ] 

Xintong Song commented on FLINK-35557:
--

I think this is by design. As suggested by the name, the memory is expected to 
be shared within the slot. Take RocksDBStateBackend as an example, when 
there're multiple states in one slot, there will only be one cache area and one 
write buffer area shared by all the states. Given that states are initialized 
concurrently, there's no guarantee which state will be initialized first, thus 
we introduced shared resources to only allocate the memory when the first state 
is initialized.

If the memory consumer does not want this sharing behavior, it can simply call 
allocatePages / reserveMemory instead of 
getSharedMemoryResourceForManagedMemory.
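
For illustration, a minimal self-contained sketch of the allocate-once pattern 
described above (toy code, not Flink's actual API; the class and method names 
are made up):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Toy registry mimicking the shared-resource behavior described above.
public class SharedResourceRegistry {
    private final Map<String, Long> resources = new ConcurrentHashMap<>();

    // The reservation callback runs exactly once per key; later callers
    // (e.g. a second RocksDB state backend in the same slot) reuse the
    // cached resource and no further reservation is attempted.
    long getOrAllocate(String key, LongSupplier reserveAndAllocate) {
        return resources.computeIfAbsent(key, k -> reserveAndAllocate.getAsLong());
    }

    public static void main(String[] args) {
        SharedResourceRegistry registry = new SharedResourceRegistry();
        long first = registry.getOrAllocate(
                "state-rocks-managed-memory",
                () -> {
                    System.out.println("reserving memory (runs only once)");
                    return 128L * 1024 * 1024;
                });
        long second = registry.getOrAllocate(
                "state-rocks-managed-memory",
                () -> {
                    throw new AssertionError("never called for the same key");
                });
        System.out.println(first == second); // true: the resource is shared
    }
}
{code}

A consumer that wants a dedicated budget check per instance would go through 
the non-shared reservation path instead, as noted above.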

> MemoryManager only reserves memory per consumer type once
> -
>
> Key: FLINK-35557
> URL: https://issues.apache.org/jira/browse/FLINK-35557
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Runtime / Task
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.20.0, 1.19.2
>
>
> # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we 
> [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526]
>  a reserve function
>  # The function 
> [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61]
>  the available Slot memory and fails if there's not enough memory
>  # We pass it to {{SharedResources.getOrAllocateSharedResource}}
>  # In {{SharedResources.getOrAllocateSharedResource}} , we check if the 
> resource (memory) was already reserved by some key (e.g. 
> {{{}state-rocks-managed-memory{}}})
>  # If not, we create a new one and call the reserve function
>  # If the resource was already reserved (not null), we do NOT reserve the 
> memory again: 
> [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71]
> So there will be only one (first) memory reservation for rocksdb, for 
> example, no matter how many state backends are created in a slot, meaning 
> that managed memory limits are not applied.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634004521


##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for 
Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   I found an `OverrideDefault` annotation, will try that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


ammar-master commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1633910473


##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for 
Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   You're right. Since I generated the documentation with Java 11, the output 
was PKCS12. Do you know how I can stop the default value in the documentation 
from being generated, and instead add it manually?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]

2024-06-10 Thread via GitHub


morazow commented on PR #100:
URL: 
https://github.com/apache/flink-connector-kafka/pull/100#issuecomment-2159296755

   Thanks @dongwoo6kim ,
   
   Tests look good from my side  
   
   (_Recently I faced a similar issue which may be related, when running in 
batch mode with `startingOffsets` set; see the sketch below. The change should 
solve that issue, but we may create a separate issue for it._)
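
   For reference, a minimal sketch of that batch-mode scenario (bootstrap 
servers and topic are placeholders; whether the reader stops at the bounded 
offsets is exactly the behavior in question):
   
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kafka.source.KafkaSource;
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
   
   KafkaSource<String> source =
           KafkaSource.<String>builder()
                   .setBootstrapServers("localhost:9092") // placeholder
                   .setTopics("input-topic")              // placeholder
                   .setStartingOffsets(OffsetsInitializer.earliest())
                   // bounded mode: the reader should stop at these offsets
                   // instead of blocking indefinitely for new records
                   .setBounded(OffsetsInitializer.latest())
                   .setValueOnlyDeserializer(new SimpleStringSchema())
                   .build();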


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-10 Thread via GitHub


nicusX commented on PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2159216321

   I moved the example application under tests and removed the separate module 
completely.
   I added links to the example application in the README.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-10 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1633781977


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer
+* [Sample application](./example-datastream-job): Sample application showing 
the usage of the connector with DataStream API. It also demonstrates how to 
configure the request signer. 

Review Comment:
   I moved the example application under tests, as suggested, merging 
everything into a single class.
   The usage of the AMP connector is commented out, since it is a separate 
dependency, but it is still present to show the usage in case someone wants to 
use it.
   
   
https://github.com/apache/flink-connector-prometheus/blob/5afb890aa8fe860ce25bcf89f7cdee93a6d2c4be/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors

2024-06-10 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853754#comment-17853754
 ] 

Robert Metzger commented on FLINK-30400:


The problem of marking flink-connector-base as provided for a connector is that 
classes like the {{DeliveryGuarantee}} are missing when developing locally.
For example, the KafkaSink example includes the {{DeliveryGuarantee}}, which is 
not pulled in by my IDE because it is a transitive provided dependency.

We could also update the connector docs and list the {{flink-connector-base}} 
dependency explicitly as well (see the sketch below)?
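
For example, the docs could show declaring it next to the connector dependency 
(a sketch; assumes the user's pom already defines a {{flink.version}} property):

{code:xml}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
{code}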

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0, aws-connector-4.2.0, kafka-3.1.0, 
> rabbitmq-3.1.0, kafka-3.0.2
>
>
> Check that none of the externalized connectors bundle connector-base; if so 
> remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35559) Shading issue cause class conflict

2024-06-10 Thread Aleksandr Pilipenko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Pilipenko updated FLINK-35559:

Priority: Blocker  (was: Major)

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes a ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> 

[jira] [Commented] (FLINK-35445) Update Async Sink documentation for Timeout configuration

2024-06-10 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853734#comment-17853734
 ] 

Ahmed Hamdy commented on FLINK-35445:
-

As discussed offline, there are no docs for AsyncSink, hence closing as not 
an issue

> Update Async Sink documentation for Timeout configuration 
> --
>
> Key: FLINK-35445
> URL: https://issues.apache.org/jira/browse/FLINK-35445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Documentation
>Reporter: Ahmed Hamdy
>Priority: Major
> Fix For: 1.20.0
>
>
> Update Documentation for AsyncSink Changes introduced by 
> [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35445) Update Async Sink documentation for Timeout configuration

2024-06-10 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy closed FLINK-35445.
---
Resolution: Not A Problem

> Update Async Sink documentation for Timeout configuration 
> --
>
> Key: FLINK-35445
> URL: https://issues.apache.org/jira/browse/FLINK-35445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common, Documentation
>Reporter: Ahmed Hamdy
>Priority: Major
> Fix For: 1.20.0
>
>
> Update Documentation for AsyncSink Changes introduced by 
> [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35140) Release flink-connector-opensearch vX.X.X for Flink 1.19

2024-06-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853733#comment-17853733
 ] 

Sergey Nuyanzin commented on FLINK-35140:
-

PMC help is needed, as mentioned at

https://lists.apache.org/thread/4bhbw7lvh9r0r8s6lkgmg22ftpg9cm8g

> Release flink-connector-opensearch vX.X.X for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35140) Release flink-connector-opensearch vX.X.X for Flink 1.19

2024-06-10 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853733#comment-17853733
 ] 

Sergey Nuyanzin edited comment on FLINK-35140 at 6/10/24 4:43 PM:
--

Almost done: voting passed for both v1 and v2.
Now PMC help is needed, as mentioned at

[https://lists.apache.org/thread/4bhbw7lvh9r0r8s6lkgmg22ftpg9cm8g]


was (Author: sergey nuyanzin):
Need for PMC help as mentioned at

https://lists.apache.org/thread/4bhbw7lvh9r0r8s6lkgmg22ftpg9cm8g

> Release flink-connector-opensearch vX.X.X for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35140) Release flink-connector-opensearch vX.X.X for Flink 1.19

2024-06-10 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin reassigned FLINK-35140:
---

Assignee: Sergey Nuyanzin  (was: Sergey Nuyanzin)

> Release flink-connector-opensearch vX.X.X for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


z3d1k commented on PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158839029

   > There is a risk this will break user code depending on these libs, right? 
Given this is a bug, and they should not be using the shaded libs, I think this 
is ok, thoughts?
   
   Classes that we add to the relocation are specific to the connector 
implementation.
   Not only should users generally not use the shaded libs, they also should 
not use the internal connector classes affected by this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35561) Flink REST API incorrect documentation

2024-06-10 Thread Shyam (Jira)
Shyam created FLINK-35561:
-

 Summary: Flink REST API incorrect documentation
 Key: FLINK-35561
 URL: https://issues.apache.org/jira/browse/FLINK-35561
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Runtime / REST
Reporter: Shyam


Flink REST API documentation for JAR upload 
([/jars/upload|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-upload])
 states that the response will contain fileName, which will then be used later 
to run Flink Jobs.

In the 
[/jars/:jarid/run|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run]
 documentation, the definition for :jarid is "String value that identifies a 
jar. When uploading the jar a path is returned, where the filename is the ID. 
This value is equivalent to the `id` field in the list of uploaded jars 
(/jars)."

 

This statement identifying the file name should be changed to:

String value that identifies a jar. When uploading the jar, a path is returned, 
where the {*}filename contains the ID{*}, and it is the text after the last 
forward slash. This value is equivalent to the `id` field in the list of 
uploaded jars (/jars).
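
For example, the flow looks like this (a sketch; host, port and the returned 
path are illustrative):

{code}
$ curl -X POST -F "jarfile=@/path/to/job.jar" http://localhost:8081/jars/upload
{"filename":"/tmp/flink-web-upload/e86f5d-job.jar","status":"success"}

# the jar id is the text after the last forward slash of "filename":
$ curl -X POST http://localhost:8081/jars/e86f5d-job.jar/run
{code}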



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-10 Thread via GitHub


dannycranmer commented on PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#issuecomment-2158683163

   @hlteoh37 / @z3d1k can you please take a look at this one too?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-10 Thread via GitHub


dannycranmer commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1633458080


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+/**
+ * Default implementation of {@link ElementConverter} that lazily falls back 
to {@link
+ * DynamoDbTypeInformedElementConverter}.
+ */
+@PublicEvolving
+public class DefaultDynamoDbElementConverter<T>
+implements ElementConverter<T, DynamoDbWriteRequest> {
+
+private ElementConverter<T, DynamoDbWriteRequest> elementConverter;
+
+public DefaultDynamoDbElementConverter() {}
+
+@Override
+public DynamoDbWriteRequest apply(T t, SinkWriter.Context context) {
+if (elementConverter == null) {
+TypeInformation<T> typeInfo = (TypeInformation<T>) 
TypeInformation.of(t.getClass());
+if (!(typeInfo instanceof CompositeType)) {
+throw new IllegalArgumentException("The input type must be a 
CompositeType.");
+}
+
+elementConverter =
+new 
DynamoDbTypeInformedElementConverter<>((CompositeType<T>) typeInfo);
+}
+
+return elementConverter.apply(t, context);
+}
+
+@Override
+public void open(Sink.InitContext context) {}

Review Comment:
   nit: Do we need this? I think it has a no-op default implementation
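
   As context for the lazy check above, a small self-contained sketch of which 
input types pass the `CompositeType` guard (toy POJO; illustrative only):
   
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.common.typeutils.CompositeType;
   
   public class TypeCheckDemo {
       // A simple POJO (public fields, default constructor), for which Flink
       // extracts a PojoTypeInfo, which is a CompositeType.
       public static class Order {
           public String id;
           public double amount;
       }
   
       public static void main(String[] args) {
           TypeInformation<?> pojoInfo = TypeInformation.of(Order.class);
           TypeInformation<?> stringInfo = TypeInformation.of(String.class);
           System.out.println(pojoInfo instanceof CompositeType);   // true: accepted
           System.out.println(stringInfo instanceof CompositeType); // false: IllegalArgumentException path
       }
   }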



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35435) [FLIP-451] Introduce timeout configuration to AsyncSink

2024-06-10 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853713#comment-17853713
 ] 

Danny Cranmer commented on FLINK-35435:
---

Merged commit 
[{{c9def98}}|https://github.com/apache/flink/commit/c9def98128f0b948d98a11d57da9ea69d326c227]
 into master

> [FLIP-451] Introduce timeout configuration to AsyncSink
> ---
>
> Key: FLINK-35435
> URL: https://issues.apache.org/jira/browse/FLINK-35435
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: Screenshot 2024-05-24 at 11.06.30.png, Screenshot 
> 2024-05-24 at 12.06.20.png
>
>
> Implementation Ticket for:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35435) [FLIP-451] Introduce timeout configuration to AsyncSink

2024-06-10 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer resolved FLINK-35435.
---
Resolution: Done

> [FLIP-451] Introduce timeout configuration to AsyncSink
> ---
>
> Key: FLINK-35435
> URL: https://issues.apache.org/jira/browse/FLINK-35435
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: Screenshot 2024-05-24 at 11.06.30.png, Screenshot 
> 2024-05-24 at 12.06.20.png
>
>
> Implementation Ticket for:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35435) [FLIP-451] Introduce timeout configuration to AsyncSink

2024-06-10 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-35435:
-

Assignee: Ahmed Hamdy

> [FLIP-451] Introduce timeout configuration to AsyncSink
> ---
>
> Key: FLINK-35435
> URL: https://issues.apache.org/jira/browse/FLINK-35435
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: Screenshot 2024-05-24 at 11.06.30.png, Screenshot 
> 2024-05-24 at 12.06.20.png
>
>
> Implementation Ticket for:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35435] Add timeout Configuration to Async Sink [flink]

2024-06-10 Thread via GitHub


dannycranmer merged PR #24839:
URL: https://github.com/apache/flink/pull/24839


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


dannycranmer commented on PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158665814

   There is a risk this will break user code depending on these libs, right? 
Given this is a bug, and they should not be using the shaded libs, I think this 
is ok, thoughts?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35559) Shading issue cause class conflict

2024-06-10 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-35559:
-

Assignee: Aleksandr Pilipenko

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes a ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> 

Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]

2024-06-10 Thread via GitHub


lincoln-lil commented on code in PR #24889:
URL: https://github.com/apache/flink/pull/24889#discussion_r164828


##
docs/layouts/shortcodes/generated/optimizer_config_configuration.html:
##
@@ -65,6 +71,12 @@
 Enum
 When it is `TRY_RESOLVE`, the optimizer tries to resolve the 
correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog 
pipeline. A changelog may contain several kinds of message types: Insert (I), 
Delete (D), Update_Before (UB), Update_After (UA). There's no NDU problem in an 
insert-only changelog pipeline. For updates, there are three main NDU problems: 
1. Non-deterministic functions, including scalar, table, and aggregate 
functions, both builtin and custom ones. 2. LookupJoin on an evolving source. 
3. Cdc-source carries metadata fields which are system columns, not belonging 
to the entity data itself. For the first step, the optimizer automatically 
enables the materialization for No.2 (LookupJoin) if needed, and gives a 
detailed error message for No.1 (Non-deterministic functions) and 
No.3 (Cdc-source with metadata), which are relatively easier to solve by 
changing the SQL. The default value is `IGNORE`, where the optimizer makes no 
changes.
 Possible values:"TRY_RESOLVE""IGNORE"
 
+
+
table.optimizer.reuse-optimize-block-with-digest-enabled

Review Comment:
   Add 'Batch & Streaming' label.



##
docs/layouts/shortcodes/generated/optimizer_config_configuration.html:
##
@@ -125,5 +125,11 @@
 Boolean
If set to true, it will merge projects when converting SqlNode 
to RelNode. Note: it is not recommended to turn this on unless you are aware of 
possible side effects, such as causing the output of certain non-deterministic 
expressions to not meet expectations (see FLINK-20887).
 
+
+table.optimizer.union-all-as-breakpoint-enabled 
Streaming

Review Comment:
   Should be Batch & Streaming, see 
`BatchCommonSubGraphBasedOptimizer#doOptimize`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-10 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1633282273


##
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter
 
 @Nullable final Path targetPath;
 
-private CompactingFileWriter.Type writeType = null;
+private Type writeType = null;

Review Comment:
   It was used in a mixed manner in the file: in some places `Type`, in others 
`CompactingFileWriter.Type`. The class itself (indirectly) implements 
`CompactingFileWriter`, so IMO `Type` together with the variable name is 
descriptive enough, but TBH I do not have a strong opinion; personally I 
prefer shorter code if I have a choice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


z3d1k commented on code in PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633275869


##
flink-connector-aws/flink-connector-kinesis/pom.xml:
##
@@ -354,6 +354,12 @@ under the License.
 
 
+
+
org.apache.flink.connector.aws.sink
+
+
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink
+

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


hlteoh37 commented on code in PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633262253


##
flink-connector-aws/flink-connector-kinesis/pom.xml:
##
@@ -354,6 +354,12 @@ under the License.
 
 
+
+
org.apache.flink.connector.aws.sink
+
+
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink
+

Review Comment:
   Ok sounds good - we discussed offline and suggested changing the shading in 
`flink-connector-kinesis` to include the whole `flink-connector-aws-base` 
module instead.
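
   For reference, that would roughly mean listing the module in the shade 
plugin's artifact set (a sketch; the exact include list in the kinesis pom may 
differ):
   
   <artifactSet>
       <includes>
           <include>org.apache.flink:flink-connector-aws-base</include>
       </includes>
   </artifactSet>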



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1633243846


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   No strong opinion, just asking: compaction/compression normally adds a 
postfix, so why a prefix here?



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   Are we calling this with `..`?



##
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter
 
 @Nullable final Path targetPath;
 
-private CompactingFileWriter.Type writeType = null;
+private Type writeType = null;

Review Comment:
   When we remove the `CompactingFileWriter` prefix then `Type` itself is hard 
to read. I suggest either put back the prefix or rename it to something more 
meaningful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


pnowojski commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633225150


##
docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html:
##


Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]

2024-06-10 Thread via GitHub


flinkbot commented on PR #24920:
URL: https://github.com/apache/flink/pull/24920#issuecomment-2158287262

   
   ## CI report:
   
   * 97c210a011dd34e0c19deffe746cb3e88b355ce2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]

2024-06-10 Thread via GitHub


WencongLiu opened a new pull request, #24920:
URL: https://github.com/apache/flink/pull/24920

   ## What is the purpose of the change
   
   _Support custom shuffle for lookup join_
   
   ## Verifying this change
   
   UT and ITs.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on PR #24891:
URL: https://github.com/apache/flink/pull/24891#issuecomment-2158262913

   @wForget thanks for your efforts!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM

2024-06-10 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi closed FLINK-35525.
-

> HDFS  delegation token fetched by custom DelegationTokenProvider is not 
> passed to Yarn AM
> -
>
> Key: FLINK-35525
> URL: https://issues.apache.org/jira/browse/FLINK-35525
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Zhen Wang
>Assignee: Zhen Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> I tried running Flink with a Hadoop proxy user by disabling HadoopModuleFactory 
> and the Flink built-in token providers, and implementing a custom token provider.
> However, only the HDFS token obtained by the hadoopfs provider was added in 
> YarnClusterDescriptor, which resulted in a Yarn AM submission failure.
> Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35525) HDFS delegation token fetched by custom DelegationTokenProvider is not passed to Yarn AM

2024-06-10 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi resolved FLINK-35525.
---
Fix Version/s: 1.20.0
   Resolution: Fixed

74b100b on master

> HDFS  delegation token fetched by custom DelegationTokenProvider is not 
> passed to Yarn AM
> -
>
> Key: FLINK-35525
> URL: https://issues.apache.org/jira/browse/FLINK-35525
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Zhen Wang
>Assignee: Zhen Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> I tried running Flink with a Hadoop proxy user by disabling HadoopModuleFactory 
> and the Flink built-in token providers, and implementing a custom token provider.
> However, only the HDFS token obtained by the hadoopfs provider was added in 
> YarnClusterDescriptor, which resulted in a Yarn AM submission failure.
> Discussion: https://github.com/apache/flink/pull/22009#issuecomment-2132676114



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi merged PR #24891:
URL: https://github.com/apache/flink/pull/24891


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


ashangit commented on PR #833:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2158258410

   Updated the PR to take into account this 
[comment](https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17853606=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17853606)
 in the Jira ticket


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting

2024-06-10 Thread Nicolas Fraison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853647#comment-17853647
 ] 

Nicolas Fraison commented on FLINK-35489:
-

Thanks [~mxm] for the feedback. I've updated the 
[PR|https://github.com/apache/flink-kubernetes-operator/pull/833/files] to not 
limit the metaspace size. 

> Metaspace size can be too little after autotuning change memory setting
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> We have enable the autotuning feature on one of our flink job with below 
> config
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down, the autotuning decided to give all the memory to the JVM 
> (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was compute by the autotuning for a TM running on a 
> 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to issues starting the TM, because we rely on a javaagent that 
> performs memory allocation outside of the JVM (it relies on C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could have 
> helped for that particular event, but this can lead to other issues, as it 
> could lead to too little heap size being computed.
> It would be interesting to be able to set a minimum memory.managed.size to be 
> taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config 
> should have been applied to avoid this issue?
>  
> Edit see this comment that leads to the metaspace issue: 
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


rkhachatryan commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633189008


##
docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html:
##


Review Comment:
   I guess a leftover and should be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


rkhachatryan commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633189008


##
docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html:
##


Review Comment:
   I guess this file is a leftover and should be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1633123484


##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -148,6 +148,17 @@ void testRESTClientSSLWrongPassword(String sslProvider) {
 .isInstanceOf(Exception.class);
 }
 
+/** Tests that REST Client SSL Engine creation fails with bad SSL 
configuration. */
+@ParameterizedTest
+@MethodSource("parameters")
+void testRESTClientSSLBadTruststoreType(String sslProvider) {
+Configuration clientConfig = 
createRestSslConfigWithTrustStore(sslProvider);
+clientConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_TYPE, "BKS");

Review Comment:
   Can we follow the pattern we already have? I mean `bad-truststore-type` 
or something. This applies to all the other places too.



##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) {
 .isInstanceOf(Exception.class);
 }
 
+@ParameterizedTest
+@MethodSource("parameters")
+void testInternalSSLWrongKeystoreType(String sslProvider) {
+final Configuration config = 
createInternalSslConfigWithKeyAndTrustStores(sslProvider);
+config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JCEKS");
+
+assertThatThrownBy(() -> 
SSLUtils.createInternalServerSSLEngineFactory(config))
+.isInstanceOf(java.io.IOException.class);
+
+assertThatThrownBy(() -> 
SSLUtils.createInternalClientSSLEngineFactory(config))
+.isInstanceOf(java.io.IOException.class);

Review Comment:
   Here too.



##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +71,38 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException
+| CertificateException
+| NoSuchAlgorithmException
+| KeyStoreException e) {
+throw new RemoteTransportException(

Review Comment:
   What is the plan here with the wrapping? A couple of lines earlier I see the 
same wrapping pattern. It would be good to have behavior like we have in Scala.
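   A sketch of one way to unify the two wrapping sites visible in this diff; 
`sslContextFailure` is a hypothetical helper, not an existing method in this 
class or in Flink:
   ```java
// Hypothetical helper (assumption, not in the PR): a single place that turns
// any keystore/truststore failure into the transport exception used above.
private static RemoteTransportException sslContextFailure(Exception cause) {
    return new RemoteTransportException(
            "Server SSL connection could not be established because "
                    + "SSL context could not be constructed",
            cause);
}
   ```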



##
docs/layouts/shortcodes/generated/security_configuration.html:
##
@@ -134,6 +134,12 @@
 String
 The secret to decrypt the keystore file for Flink's for 
Flink's internal endpoints (rpc, data transport, blob server).
 
+
+security.ssl.internal.keystore-type
+"pkcs12"

Review Comment:
   This is JVM version dependent: up until Java 8 the default is `jks`, from 
Java 9 onwards it is `pkcs12`, and it can be overridden with the 
`keystore.type` security property, right? I would document the JVM-detected 
default here.
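   For reference, a minimal sketch (illustrative only, not PR code) of how the 
JVM-detected default can be read; `KeyStore.getDefaultType()` returns the 
`keystore.type` security property, which defaults to `jks` up to Java 8 and 
`pkcs12` from Java 9:
   ```java
import java.io.FileInputStream;
import java.security.KeyStore;

public class DefaultKeystoreType {
    public static void main(String[] args) throws Exception {
        // The JVM-detected default, driven by the "keystore.type" security property.
        System.out.println("default keystore type: " + KeyStore.getDefaultType());

        // Loading a store with an explicit type instead of the default;
        // the path and password below are placeholders.
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("/path/to/keystore.p12")) {
            keyStore.load(in, "changeit".toCharArray());
        }
    }
}
   ```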



##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -293,6 +315,19 @@ void testInternalSSLWrongTruststorePassword(String 
sslProvider) {
 .isInstanceOf(Exception.class);
 }
 
+@ParameterizedTest
+@MethodSource("parameters")
+void testInternalSSLWrongTruststoreType(String sslProvider) {
+final Configuration config = 
createInternalSslConfigWithKeyAndTrustStores(sslProvider);
+config.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_TYPE, "BKS");
+
+assertThatThrownBy(() -> 
SSLUtils.createInternalServerSSLEngineFactory(config))
+.isInstanceOf(java.security.KeyStoreException.class);

Review Comment:
   I have the feeling that the `java.security.` prefix can be eliminated with 
an import, right?



##
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java:
##
@@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) {
 .isInstanceOf(Exception.class);
 }
 
+@ParameterizedTest
+@MethodSource("parameters")
+void testInternalSSLWrongKeystoreType(String sslProvider) {
+

Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


rkhachatryan commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633185416


##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/** Options to configure behaviour of executing mailbox mails. */
+@Internal
+public class MailOptionsImpl implements MailboxExecutor.MailOptions {
+static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl();
+static final MailboxExecutor.MailOptions DEFERRABLE = new 
MailOptionsImpl().setDeferrable();
+
+private boolean deferrable;

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


pnowojski commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633182865


##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/** Options to configure behaviour of executing mailbox mails. */
+@Internal
+public class MailOptionsImpl implements MailboxExecutor.MailOptions {
+static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl();
+static final MailboxExecutor.MailOptions DEFERRABLE = new 
MailOptionsImpl().setDeferrable();
+
+private boolean deferrable;

Review Comment:
   Oops, I don't know why I left it this way. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35542) ClassNotFoundException when deserializing CheckpointedOffset

2024-06-10 Thread Jan Gurda (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jan Gurda updated FLINK-35542:
--
Affects Version/s: jdbc-3.2.0

> ClassNotFoundException when deserializing CheckpointedOffset
> 
>
> Key: FLINK-35542
> URL: https://issues.apache.org/jira/browse/FLINK-35542
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0, jdbc-3.1.2
> Environment: Flink 1.19.0
> Flink JDBC Connector 3.2-SNAPSHOT (commit 
> 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
> JDK 11 (Temurin)
>Reporter: Jan Gurda
>Priority: Major
>  Labels: pull-request-available
> Fix For: jdbc-3.3.0
>
>
> I use the latest flink-connector-jdbc code from the main branch; it's 
> actually 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
>  
> When jobs get interrupted while reading data from the JDBC source (for 
> example, by the TaskManager outage), they cannot recover due to the following 
> exception:
> {code:java}
> java.lang.RuntimeException: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at 
> org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
>     at 
> org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
>     at 
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown 
> Source)
>     at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown 
> Source)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Unknown Source)
>     at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
>     at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at 

Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


rkhachatryan commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633159087


##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java:
##
@@ -86,6 +87,25 @@ public interface MailboxExecutor {
 /** A constant for empty args to save on object allocation. */
 Object[] EMPTY_ARGS = new Object[0];
 
+/** Extra options to configure enqueued mails. */
+@Experimental
+interface MailOptions {
+static MailOptions options() {
+return new MailOptionsImpl();

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


rkhachatryan commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1633154332


##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/** Options to configure behaviour of executing mailbox mails. */
+@Internal
+public class MailOptionsImpl implements MailboxExecutor.MailOptions {
+static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl();
+static final MailboxExecutor.MailOptions DEFERRABLE = new 
MailOptionsImpl().setDeferrable();
+
+private boolean deferrable;

Review Comment:
   Can we make this field immutable now?
   And make the constructor private?
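   A minimal sketch of what that could look like (illustrative only; 
`isDeferrable` and the boolean constructor are assumptions, not the committed 
code):
   ```java
package org.apache.flink.api.common.operators;

import org.apache.flink.annotation.Internal;

/** Sketch: final field, private constructor, instances exposed via constants. */
@Internal
public class MailOptionsImpl implements MailboxExecutor.MailOptions {
    static final MailboxExecutor.MailOptions DEFAULT = new MailOptionsImpl(false);
    static final MailboxExecutor.MailOptions DEFERRABLE = new MailOptionsImpl(true);

    private final boolean deferrable;

    private MailOptionsImpl(boolean deferrable) {
        this.deferrable = deferrable;
    }

    public boolean isDeferrable() {
        return deferrable;
    }
}
   ```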



##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java:
##
@@ -86,6 +87,25 @@ public interface MailboxExecutor {
 /** A constant for empty args to save on object allocation. */
 Object[] EMPTY_ARGS = new Object[0];
 
+/** Extra options to configure enqueued mails. */
+@Experimental
+interface MailOptions {

Review Comment:
   I'm not sure if hiding getters makes sense (users might be confused and even 
need them for testing). But that can be changed later if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]

2024-06-10 Thread via GitHub


wForget commented on PR #24891:
URL: https://github.com/apache/flink/pull/24891#issuecomment-2158200824

   > Just seen the error in the pipeline, plz fix it:
   
   Thanks @gaborgsomogyi, I have updated the document and CI has passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2158109164

   Please have a look at the failing test:
   ```
   Jun 10 07:17:02 07:17:02.965 [INFO] Running 
org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase
   Jun 10 07:17:03 07:17:03.593 [ERROR] Tests run: 1, Failures: 1, Errors: 0, 
Skipped: 0, Time elapsed: 0.623 s <<< FAILURE! -- in 
org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase
   Jun 10 07:17:03 07:17:03.593 [ERROR] 
org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testCompleteness
 -- Time elapsed: 0.593 s <<< FAILURE!
   Jun 10 07:17:03 java.lang.AssertionError: 
   Jun 10 07:17:03 Documentation is outdated, please regenerate it according to 
the instructions in flink-docs/README.md.
   Jun 10 07:17:03  Problems:
   Jun 10 07:17:03  Documentation of 
security.ssl.rest.truststore-type in SecurityOptions is outdated. Expected: 
default=("jks") description=(The type of the truststore for Flink's external 
REST endpoints.).
   Jun 10 07:17:03  Documentation of 
security.ssl.rest.keystore-type in SecurityOptions is outdated. Expected: 
default=("jks") description=(The type of the keystore for Flink's external REST 
endpoints.).
   Jun 10 07:17:03  Documentation of 
security.ssl.internal.keystore-type in SecurityOptions is outdated. Expected: 
default=("jks") description=(The type of keystore for Flink's internal 
endpoints (rpc, data transport, blob server).).
   Jun 10 07:17:03  Documentation of 
security.ssl.internal.truststore-type in SecurityOptions is outdated. Expected: 
default=("jks") description=(The type of truststore for Flink's internal 
endpoints (rpc, data transport, blob server).).
   Jun 10 07:17:03  Documented option 
security.ssl.rest.truststore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.rest.truststore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.rest.keystore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.rest.keystore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.internal.keystore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.internal.keystore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.internal.truststore-type does not exist.
   Jun 10 07:17:03  Documented option 
security.ssl.internal.truststore-type does not exist.
   Jun 10 07:17:03  at 
org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.compareDocumentedAndExistingOptions(ConfigOptionsDocsCompletenessITCase.java:209)
   Jun 10 07:17:03  at 
org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testCompleteness(ConfigOptionsDocsCompletenessITCase.java:70)
   Jun 10 07:17:03  at java.lang.reflect.Method.invoke(Method.java:498)
   Jun 10 07:17:03  at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
   Jun 10 07:17:03  at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
   Jun 10 07:17:03  at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
   Jun 10 07:17:03  at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
   Jun 10 07:17:03  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add Flink 1.19.1 release [flink-web]

2024-06-10 Thread via GitHub


hlteoh37 commented on PR #745:
URL: https://github.com/apache/flink-web/pull/745#issuecomment-2158002413

   Thanks @1996fanrui  - great comments! I've addressed them.
   
   I've removed the below JIRAs from the release notes as they relate to build 
infrastructure:
   
   ### Subtask
   - [FLINK-33816](https://issues.apache.org/jira/browse/FLINK-33816) - 
SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async 
checkpoint triggering not being completed 
   - [FLINK-34707](https://issues.apache.org/jira/browse/FLINK-34707) - 
Update japicmp configuration
   - [FLINK-34712](https://issues.apache.org/jira/browse/FLINK-34712) - 
Update reference data for Migration Tests
   
   ### Bug
   - [FLINK-26515](https://issues.apache.org/jira/browse/FLINK-26515) - 
RetryingExecutorTest. testDiscardOnTimeout failed on azure
   - [FLINK-29114](https://issues.apache.org/jira/browse/FLINK-29114) - 
TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
result mismatch 
   - [FLINK-31472](https://issues.apache.org/jira/browse/FLINK-31472) - 
AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
   - [FLINK-34324](https://issues.apache.org/jira/browse/FLINK-34324) - 
s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced
   - [FLINK-34569](https://issues.apache.org/jira/browse/FLINK-34569) - 
Streaming File Sink s3 end-to-end test failed
   - [FLINK-34571](https://issues.apache.org/jira/browse/FLINK-34571) - 
SortMergeResultPartitionReadSchedulerTest.testOnReadBufferRequestError failed 
due an assertion
   - [FLINK-34617](https://issues.apache.org/jira/browse/FLINK-34617) - 
Correct the Javadoc of org.apache.flink.api.common.time.Time
   - [FLINK-34622](https://issues.apache.org/jira/browse/FLINK-34622) - 
Typo of execution_mode configuration name in Chinese document
   - [FLINK-34933](https://issues.apache.org/jira/browse/FLINK-34933) - 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
 isn't implemented properly
   - [FLINK-35000](https://issues.apache.org/jira/browse/FLINK-35000) - 
PullRequest template doesn't use the correct format to refer to the testing 
code convention
   - [FLINK-35383](https://issues.apache.org/jira/browse/FLINK-35383) - 
Update compatibility matrix to include 1.19 release
   - [FLINK-35526](https://issues.apache.org/jira/browse/FLINK-35526) - 
Remove deprecated stedolan/jq Docker image from Flink e2e tests
   
   ### Technical Debt
   - [FLINK-34409](https://issues.apache.org/jira/browse/FLINK-34409) - 
Increase test coverage for AdaptiveScheduler
   - [FLINK-34897](https://issues.apache.org/jira/browse/FLINK-34897) - 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 needs to be enabled again
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add Flink 1.19.1 release [flink-web]

2024-06-10 Thread via GitHub


hlteoh37 commented on code in PR #745:
URL: https://github.com/apache/flink-web/pull/745#discussion_r1633036672


##
docs/content/posts/2024-06-14-release-1.19.1.md:
##
@@ -0,0 +1,171 @@
+---
+title:  "Apache Flink 1.19.1 Release Announcement"
+date: "2024-06-14T00:00:00.000Z"
+aliases:
+- /news/2024/06/14/release-1.19.1.html
+authors:
+- hong:
+  name: "Hong"
+  twitter: "hlteoh2"
+---
+
+The Apache Flink Community is pleased to announce the first bug fix release of 
the Flink 1.19 series.
+
+This release includes 44 bug fixes, vulnerability fixes, and minor 
improvements for Flink 1.19.
+Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:

Review Comment:
   Removed the below JIRAs from release notes. Will add as comment on PR
   
   ### Subtask
   - [FLINK-33816](https://issues.apache.org/jira/browse/FLINK-33816) - 
SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async 
checkpoint triggering not being completed 
   - [FLINK-34707](https://issues.apache.org/jira/browse/FLINK-34707) - 
Update japicmp configuration
   - [FLINK-34712](https://issues.apache.org/jira/browse/FLINK-34712) - 
Update reference data for Migration Tests
   
   ### Bug
   - [FLINK-26515](https://issues.apache.org/jira/browse/FLINK-26515) - 
RetryingExecutorTest. testDiscardOnTimeout failed on azure
   - [FLINK-29114](https://issues.apache.org/jira/browse/FLINK-29114) - 
TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
result mismatch 
   - [FLINK-31472](https://issues.apache.org/jira/browse/FLINK-31472) - 
AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
   - [FLINK-34324](https://issues.apache.org/jira/browse/FLINK-34324) - 
s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced
   - [FLINK-34569](https://issues.apache.org/jira/browse/FLINK-34569) - 
Streaming File Sink s3 end-to-end test failed
   - [FLINK-34571](https://issues.apache.org/jira/browse/FLINK-34571) - 
SortMergeResultPartitionReadSchedulerTest.testOnReadBufferRequestError failed 
due an assertion
   - [FLINK-34617](https://issues.apache.org/jira/browse/FLINK-34617) - 
Correct the Javadoc of org.apache.flink.api.common.time.Time
   - [FLINK-34622](https://issues.apache.org/jira/browse/FLINK-34622) - 
Typo of execution_mode configuration name in Chinese document
   - [FLINK-34933](https://issues.apache.org/jira/browse/FLINK-34933) - 
JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
 isn't implemented properly
   - [FLINK-35000](https://issues.apache.org/jira/browse/FLINK-35000) - 
PullRequest template doesn't use the correct format to refer to the testing 
code convention
   - [FLINK-35383](https://issues.apache.org/jira/browse/FLINK-35383) - 
Update compatibility matrix to include 1.19 release
   - [FLINK-35526](https://issues.apache.org/jira/browse/FLINK-35526) - 
Remove deprecated stedolan/jq Docker image from Flink e2e tests
   
   ### Technical Debt
   - [FLINK-34409](https://issues.apache.org/jira/browse/FLINK-34409) - 
Increase test coverage for AdaptiveScheduler
   - [FLINK-34897](https://issues.apache.org/jira/browse/FLINK-34897) - 
JobMasterServiceLeadershipRunnerTest#testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
 needs to be enabled again
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add Flink 1.19.1 release [flink-web]

2024-06-10 Thread via GitHub


hlteoh37 commented on code in PR #745:
URL: https://github.com/apache/flink-web/pull/745#discussion_r1633022942


##
docs/content/posts/2024-06-14-release-1.19.1.md:
##
@@ -0,0 +1,171 @@
+---
+title:  "Apache Flink 1.19.1 Release Announcement"
+date: "2024-06-14T00:00:00.000Z"
+aliases:
+- /news/2024/06/14/release-1.19.1.html
+authors:
+- hong:
+  name: "Hong"
+  twitter: "hlteoh2"
+---
+
+The Apache Flink Community is pleased to announce the first bug fix release of 
the Flink 1.19 series.
+
+This release includes 44 bug fixes, vulnerability fixes, and minor 
improvements for Flink 1.19.
+Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:
+[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354399).
+
+We highly recommend all users upgrade to Flink 1.19.1.
+
+# Release Artifacts
+
+## Maven Dependencies
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.19.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>1.19.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>1.19.1</version>
+</dependency>
+```
+
+## Binaries
+
+You can find the binaries on the updated [Downloads page]({{< relref 
"downloads" >}}).
+
+## Docker Images
+
+* [library/flink](https://hub.docker.com/_/flink/tags?page=1=1.19.1) 
(official images)
+* 
[apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1=1.19.1) 
(ASF repository)
+
+## PyPi
+
+* [apache-flink==1.19.1](https://pypi.org/project/apache-flink/1.19.1/)
+
+# Release Notes
+
+
+Release Notes - Flink - Version 1.19.1
+
+Sub-task
+
+
+[FLINK-33816] - 
SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
async checkpoint triggering not being completed 
+
+[FLINK-34707] - 
Update japicmp configuration
+
+[FLINK-34712] - 
Update reference data for Migration Tests
+
+
+
+Bug
+
+
+[FLINK-26515] - 
RetryingExecutorTest. testDiscardOnTimeout failed on azure
+
+[FLINK-26808] - 
[flink v1.14.2] Submit jobs via REST API not working after set 
web.submit.enable: false
+
+[FLINK-27741] - 
Fix NPE when use dense_rank() and rank() in over aggregation
+
+[FLINK-28693] - 
Codegen failed if the watermark is defined on a columnByExpression
+
+[FLINK-29114] - 
TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails 
with result mismatch 
+
+[FLINK-31223] - 
sql-client.sh fails to start with ssl enabled
+
+[FLINK-31472] - 
AsyncSinkWriterThrottlingTest failed with Illegal mailbox thread
+
+[FLINK-32513] - 
Job in BATCH mode with a significant number of transformations freezes on 
method StreamGraphGenerator.existsUnboundedSource()
+
+[FLINK-32828] - 
Partition aware watermark not handled correctly shortly after job start up 
from checkpoint or savepoint
+
+[FLINK-33798] - 
Automatically clean up rocksdb logs when the task failover.
+
+[FLINK-34324] - 
s3_setup is called in test_file_sink.sh even if the common_s3.sh is not 
sourced
+
+[FLINK-34379] - 
table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError
+
+[FLINK-34517] - 
environment configs ignored when calling procedure operation
+
+[FLINK-34569] - 
Streaming File Sink s3 end-to-end test failed
+
+[FLINK-34571] - 
SortMergeResultPartitionReadSchedulerTest.testOnReadBufferRequestError 
failed due an assertion
+
+[FLINK-34616] - 
python dist doesn't clean when open method construct resource
+
+[FLINK-34617] - 
Correct the Javadoc of org.apache.flink.api.common.time.Time
+
+[FLINK-34622] - 
Typo of execution_mode configuration name in Chinese document
+
+[FLINK-34725] - 
Dockerfiles for release publishing has incorrect config.yaml path
+
+[FLINK-34933] - 

JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
 isn't implemented properly
+
+[FLINK-34956] - 
The config type is wrong for Duration
+
+[FLINK-35000] - 
PullRequest template doesn't use the correct format to refer to the 
testing code convention
+
+[FLINK-35089] - 
Two input AbstractStreamOperator may throw NPE when receiving 
RecordAttributes
+
+[FLINK-35097] - 
Table API Filesystem connector with raw format repeats last line
+
+[FLINK-35098] - 
Incorrect results for queries like "10 = y" on tables using 
Filesystem connector and Orc format
+
+[FLINK-35112] - 
Membership for Row class does not include field names
+
+[FLINK-35159] - 
CreatingExecutionGraph can leak CheckpointCoordinator and cause JM crash
+
+[FLINK-35169] - 
Recycle buffers to freeSegments before releasing data buffer for sort 
accumulator
+
+[FLINK-35217] - 
Missing fsync in FileSystemCheckpointStorage
+
+[FLINK-35351] - 
Restore from unaligned checkpoints with a custom partitioner fails.
+
+[FLINK-35358] -   

Re: [PR] Add Flink 1.19.1 release [flink-web]

2024-06-10 Thread via GitHub


hlteoh37 commented on code in PR #745:
URL: https://github.com/apache/flink-web/pull/745#discussion_r1633022433


##
docs/content/posts/2024-06-14-release-1.19.1.md:
##
@@ -0,0 +1,171 @@
+---
+title:  "Apache Flink 1.19.1 Release Announcement"
+date: "2024-06-14T00:00:00.000Z"
+aliases:
+- /news/2024/06/14/release-1.19.1.html
+authors:
+- hong:
+  name: "Hong"
+  twitter: "hlteoh2"
+---
+
+The Apache Flink Community is pleased to announce the first bug fix release of 
the Flink 1.19 series.
+
+This release includes 44 bug fixes, vulnerability fixes, and minor 
improvements for Flink 1.19.
+Below you will find a list of all bugfixes and improvements (excluding 
improvements to the build infrastructure and build stability). For a complete 
list of all changes see:
+[JIRA](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354399).
+
+We highly recommend all users upgrade to Flink 1.19.1.
+
+# Release Artifacts
+
+## Maven Dependencies
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>1.19.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>1.19.1</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>1.19.1</version>
+</dependency>
+```
+
+## Binaries
+
+You can find the binaries on the updated [Downloads page]({{< relref 
"downloads" >}}).
+
+## Docker Images
+
+* [library/flink](https://hub.docker.com/_/flink/tags?page=1=1.19.1) 
(official images)
+* 
[apache/flink](https://hub.docker.com/r/apache/flink/tags?page=1=1.19.1) 
(ASF repository)
+
+## PyPi
+
+* [apache-flink==1.19.1](https://pypi.org/project/apache-flink/1.19.1/)
+
+# Release Notes
+
+
+Release Notes - Flink - Version 1.19.1
+
+Sub-task

Review Comment:
   OK - removed. None of the subtasks are relevant to users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting

2024-06-10 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853606#comment-17853606
 ] 

Maximilian Michels commented on FLINK-35489:


Hey [~ashangit]! Good idea to switch the order between budgeting heap and 
metaspace, but metaspace should be allowed to grow and not be fixed. The reason 
is that metaspace is often configured too low. We can prevent metaspace-related 
errors by detecting and increasing metaspace. 
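A toy illustration of the ordering point (all names and the 1.2 growth factor 
are made up; this is not the operator's actual implementation): budget 
metaspace before heap, and never let managed memory fall below a configured 
floor.
{code:java}
public class MemoryBudgetSketch {
    /** Heap gets the remainder after metaspace (with headroom) and managed memory. */
    static long budgetHeap(long totalBytes, long measuredMetaspace,
                           long tunedManaged, long managedMin) {
        long metaspace = (long) (measuredMetaspace * 1.2); // allow metaspace to grow
        long managed = Math.max(tunedManaged, managedMin); // enforce a minimum
        return totalBytes - metaspace - managed;
    }

    public static void main(String[] args) {
        // 4GB pod, ~22MB measured metaspace, autotuned managed size 0b, 256MB floor.
        long heap = budgetHeap(4L << 30, 22L << 20, 0L, 256L << 20);
        System.out.println("heap bytes: " + heap);
    }
}
{code}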

> Metaspace size can be too little after autotuning change memory setting
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> We have enable the autotuning feature on one of our flink job with below 
> config
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down the autotuning decided to give all the memory to the JVM
> (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was computed by the autotuning for a TM running on a
> 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to issues starting the TM because we rely on a javaagent that
> performs memory allocation outside of the JVM (relying on C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could have
> helped for that particular event, but this can lead to other issues as it
> could lead to too little heap size being computed.
> It would be interesting to be able to set a min memory.managed.size to be
> taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config 
> should have been applied to avoid this issue?
>  
> Edit: see this comment that explains the metaspace issue: 
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-06-10 Thread via GitHub


gyfora commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1632931104


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(
+v -> {
+Matcher matcher = 
partitionRegex.matcher(v);
+if (matcher.matches()) {
+String kafkaTopic = 
matcher.group("kafkaTopic");
+String kafkaId = 
matcher.group("kafkaId");
+String pulsarTopic =
+
matcher.group("pulsarTopic");
+String pulsarId = 
matcher.group("pulsarId");
+return kafkaTopic != null
+? kafkaTopic + "-" 
+ kafkaId
+: pulsarTopic + 
"-" + pulsarId;

Review Comment:
   @wenbingshen did you have time to check this comment?
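   For context, a self-contained sketch of the parsing in the diff above; the 
metric names are invented samples, and `distinct()` shows one way to count 
each topic/partition pair only once:
   ```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionMetricCount {
    public static void main(String[] args) {
        // Regex taken from the diff above; named groups identify topic and partition.
        Pattern p = Pattern.compile(
                "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                        + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

        List<String> metrics = List.of(
                "op.KafkaSourceReader.topic.orders.partition.0.currentOffset",
                "op.KafkaSourceReader.topic.orders.partition.1.currentOffset",
                "op.KafkaSourceReader.topic.orders.partition.1.currentOffset"); // duplicate

        long numPartitions = metrics.stream()
                .map(p::matcher)
                .filter(Matcher::matches)
                .map(m -> m.group("kafkaTopic") != null
                        ? m.group("kafkaTopic") + "-" + m.group("kafkaId")
                        : m.group("pulsarTopic") + "-" + m.group("pulsarId"))
                .distinct() // count each topic/partition pair once
                .count();
        System.out.println(numPartitions); // prints 2
    }
}
   ```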



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


pnowojski commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1632887041


##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java:
##
@@ -86,6 +87,25 @@ public interface MailboxExecutor {
 /** A constant for empty args to save on object allocation. */
 Object[] EMPTY_ARGS = new Object[0];
 
+/** Extra options to configure enqueued mails. */
+@Experimental
+interface MailOptions {
+static MailOptions options() {
+return new MailOptionsImpl();

Review Comment:
   I went with the factory pattern for now. Can you take a look?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED

2024-06-10 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853589#comment-17853589
 ] 

Hong Liang Teoh commented on FLINK-35556:
-

Changing fix version to 1.19.2

> Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
> ---
>
> Key: FLINK-35556
> URL: https://issues.apache.org/jira/browse/FLINK-35556
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.2
>
>
> See 
> https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35557) MemoryManager only reserves memory per consumer type once

2024-06-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-35557:

Fix Version/s: 1.19.2
   (was: 1.19.1)

> MemoryManager only reserves memory per consumer type once
> -
>
> Key: FLINK-35557
> URL: https://issues.apache.org/jira/browse/FLINK-35557
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Runtime / Task
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.20.0, 1.19.2
>
>
> # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we 
> [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526]
>  a reserve function
>  # The function 
> [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61]
>  the available Slot memory and fails if there's not enough memory
>  # We pass it to {{SharedResources.getOrAllocateSharedResource}}
>  # In {{SharedResources.getOrAllocateSharedResource}} , we check if the 
> resource (memory) was already reserved by some key (e.g. 
> {{{}state-rocks-managed-memory{}}})
>  # If not, we create a new one and call the reserve function
>  # If the resource was already reserved (not null), we do NOT reserve the 
> memory again: 
> [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71]
> So there will be only one (the first) memory reservation for RocksDB, for
> example, no matter how many state backends are created in a slot, meaning
> that managed memory limits are not applied.
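A minimal sketch of the pattern described in the steps above, with simplified
names (this is not the actual Flink code): the reserve function runs only when
the resource is first created, so later acquirers never reserve again.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class SharedResourcesSketch {
    private final Map<String, Long> resources = new HashMap<>();
    private long availableBudget = 1024;

    long getOrAllocate(String key, long size) {
        Long existing = resources.get(key);
        if (existing == null) {
            // The reserve function runs only here, on first allocation.
            if (size > availableBudget) {
                throw new IllegalStateException("not enough memory");
            }
            availableBudget -= size;
            resources.put(key, size);
            return size;
        }
        // Subsequent acquirers reuse the resource; no further reservation
        // happens, so per-acquirer memory limits are never enforced.
        return existing;
    }
}
{code}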



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35557) MemoryManager only reserves memory per consumer type once

2024-06-10 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853588#comment-17853588
 ] 

Hong Liang Teoh commented on FLINK-35557:
-

Changing this to the 1.19.2 release as this is a long-standing bug without a 
fix at the moment.

> MemoryManager only reserves memory per consumer type once
> -
>
> Key: FLINK-35557
> URL: https://issues.apache.org/jira/browse/FLINK-35557
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends, Runtime / Task
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.20.0, 1.19.2
>
>
> # In {{MemoryManager.getSharedMemoryResourceForManagedMemory}} we 
> [create|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java#L526]
>  a reserve function
>  # The function 
> [decrements|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java#L61]
>  the available Slot memory and fails if there's not enough memory
>  # We pass it to {{SharedResources.getOrAllocateSharedResource}}
>  # In {{SharedResources.getOrAllocateSharedResource}} , we check if the 
> resource (memory) was already reserved by some key (e.g. 
> {{{}state-rocks-managed-memory{}}})
>  # If not, we create a new one and call the reserve function
>  # If the resource was already reserved (not null), we do NOT reserve the 
> memory again: 
> [https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flin[…]/main/java/org/apache/flink/runtime/memory/SharedResources.java|https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-runtime/src/main/java/org/apache/flink/runtime/memory/SharedResources.java#L71]
> So there will be only one (the first) memory reservation for RocksDB, for
> example, no matter how many state backends are created in a slot, meaning
> that managed memory limits are not applied.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35556) Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED

2024-06-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-35556:

Fix Version/s: 1.19.2
   (was: 1.19.1)

> Wrong constant in RocksDBSharedResourcesFactory.SLOT_SHARED_MANAGED
> ---
>
> Key: FLINK-35556
> URL: https://issues.apache.org/jira/browse/FLINK-35556
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.2
>
>
> See 
> https://github.com/apache/flink/blob/57869c11687e0053a242c90623779c0c7336cd33/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResourcesFactory.java#L39



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34633) Support unnesting array constants

2024-06-10 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-34633:

Fix Version/s: 1.20.0
   (was: 1.19.1)

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34633) Support unnesting array constants

2024-06-10 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853587#comment-17853587
 ] 

Hong Liang Teoh commented on FLINK-34633:
-

Thanks for checking. Will remove the PR from the release notes

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]

2024-06-10 Thread via GitHub


pnowojski commented on code in PR #24904:
URL: https://github.com/apache/flink/pull/24904#discussion_r1632838546


##
flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java:
##
@@ -86,6 +87,25 @@ public interface MailboxExecutor {
 /** A constant for empty args to save on object allocation. */
 Object[] EMPTY_ARGS = new Object[0];
 
+/** Extra options to configure enqueued mails. */
+@Experimental
+interface MailOptions {

Review Comment:
   The interface has only setters, as those are the only important bits for the 
user. Getters are in the concrete internal class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


z3d1k commented on code in PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#discussion_r1632814622


##
flink-connector-aws/flink-connector-kinesis/pom.xml:
##
@@ -354,6 +354,12 @@ under the License.
 
 
+
+
org.apache.flink.connector.aws.sink
+
+
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink
+

Review Comment:
   @hlteoh37 No, this package currently contains only util classes for working 
with AWS exceptions and is not used for state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-10 Thread via GitHub


foxus commented on code in PR #142:
URL: 
https://github.com/apache/flink-connector-aws/pull/142#discussion_r1632803808


##
flink-connector-aws/flink-connector-kinesis/pom.xml:
##
@@ -354,6 +354,12 @@ under the License.
 
 
+
+
org.apache.flink.connector.aws.sink
+
+
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink
+

Review Comment:
   @hlteoh37 are you referring to the risk of restoring state that references an 
FQCN that has now changed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-10 Thread via GitHub


pnowojski commented on code in PR #24895:
URL: https://github.com/apache/flink/pull/24895#discussion_r1632799682


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -307,18 +307,36 @@ void onProcessingTime(long time) throws Exception {
 }
 
 public void advanceWatermark(long time) throws Exception {
-currentWatermark = time;
+Preconditions.checkState(
+tryAdvanceWatermark(
+time,
+() -> {
+// Never stop advancing.
+return false;
+}));
+}
 
+/**
+ * @return true if following watermarks can be processed immediately. 
False if the firing timers
+ * should be interrupted as soon as possible.
+ */
+public boolean tryAdvanceWatermark(
+long time, InternalTimeServiceManager.ShouldStopAdvancingFn 
shouldStopAdvancingFn)
+throws Exception {
+currentWatermark = time;
 InternalTimer<K, N> timer;
-
 while ((timer = eventTimeTimersQueue.peek()) != null
 && timer.getTimestamp() <= time
 && !cancellationContext.isCancelled()) {
 keyContext.setCurrentKey(timer.getKey());
 eventTimeTimersQueue.poll();
 triggerTarget.onEventTime(timer);
 taskIOMetricGroup.getNumFiredTimers().inc();
+if (shouldStopAdvancingFn.shouldStopAdvancing()) {
+return false;
+}

Review Comment:
   Instead of `boolean first` I've added `boolean stop` and re-used it both for 
cancellation and interruption. I didn't want to make the `while()` condition 
any longer, as it would become too difficult to read IMO. 
   
   WDYT?
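   A self-contained analogue of the resulting loop shape (a plain 
`PriorityQueue` stands in for Flink's timer queue; this is a sketch, not the 
committed code):
   ```java
import java.util.PriorityQueue;
import java.util.function.BooleanSupplier;

public class TimerLoopSketch {
    /** Returns true if all timers up to {@code time} fired; false if interrupted. */
    static boolean tryAdvance(PriorityQueue<Long> timers, long time, BooleanSupplier stopFn) {
        Long timer;
        while ((timer = timers.peek()) != null && timer <= time) {
            timers.poll();
            System.out.println("fired timer @ " + timer);
            // One check covers every reason to stop (cancellation, interruption).
            if (stopFn.getAsBoolean()) {
                return false; // the caller resumes the remaining timers later
            }
        }
        return true;
    }
}
   ```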



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-10 Thread via GitHub


gaborgsomogyi commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2157658542

   @ammar-master thanks for this PR, it looks like quality code at first glance. 
I'm going to shepherd it this week...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-10 Thread via GitHub


hlteoh37 commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632777604


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer
+* [Sample application](./example-datastream-job): Sample application showing 
the usage of the connector with DataStream API. It also demonstrates how to 
configure the request signer. 

Review Comment:
   We have done this in tests previously to mitigate the "clutter" problem and 
the difficulty of creating the connector distribution jars.
   
https://github.com/apache/flink-connector-aws/tree/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples
   
   We could do something similar and point to them via the README, or we could 
add a Maven profile that only includes the examples module for PR/nightly runs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


