Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440178966


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond(
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
-        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
+        if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
         }

Review Comment:
   @1996fanrui thanks, I'll add some comments and follow your suggestion 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440182777


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -217,14 +217,13 @@ private static double getNumRecordsInPerSecond(
             JobVertexID jobVertexID,
             boolean isSource) {
         var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
-        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
-            numRecordsInPerSecond =
-                    flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
-        }

Review Comment:
   Why is the `SOURCE_TASK_NUM_RECORDS_IN_PER_SEC` part removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440183532


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond(
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
-        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
+        if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
         }

Review Comment:
   @1996fanrui This time I have just removed the unused if condition and added one 
comment.
   
   To be sure, let's wait for feedback from @mxm 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440186563


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -217,14 +217,13 @@ private static double getNumRecordsInPerSecond(
             JobVertexID jobVertexID,
             boolean isSource) {
         var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
-        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
-            numRecordsInPerSecond =
-                    flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
-        }

Review Comment:
   @1996fanrui Sorry, I have put it back and just added the comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440186827


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -221,7 +221,7 @@ private static double getNumRecordsInPerSecond(
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
-        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
+        if (!isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
         }

Review Comment:
   I have just added some comments to clarify this 🙏 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33966) Fix the getNumRecordsInPerSecond Utility Function

2024-01-03 Thread Yang LI (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802057#comment-17802057
 ] 

Yang LI commented on FLINK-33966:
-

Thanks for the clarification from [~fanrui]; now I think it's not a bug. 

> Fix the getNumRecordsInPerSecond Utility Function
> -
>
> Key: FLINK-33966
> URL: https://issues.apache.org/jira/browse/FLINK-33966
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Yang LI
>Assignee: Yang LI
>Priority: Minor
>  Labels: pull-request-available
>
> We have following code in the codebase
> {code:java}
>         if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>             numRecordsInPerSecond =
>                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
>         }{code}
> {code:java}
>         if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
>             numRecordsInPerSecond =
>                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
>         }{code}
> with the same condition checked twice.
>  
> {*}Definition of done{*}: 
> Update getNumRecordsInPerSecond's second {{if}} condition from {{if (isSource && ...)}} 
> to {{if (!isSource && ...)}}. This addresses the redundant check and ensures correct 
> metric fetching for non-source operators.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440197015


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,20 @@ private static double getNumRecordsInPerSecond(
             Map flinkMetrics,
             JobVertexID jobVertexID,
             boolean isSource) {
+        // If the vertex is not the source, use NUM_RECORDS_IN_PER_SEC metric
         var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        // If the vertex is the source, use SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric
         if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
+        // If the vertex is the source and SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric not available
+        // then use SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC metric
         if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
         }

Review Comment:
   ```suggestion
        // Generate numRecordsInPerSecond from 3 metrics:
        // 1. If the vertex is not the source, use NUM_RECORDS_IN_PER_SEC metric
        var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
        // 2. If the vertex is the source, use SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric first.
        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
            numRecordsInPerSecond =
                    flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
        }
        // 3. If the vertex is the source and SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric not available
        // then use SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC metric
        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
            numRecordsInPerSecond =
                    flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
        }
   ```
   
   How about adding the `Generate numRecordsInPerSecond from 3 metrics:` comment 
and the `1. 2. 3.` numbering?
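
   For readers following along, here is a minimal, self-contained sketch of the 
three-step fallback being discussed. The metric names mirror the diff above; the 
tiny enum, the `Double`-valued map standing in for Flink's `AggregatedMetric`, and 
the wrapper class are assumptions made only to keep the example runnable, not the 
actual autoscaler code.

   ```java
   import java.util.EnumMap;
   import java.util.Map;

   public class NumRecordsInFallbackSketch {

       enum FlinkMetric {
           NUM_RECORDS_IN_PER_SEC,
           SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
           SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
       }

       static double getNumRecordsInPerSecond(Map<FlinkMetric, Double> flinkMetrics, boolean isSource) {
           // Generate numRecordsInPerSecond from 3 metrics:
           // 1. If the vertex is not the source, use the NUM_RECORDS_IN_PER_SEC metric.
           Double numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
           // 2. If the vertex is the source, use SOURCE_TASK_NUM_RECORDS_IN_PER_SEC first.
           if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond == 0)) {
               numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
           }
           // 3. If that is also unavailable, fall back to SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC.
           if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond == 0)) {
               numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
           }
           return numRecordsInPerSecond == null ? Double.NaN : numRecordsInPerSecond;
       }

       public static void main(String[] args) {
           Map<FlinkMetric, Double> metrics = new EnumMap<>(FlinkMetric.class);
           metrics.put(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC, 1234.0);
           // A source vertex that only reports output records falls through to step 3.
           System.out.println(getNumRecordsInPerSecond(metrics, true)); // prints 1234.0
       }
   }
   ```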



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440202523


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,20 @@ private static double getNumRecordsInPerSecond(
             Map flinkMetrics,
             JobVertexID jobVertexID,
             boolean isSource) {
+        // If the vertex is not the source, use NUM_RECORDS_IN_PER_SEC metric
         var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        // If the vertex is the source, use SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric
         if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
+        // If the vertex is the source and SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric not available
+        // then use SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC metric
         if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
         }

Review Comment:
   Good idea! 🙏 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1440197286


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.ResultSet;
+
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/** A tool is used to build {@link JdbcSource} quickly. */
+@PublicEvolving
+public class JdbcSourceBuilder {
+
+public static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceBuilder.class);
+
+private final Configuration configuration;
+
+private int splitReaderFetchBatchSize;
+private int resultSetType;
+private int resultSetConcurrency;
+private int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+private Boolean autoCommit;
+
+// TODO It would need a builder method to render after introducing 
streaming semantic.
+private DeliveryGuarantee deliveryGuarantee;
+
+private TypeInformation typeInformation;
+
+private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder 
connOptionsBuilder;
+private String sql;
+private JdbcParameterValuesProvider jdbcParameterValuesProvider;
+private ResultExtractor resultExtractor;
+
+private JdbcConnectionProvider connectionProvider;
+
+JdbcSourceBuilder() {
+this.configuration = new Configuration();
+this.connOptionsBuilder = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
+this.splitReaderFetchBatchSize = 1024;
+this.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+this.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+this.deliveryGuarantee = DeliveryGuarantee.NONE;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+this.autoCommit = true;
+}
+
+public JdbcSourceBuilder setSql(@Nonnull String sql) {
+Preconditions.checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(sql), "It's required to 
set the sql.");
+this.sql = sql;
+return this;
+}
+
+public JdbcSourceBuilder setResultExtractor(ResultExtractor 
resultExtractor) {
+this.resultExtractor =
+Preconditions.checkNotNull(resultExtractor, "resultExtractor 
must not be null.");
+return this;
+}
+
+public JdbcSourceBuilder setUsername(String username) {
+Preconditions.checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(username),
+"It's required to set the 'username'.");
+connOptionsBuilder.withUsername(username);
+return this;
+}
+
+public JdbcSourceBuilder setPassword(String password) {
+connOptionsBuilder.withPassword(password);

[jira] [Commented] (FLINK-32241) UnsupportedFileSystemException when using the ABFS Hadoop driver for checkpointing in Flink 1.17

2024-01-03 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802077#comment-17802077
 ] 

Martijn Visser commented on FLINK-32241:


[~luoyuxia] Have you looked more into this topic?

> UnsupportedFileSystemException when using the ABFS Hadoop driver for 
> checkpointing in Flink 1.17
> 
>
> Key: FLINK-32241
> URL: https://issues.apache.org/jira/browse/FLINK-32241
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.17.1
> Environment: Flink 1.17.1
> Hadoop 3.3.4
> Flink Operator 1.4.0
> Kubernetes 1.24
>Reporter: Anton Ippolitov
>Priority: Minor
>
> https://issues.apache.org/jira/browse/HADOOP-18707 introduced new 
> functionality in the ABFS Hadoop client that buffers data on local disk by 
> default.
> It looks like this breaks with Flink 1.17 in a scenario where:
>  * ABFS is used for checkpointing
>  * JobManager HA is enabled
>  * First JobManager leader dies and a stand-by JobManager takes over
> I can reliably reproduce this with Flink 1.17.1 running on Kubernetes by 
> simply killing the JM leader pod. Once the stand-by JobManager takes over, 
> all checkpoints consistently fail with the following error:
> {noformat}
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize 
> checkpoint. at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1424)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1310)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1202)
>  at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>  at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
> "file" at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443) at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) at 
> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) at 
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) at 
> org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) at 
> org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496) at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>  at 
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
>  at 
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
>  at 
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
>  at 
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.(AbfsOutputStream.java:173)
>  at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
>  at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064) at 
> org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:154)
>  at 
> org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
>  at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
>  at 
> org.apache.flink.runtime.state.filesystem.FSDataOutputStreamWrapper.(FSDataOutputStreamWrapper.java:42)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:179)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointM

[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-03 Thread Zhanghao Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802080#comment-17802080
 ] 

Zhanghao Chen commented on FLINK-33940:
---

Thanks for the input, [~fanrui]. I'll take a look at the benchmark.

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
> Attachments: image-2024-01-03-10-52-05-861.png
>
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the parallelism of the operator while also adding 
> extra overhead when set too large. Currently, the max parallelism of an 
> operator is either fixed to a value specified by the API core / pipeline 
> option or auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derivation rule was introduced a long time ago and 
> only allows the operator parallelism to be roughly doubled, which is not 
> desirable for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better elasticity experience out of the box. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.
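
For concreteness, here is a small runnable sketch comparing the current and 
proposed derivation formulas quoted above. The helper and class names are 
illustrative only and do not correspond to the actual Flink implementation or 
its internal constants.

{code:java}
// Illustrative sketch of the two derivation rules quoted above; not the actual Flink code.
public class MaxParallelismDerivationSketch {

    // Smallest power of two that is >= x (valid for 1 <= x <= 2^30).
    static int roundUpToPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    // Current rule: min(max(roundUpToPowerOfTwo(parallelism * 1.5), 128), 32767)
    static int currentRule(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo((int) Math.ceil(operatorParallelism * 1.5));
        return Math.min(Math.max(candidate, 128), 32767);
    }

    // Proposed rule: min(max(roundUpToPowerOfTwo(parallelism * 5), 1024), 32767)
    static int proposedRule(int operatorParallelism) {
        int candidate = roundUpToPowerOfTwo(operatorParallelism * 5);
        return Math.min(Math.max(candidate, 1024), 32767);
    }

    public static void main(String[] args) {
        // A job at parallelism 200 derives 512 today (roughly 2.5x headroom),
        // while the proposal derives 1024, leaving much more room to scale up.
        System.out.println(currentRule(200) + " -> " + proposedRule(200));
    }
}
{code}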



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


boring-cyborg[bot] commented on PR #77:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/77#issuecomment-1875069060

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #77:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/77#issuecomment-1875070692

   @tisonkun PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33970) Add necessary checks for connector document

2024-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33970:
---
Labels: pull-request-available  (was: )

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-33964, we found that the documentation files in independent connector 
> repos lack basic checks such as broken-URL detection; this ticket aims to add 
> the necessary checks and avoid similar issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33970] Add necessary checks for connector document [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #78:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/78#issuecomment-1875077994

   @leonardBang @tisonkun  PTAL. Currently, I just added simple scripts to check 
for dead links instead of using Hugo to build the docs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33446) SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation doesn't produce the correct plan

2024-01-03 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802090#comment-17802090
 ] 

Jeyhun Karimov commented on FLINK-33446:


Hi [~fsk119], I scanned through the issue a bit. 

I can confirm that {{Sql2RelConverter}} generates the correct plan. The issue 
IMO is related to the {{RelDecorrelator.decorrelateQuery}} 
({{FlinkDecorrelateProgram::optimize}}), specifically this line:


{code:java}
val result = RelDecorrelator.decorrelateQuery(root)
{code}

the input plan ({{root}}) is:


{code:java}
LogicalProject(d2=[$0], d3=[$1])
  LogicalProject(d2=[$0], d3=[$1])
LogicalFilter(condition=[IS NULL($2)])
  LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0, 1}])
LogicalProject(d2=[+(2, $0)], d3=[+(3, $1)])
  LogicalTableScan(table=[[default_catalog, default_database, l, 
source: [TestTableSource(a, b, c)]]])
LogicalAggregate(group=[{0}])
  LogicalProject(i=[true])
LogicalProject(d1=[$0])
  LogicalFilter(condition=[AND(=($0, $cor0.d2), IS NULL($1))])
LogicalJoin(condition=[true], joinType=[left], 
variablesSet=[[$cor1, $cor0]])
  LogicalProject(d1=[+($0, 1)])
LogicalTableScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]])
  LogicalAggregate(group=[{0}])
LogicalProject(i=[true])
  LogicalFilter(condition=[AND(=($0, $cor1.d1), =($1, 
$cor1.d1), =(CAST($2):BIGINT, $cor0.d3))])
LogicalProject(d4=[+($0, 4)], d5=[+($0, 5)], d6=[+($0, 
6)])
  LogicalTableScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]])
{code}


and the output of the function call ({{RelDecorrelator.decorrelateQuery}}) is:


{code:java}
LogicalProject(d2=[$0], d3=[$1], d4=[$4])
  LogicalFilter(condition=[IS NULL($5)])
LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
  LogicalProject(d2=[+(2, $0)], d3=[+(3, $1)])
LogicalTableScan(table=[[default_catalog, default_database, l, source: 
[TestTableSource(a, b, c)]]])
  LogicalProject(d11=[$0], $f3=[$1], d4=[$2], $f4=[true])
LogicalAggregate(group=[{0, 1, 2}])
  LogicalProject(d11=[$0], $f3=[$1], d4=[$2])
LogicalFilter(condition=[IS NULL($3)])
  LogicalJoin(condition=[true], joinType=[left])
LogicalFilter(condition=[IS NOT NULL($0)])
  LogicalProject(d1=[+($0, 1)])
LogicalTableScan(table=[[default_catalog, default_database, 
r, source: [TestTableSource(d, e, f)]]])
LogicalProject($f3=[$0], d4=[$1], $f2=[true])
  LogicalAggregate(group=[{0, 1}])
LogicalProject($f3=[$3], d4=[$0])
  LogicalFilter(condition=[AND(=($1, $0), 
=(CAST($2):BIGINT, $3))])
LogicalProject(d4=[+($0, 4)], d5=[+($0, 5)], d6=[+($0, 
6)], $f3=[CAST(+($0, 6)):BIGINT])
  LogicalTableScan(table=[[default_catalog, 
default_database, t, source: [TestTableSource(i, j, k)]]])

{code}

WDYT?



> SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation
>  doesn't produce the correct plan
> ---
>
> Key: FLINK-33446
> URL: https://issues.apache.org/jira/browse/FLINK-33446
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Shengkai Fang
>Priority: Major
>
> Although this test doesn't throw an exception, the final plan produces 3 
> columns rather than 2 after optimization.
> {code:java}
> LogicalProject(inputs=[0..1], exprs=[[$4]])
> +- LogicalFilter(condition=[IS NULL($5)])
>+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
>   :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>   :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
> source: [TestTableSource(a, b, c)]]])
>   +- LogicalProject(inputs=[0..2], exprs=[[true]])
>  +- LogicalAggregate(group=[{0, 1, 2}])
> +- LogicalProject(inputs=[0..2])
>+- LogicalFilter(condition=[IS NULL($3)])
>   +- LogicalJoin(condition=[true], joinType=[left])
>  :- LogicalFilter(condition=[IS NOT NULL($0)])
>  :  +- LogicalProject(exprs=[[+($0, 1)]])
>  : +- LogicalTableScan(table=[[default_catalog, 
> default_database, r, source: [TestTableSource(d, e, f)]]])
>  +- LogicalProject(inputs=[0..1], exprs=[[true]])
> +- LogicalAgg

Re: [PR] [FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #77:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/77#issuecomment-1875081879

   > Thank you! I'd appreciate it if you could backport to the v4.1 branch also.
   
   @tisonkun Big thanks for your quick review. I want to add a documentation check for 
the connector docs in https://github.com/apache/flink-connector-pulsar/pull/78. 
Can you help me review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP][FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #79:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/79#issuecomment-1875094367

   @tisonkun  PTAL, Backport to v4.1 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30702] Add Elasticsearch dialect [flink-connector-jdbc]

2024-01-03 Thread via GitHub


grzegorz8 commented on code in PR #67:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/67#discussion_r1440284283


##
flink-connector-jdbc/pom.xml:
##
@@ -38,10 +38,12 @@ under the License.
2.12
2.12.7
3.23.1
+   2.15.2
42.5.1
21.8.0.0
418
1.12.10
+   8.11.1

Review Comment:
   @MartijnVisser Both the enforcer rule and the statement about the JDBC driver 
license have been added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-03 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1440284858


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to resolve the parameter. It does not
+ * affect the dialects, as the placeholder comes from JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   I am thinking that I should save the left, right, and operation in the 
visitor. I hope to construct a list of simple binary predicates (each with a left, 
a right, and an operation) connected by any ORs. When this comes through to the 
processing where I need to construct the query, I will then know to only do 
character replacements in right-hand sides (i.e. not column names or column name 
expressions). I hope to put something in for review this week; see the sketch below.
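
   A rough sketch of the structure described above, assuming a simple (left, 
operator, right) triple per predicate. All names here are hypothetical and are 
not part of the connector code; the point is only that a placeholder is ever 
substituted on the right-hand side, never in a column name.

   ```java
   import java.util.List;

   public class PredicateRenderingSketch {

       /** One resolved predicate kept as (left column, operator, right operand). */
       static final class SimpleBinaryPredicate {
           final String leftColumn;   // e.g. "e.age" -- never rewritten
           final String operator;     // e.g. "=", ">"
           final String rightOperand; // e.g. "?" placeholder, later substituted with a pushdown parameter

           SimpleBinaryPredicate(String leftColumn, String operator, String rightOperand) {
               this.leftColumn = leftColumn;
               this.operator = operator;
               this.rightOperand = rightOperand;
           }

           /** Renders the predicate, replacing the placeholder only on the right-hand side. */
           String render(String resolvedParam) {
               String rhs = "?".equals(rightOperand) ? resolvedParam : rightOperand;
               return leftColumn + " " + operator + " " + rhs;
           }
       }

       public static void main(String[] args) {
           // (e.age = ? OR e.height > ?) with pushdown parameters 50 and 90
           List<SimpleBinaryPredicate> orGroup =
                   List.of(
                           new SimpleBinaryPredicate("e.age", "=", "?"),
                           new SimpleBinaryPredicate("e.height", ">", "?"));
           String condition =
                   "(" + orGroup.get(0).render("50") + " OR " + orGroup.get(1).render("90") + ")";
           System.out.println(condition); // (e.age = 50 OR e.height > 90)
       }
   }
   ```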



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32570][runtime][stream] Deprecates @Public/@PublicEvolving API that uses Flink's Time classes [flink]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #22984:
URL: https://github.com/apache/flink/pull/22984#discussion_r1440283481


##
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java:
##
@@ -53,13 +54,14 @@ public class StateTtlConfig implements Serializable {
 private static final long serialVersionUID = -7592693245044289793L;
 
 public static final StateTtlConfig DISABLED =
-newBuilder(Time.milliseconds(Long.MAX_VALUE))
+newBuilder(Duration.ofMillis(Long.MAX_VALUE))
 .setUpdateType(UpdateType.Disabled)
 .build();
 
 /**
  * This option value configures when to update last access timestamp which 
prolongs state TTL.
  */
+@PublicEvolving

Review Comment:
   nit: The public class `StateTtlConfig` is `PublicEvolving`; do we need to 
mark all inner classes as well?
   
   IIUC, all public inner classes or fields are `PublicEvolving` if the enclosing 
public class is `PublicEvolving`, right?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -810,7 +811,8 @@ public RestartStrategies.RestartStrategyConfiguration 
getRestartStrategy() {
  * @param numberOfExecutionRetries The number of times the system will try 
to re-execute failed
  * tasks.
  * @deprecated This method will be replaced by {@link 
#setRestartStrategy}. The {@link
- * RestartStrategies#fixedDelayRestart(int, Time)} contains the number 
of execution retries.
+ * RestartStrategies#fixedDelayRestart(int, Duration)} contains the 
number of execution
+ * retries.

Review Comment:
   Not needed either.



##
flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java:
##
@@ -72,9 +72,23 @@ public static RestartStrategyConfiguration fixedDelayRestart(
  * @param restartAttempts Number of restart attempts for the 
FixedDelayRestartStrategy
  * @param delayInterval Delay in-between restart attempts for the 
FixedDelayRestartStrategy
  * @return FixedDelayRestartStrategy
+ * @deprecated Use {@link #fixedDelayRestart(int, Duration)}
  */
+@Deprecated

Review Comment:
   IIUC, we don't need to change this class. 
   
   The whole `RestartStrategies` class has been marked as `Deprecated` in the 
master branch.



##
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java:
##
@@ -121,6 +125,10 @@ public StateVisibility getStateVisibility() {
 
 @Nonnull

Review Comment:
   ```suggestion
   /** @deprecated Use {@link #getTimeToLive()} */
   @Deprecated
   @Nonnull
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32570) Deprecate API that uses Flink's Time implementation (related to FLINK-14638)

2024-01-03 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32570:

Parent: FLINK-3957
Issue Type: Sub-task  (was: Technical Debt)

> Deprecate API that uses Flink's Time implementation (related to FLINK-14638)
> 
>
> Key: FLINK-32570
> URL: https://issues.apache.org/jira/browse/FLINK-32570
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> The plan is to resolve FLINK-14038 with Flink 2.0. As a preparation, we have 
> to deprecate related @Public API .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33365] include filters with Lookup joins [flink-connector-jdbc]

2024-01-03 Thread via GitHub


davidradl commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/79#discussion_r1440284858


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java:
##
@@ -96,28 +97,115 @@ public JdbcDynamicTableSource(
 public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
 // JDBC only support non-nested look up keys
 String[] keyNames = new String[context.getKeys().length];
+
 for (int i = 0; i < keyNames.length; i++) {
 int[] innerKeyArr = context.getKeys()[i];
 Preconditions.checkArgument(
 innerKeyArr.length == 1, "JDBC only support non-nested 
look up keys");
 keyNames[i] = 
DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
 }
+
 final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
+
+String[] conditions = null;
+
+if (this.resolvedPredicates != null) {
+conditions = new String[this.resolvedPredicates.size()];
+int processedPushdownParamsIndex = 0;
+for (int i = 0; i < this.resolvedPredicates.size(); i++) {
+String resolvedPredicate = this.resolvedPredicates.get(i);
+
+/*
+ * This replace seems like it should be using a Flink class to resolve the parameter. It does not
+ * affect the dialects, as the placeholder comes from JdbcFilterPushdownPreparedStatementVisitor.
+ *
+ * Here is what has been considered as alternatives.
+ *
+ * We cannot use the way this is done in 
getScanRuntimeProvider, as the index we have is the index
+ * into the filters, but it needs the index into the fields. 
For example one lookup key and one filter
+ * would both have an index of 0, which the subsequent code 
would incorrectly resolve to the first
+ * field.
+ * We cannot use the PreparedStatement as we have not got 
access to the statement here.
+ * We cannot use ParameterizedPredicate as it takes the filter 
expression as input (e.g EQUALS(...)
+ * not the form we have here an example would be ('field1'= ?).
+ *
+ * An entry in the resolvedPredicates list may have more than 
one associated pushdown parameter, for example
+ * a query like this : ... on e.type = 2 and (e.age = 50 OR 
height > 90)  and a.ip = e.ip;
+ * will have 2 resolvedPredicates and 3 pushdownParams. The 
2nd and 3rd pushdownParams will be for the second
+ * resolvedPredicate.
+ *
+ */
+ArrayList paramsForThisPredicate = new ArrayList();
+char placeholderChar =
+
JdbcFilterPushdownPreparedStatementVisitor.PUSHDOWN_PREDICATE_PLACEHOLDER
+.charAt(0);
+
+int count =
+(int) resolvedPredicate.chars().filter(ch -> ch == 
placeholderChar).count();
+
+for (int j = processedPushdownParamsIndex;
+j < processedPushdownParamsIndex + count;
+j++) {
+
paramsForThisPredicate.add(this.pushdownParams[j].toString());
+}
+processedPushdownParamsIndex = processedPushdownParamsIndex + 
count;

Review Comment:
   I am thinking that I should save the left, right, and operation in the 
visitor. I hope to construct a list of simple binary predicates (each with a left, 
a right, and an operation) connected by any ORs. When this comes through to the 
processing where I need to construct the query, I will then know to only do 
character replacements in right-hand sides (i.e. not column names or column name 
expressions). I am investigating whether this is feasible and, if so, hope to put 
something in for review this week.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440293457


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##
@@ -42,7 +42,7 @@ public class TestingEventCollector> eventMap = new 
ConcurrentHashMap<>();
 
 @Override
-public void handleEvent(
+public synchronized void handleEvent(

Review Comment:
   Is there a way to make this method thread-safe without synchronizing on its 
entirety?



##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##
@@ -42,7 +42,7 @@ public class TestingEventCollector> eventMap = new 
ConcurrentHashMap<>();
 
 @Override
-public void handleEvent(
+public synchronized void handleEvent(

Review Comment:
   Maybe using `Collections.synchronizedList(..)`?



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ -38,6 +38,12 @@ private static ConfigOptions.OptionBuilder 
autoscalerStandaloneConfig(String key
 .withDeprecatedKeys("scalingInterval")
 .withDescription("The interval of autoscaler standalone 
control loop.");
 
+public static final ConfigOption CONTROL_LOOP_PARALLELISM =
+autoscalerStandaloneConfig("control-loop.parallelism")
+.intType()
+.defaultValue(100)

Review Comment:
   FYI, the default for the Kubernetes operator is 200 but those two do not 
have to be aligned.



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor eventHandler;
 private final JobAutoScaler autoScaler;
 private final ScheduledExecutorService scheduledExecutorService;
+private final ExecutorService scalingThreadPool;
 
 public StandaloneAutoscalerExecutor(
-@Nonnull Duration scalingInterval,
+@Nonnull Configuration conf,
 @Nonnull JobListFetcher jobListFetcher,
 @Nonnull AutoScalerEventHandler eventHandler,
 @Nonnull JobAutoScaler autoScaler) {
-this.scalingInterval = scalingInterval;
+this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
 this.jobListFetcher = jobListFetcher;
 this.eventHandler = eventHandler;
 this.autoScaler = autoScaler;
 this.scheduledExecutorService =
 Executors.newSingleThreadScheduledExecutor(
 new ThreadFactoryBuilder()
-
.setNameFormat("StandaloneAutoscalerControlLoop")
+
.setNameFormat("autoscaler-standalone-control-loop")
 .setDaemon(false)
 .build());
+
+int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+this.scalingThreadPool =
+new ThreadPoolExecutor(
+parallelism,
+parallelism,
+0L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   Why `*4`? Shouldn't `parallelism` be enough for the work queue?



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -75,29 +98,38 @@ public void start() {
 @Override
 public void close() {
 scheduledExecutorService.shutdownNow();
+scalingThreadPool.shutdownNow();
 }
 
 @VisibleForTesting
 protected void scaling() {
 LOG.info("Standalone autoscaler starts scaling.");
 try {
 var jobList = jobListFetcher.fetch();
+Collection> futures = new LinkedList<>();
 for (var jobContext : jobList) {
-try {
-autoScaler.scale(jobContext);
-} catch (Throwable e) {
-LOG.error("Error while scaling job", e);
-eventHandler.handleEvent(
-jobContext,
-AutoScalerEventHandler.Type.Warning,
-AUTOSCALER_ERROR,
-e.getMessage(),
-null,
-null);
-}
+futures.add(scalingThreadPool.submit(() -> 
scalingSingleJob(jobContext)));
+}
+for (Future future : futures) {
+future.get();
 }
 } catch (Throwable e) {
 LOG.error("Error while fetch job list.", e);
 }

Review Comment:
   ```suggestion
   } catch (Throwable e) {
   LOG.error("Error while executing autoscaling.", e);
   }
   ```
   

Re: [PR] [FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #77:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/77#issuecomment-1875183000

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440322610


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,21 @@ private static double getNumRecordsInPerSecond(
 Map flinkMetrics,
 JobVertexID jobVertexID,
 boolean isSource) {
+// Generate numRecordsInPerSecond from 3 metrics:
+// 1. If the vertex is not the source, use NUM_RECORDS_IN_PER_SEC 
metric
 var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+// 2. If the vertex is the source, use 
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric first.
 if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
 numRecordsInPerSecond =
 
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
 }
+// 3. If the vertex is the source and 
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric not available
+// then use SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC metric

Review Comment:
   ```suggestion
        // 3. If the vertex contains a source operator which does not emit input metrics, use output metrics instead.
        // then use SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC metric
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,21 @@ private static double getNumRecordsInPerSecond(
 Map flinkMetrics,
 JobVertexID jobVertexID,
 boolean isSource) {
+// Generate numRecordsInPerSecond from 3 metrics:
+// 1. If the vertex is not the source, use NUM_RECORDS_IN_PER_SEC 
metric
 var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+// 2. If the vertex is the source, use 
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC metric first.
 if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {

Review Comment:
   ```suggestion
        // 2. If the former is unavailable and the vertex contains a source operator, use the corresponding source operator metric.
        if (isSource && (numRecordsInPerSecond == null || numRecordsInPerSecond.getSum() == 0)) {
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,21 @@ private static double getNumRecordsInPerSecond(
 Map flinkMetrics,
 JobVertexID jobVertexID,
 boolean isSource) {
+// Generate numRecordsInPerSecond from 3 metrics:
+// 1. If the vertex is not the source, use NUM_RECORDS_IN_PER_SEC 
metric
 var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);

Review Comment:
   ```suggestion
        // 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task metric.
        var numRecordsInPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440326118


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##
@@ -42,7 +42,7 @@ public class TestingEventCollector> eventMap = new 
ConcurrentHashMap<>();
 
 @Override
-public void handleEvent(
+public synchronized void handleEvent(

Review Comment:
   Some callers use the poll methods of LinkedList; these methods come from Queue, 
so I refactored it to `public final Queue> events = new 
LinkedBlockingQueue<>();`.



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor eventHandler;
 private final JobAutoScaler autoScaler;
 private final ScheduledExecutorService scheduledExecutorService;
+private final ExecutorService scalingThreadPool;
 
 public StandaloneAutoscalerExecutor(
-@Nonnull Duration scalingInterval,
+@Nonnull Configuration conf,
 @Nonnull JobListFetcher jobListFetcher,
 @Nonnull AutoScalerEventHandler eventHandler,
 @Nonnull JobAutoScaler autoScaler) {
-this.scalingInterval = scalingInterval;
+this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
 this.jobListFetcher = jobListFetcher;
 this.eventHandler = eventHandler;
 this.autoScaler = autoScaler;
 this.scheduledExecutorService =
 Executors.newSingleThreadScheduledExecutor(
 new ThreadFactoryBuilder()
-
.setNameFormat("StandaloneAutoscalerControlLoop")
+
.setNameFormat("autoscaler-standalone-control-loop")
 .setDaemon(false)
 .build());
+
+int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+this.scalingThreadPool =
+new ThreadPoolExecutor(
+parallelism,
+parallelism,
+0L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   First of all, why doesn't this `scalingThreadPool` use an unbounded 
`LinkedBlockingQueue`?
   
   My thought is that if all threads are busy and the `LinkedBlockingQueue` has a lot 
of tasks, we can execute the task in the control-loop scheduler thread. That is why I 
chose `new ThreadPoolExecutor.CallerRunsPolicy()`. (A small sketch follows below.)
   
   > Why *4?
   
   It's not strict. My thought is that the scheduler thread only works when the 
`LinkedBlockingQueue` has a lot of tasks, so parallelism * 3 or more makes sense.
   
   ---
   
   Of course, if you think `scalingThreadPool` already has enough threads and 
we don't need the control-loop scheduler thread to execute any tasks, we can 
let the control-loop scheduler thread only submit tasks and wait for all tasks to 
finish. That means the control-loop scheduler thread would not execute tasks itself.
   
   WDYT?
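
   A minimal sketch of the bounded-queue-plus-`CallerRunsPolicy` idea described 
above; the pool size, queue capacity, and class name are illustrative and may 
differ from the actual PR code.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   /** Bounded queue + CallerRunsPolicy: once the queue is full, the submitting
    *  (control-loop) thread runs the task itself instead of the task being rejected. */
   class ScalingThreadPoolSketch {

       static ExecutorService create(int parallelism) {
           return new ThreadPoolExecutor(
                   parallelism,
                   parallelism,
                   0L,
                   TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<>(parallelism * 4),   // bounded backlog
                   new ThreadPoolExecutor.CallerRunsPolicy());   // overflow runs on the caller
       }

       public static void main(String[] args) throws InterruptedException {
           ExecutorService pool = create(2);
           for (int i = 0; i < 20; i++) {
               int job = i;
               // Once 2 workers are busy and 8 tasks are queued, the control-loop thread
               // runs the next submission itself (back-pressure rather than rejection).
               pool.execute(() ->
                       System.out.println("scaling job " + job + " on " + Thread.currentThread().getName()));
           }
           pool.shutdown();
           pool.awaitTermination(10, TimeUnit.SECONDS);
       }
   }
   ```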



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -75,29 +98,38 @@ public void start() {
 @Override
 public void close() {
 scheduledExecutorService.shutdownNow();
+scalingThreadPool.shutdownNow();
 }
 
 @VisibleForTesting
 protected void scaling() {
 LOG.info("Standalone autoscaler starts scaling.");
 try {
 var jobList = jobListFetcher.fetch();
+Collection> futures = new LinkedList<>();
 for (var jobContext : jobList) {
-try {
-autoScaler.scale(jobContext);
-} catch (Throwable e) {
-LOG.error("Error while scaling job", e);
-eventHandler.handleEvent(
-jobContext,
-AutoScalerEventHandler.Type.Warning,
-AUTOSCALER_ERROR,
-e.getMessage(),
-null,
-null);
-}
+futures.add(scalingThreadPool.submit(() -> 
scalingSingleJob(jobContext)));
+}
+for (Future future : futures) {
+future.get();
 }
 } catch (Throwable e) {
 LOG.error("Error while fetch job list.", e);
 }

Review Comment:
   From the current code, `scalingThreadPool` executes the `scalingSingleJob` logic, 
and `scalingSingleJob` catches all exceptions. So I think all exceptions in the 
control-loop thread come from fetching the job list.
   
   WDYT?



##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##
@@ 

Re: [PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440340673


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor eventHandler;
 private final JobAutoScaler autoScaler;
 private final ScheduledExecutorService scheduledExecutorService;
+private final ExecutorService scalingThreadPool;
 
 public StandaloneAutoscalerExecutor(
-@Nonnull Duration scalingInterval,
+@Nonnull Configuration conf,
 @Nonnull JobListFetcher jobListFetcher,
 @Nonnull AutoScalerEventHandler eventHandler,
 @Nonnull JobAutoScaler autoScaler) {
-this.scalingInterval = scalingInterval;
+this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
 this.jobListFetcher = jobListFetcher;
 this.eventHandler = eventHandler;
 this.autoScaler = autoScaler;
 this.scheduledExecutorService =
 Executors.newSingleThreadScheduledExecutor(
 new ThreadFactoryBuilder()
-
.setNameFormat("StandaloneAutoscalerControlLoop")
+
.setNameFormat("autoscaler-standalone-control-loop")
 .setDaemon(false)
 .build());
+
+int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+this.scalingThreadPool =
+new ThreadPoolExecutor(
+parallelism,
+parallelism,
+0L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   What is the benefit of the queue? If we are waiting on all tasks to finish, 
then we do not need the queue.
   
   I think the current logic of waiting for all tasks to finish is a bit 
fragile. For example, when we are executing a scaling decision, we may block 
for a long time on savepointing which will halt the next scaling for all 
pipelines. We could alternatively not block all but only the specific job for 
the next scaling. 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


tisonkun merged PR #77:
URL: https://github.com/apache/flink-connector-pulsar/pull/77


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33446) SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation doesn't produce the correct plan

2024-01-03 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802129#comment-17802129
 ] 

Shengkai Fang commented on FLINK-33446:
---

?? {{Sql2RelConverter}} generates the correct plan??

Yes, I think you are right. I think it's a Calcite bug and we need to upgrade 
the Calcite version.

> SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation
>  doesn't produce the correct plan
> ---
>
> Key: FLINK-33446
> URL: https://issues.apache.org/jira/browse/FLINK-33446
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Shengkai Fang
>Priority: Major
>
> Although this test doesn't throw an exception, the final plan produces 3 
> columns rather than 2 after optimization.
> {code:java}
> LogicalProject(inputs=[0..1], exprs=[[$4]])
> +- LogicalFilter(condition=[IS NULL($5)])
>+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
>   :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>   :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
> source: [TestTableSource(a, b, c)]]])
>   +- LogicalProject(inputs=[0..2], exprs=[[true]])
>  +- LogicalAggregate(group=[{0, 1, 2}])
> +- LogicalProject(inputs=[0..2])
>+- LogicalFilter(condition=[IS NULL($3)])
>   +- LogicalJoin(condition=[true], joinType=[left])
>  :- LogicalFilter(condition=[IS NOT NULL($0)])
>  :  +- LogicalProject(exprs=[[+($0, 1)]])
>  : +- LogicalTableScan(table=[[default_catalog, 
> default_database, r, source: [TestTableSource(d, e, f)]]])
>  +- LogicalProject(inputs=[0..1], exprs=[[true]])
> +- LogicalAggregate(group=[{0, 1}])
>+- LogicalProject(exprs=[[$3, $0]])
>   +- LogicalFilter(condition=[AND(=($1, $0), 
> =(CAST($2):BIGINT, $3))])
>  +- LogicalProject(exprs=[[+($0, 4), +($0, 
> 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
> +- 
> LogicalTableScan(table=[[default_catalog, default_database, t, source: 
> [TestTableSource(i, j, k)]]])
> {code}
> After digging, I think the SubQueryRemoveRule doesn't generate the Correlate 
> node but generates the Join node instead, which causes the decorrelation to 
> fail. For a quick fix, I think we should throw an exception to notify users 
> that it's not a supported feature in Flink. 
> There are 2 possible ways to fix this issue:
> 1. Expand the subquery when converting SQL to rel. After experimenting with 
> Calcite, I found that the Sql2RelConverter generates the correct plan.
> {code:java}
> LogicalProject(inputs=[0..1])
> +- LogicalFilter(condition=[IS NULL($2)])
>+- LogicalCorrelate(correlation=[$cor7], joinType=[left], 
> requiredColumns=[{0, 1}])
>   :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
>   :  +- LogicalTableScan(table=[[default_catalog, default_database, l, 
> source: [TestTableSource(a, b, c)]]])
>   +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
>  +- LogicalProject(exprs=[[true]])
> +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
>+- LogicalCorrelate(correlation=[$cor4], joinType=[left], 
> requiredColumns=[{0}])
>   :- LogicalProject(inputs=[0])
>   :  +- LogicalTableScan(table=[[default_catalog, 
> default_database, r, source: [TestTableSource(d1, e, f)]]])
>   +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
>  +- LogicalProject(exprs=[[true]])
> +- LogicalFilter(condition=[AND(=($0, $cor4.d1), 
> =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
>+- LogicalProject(exprs=[[+($0, 4), +($0, 5), 
> +($0, 6)]])
>   +- LogicalTableScan(table=[[default_catalog, 
> default_database, t, source: [TestTableSource(i, j, k)]]])
> {code}
> You can find the new plan uses a correlate node rather than a join node.
> 2. CALCITE-5789 has fixed this problem by removing the nested correlation node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33970] Add necessary checks for connector document [flink-connector-pulsar]

2024-01-03 Thread via GitHub


tisonkun commented on PR #78:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/78#issuecomment-1875232119

   CI broken. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP][FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


tisonkun commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/79#discussion_r1440353899


##
.github/workflows/ci.yml:
##
@@ -50,3 +50,13 @@ jobs:
   flink_version: ${{ matrix.flink }}
   timeout_global: 120
   timeout_test: 80
+  CheckDeadLinks:

Review Comment:
   Please revert this change and do it in another patch. We may review #78 
first. Please avoid squashing commits in a cherry-pick.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440355436


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor eventHandler;
 private final JobAutoScaler autoScaler;
 private final ScheduledExecutorService scheduledExecutorService;
+private final ExecutorService scalingThreadPool;
 
 public StandaloneAutoscalerExecutor(
-@Nonnull Duration scalingInterval,
+@Nonnull Configuration conf,
 @Nonnull JobListFetcher jobListFetcher,
 @Nonnull AutoScalerEventHandler eventHandler,
 @Nonnull JobAutoScaler autoScaler) {
-this.scalingInterval = scalingInterval;
+this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
 this.jobListFetcher = jobListFetcher;
 this.eventHandler = eventHandler;
 this.autoScaler = autoScaler;
 this.scheduledExecutorService =
 Executors.newSingleThreadScheduledExecutor(
 new ThreadFactoryBuilder()
-
.setNameFormat("StandaloneAutoscalerControlLoop")
+
.setNameFormat("autoscaler-standalone-control-loop")
 .setDaemon(false)
 .build());
+
+int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+this.scalingThreadPool =
+new ThreadPoolExecutor(
+parallelism,
+parallelism,
+0L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   > What is the benefit of the queue? If we are waiting on all tasks to 
finish, then we do not need the queue.
   
   That makes sense; if we want to wait, the queue isn't useful.
   
   > I think the current logic of waiting for all tasks to finish is a bit 
fragile. For example, when we are executing a scaling decision, we may block 
for a long time on savepointing which will halt the next scaling for all 
pipelines. We could alternatively not block all but only the specific job for 
the next scaling.
   
   Sorry, I didn't expect `scalingSingleJob` to take so long. Currently, the 
scaling action only calls the rescale API, so the thread won't wait for the 
savepoint.
   
   But I think your concern is reasonable. We define the `ScalingRealizer` 
interface, which means users can define a custom action, and that might take a 
long time.
   
   
   
   
   Based on your suggestion, I think the `control loop scheduler thread` cannot 
wait for all scalings to finish. When the scaling of one job is slow, it 
shouldn't affect other jobs.
   
   Also, if a new scaling round is started while the last scaling of some jobs 
hasn't finished, should we ignore this round for these jobs? A few reasons might 
cause this issue:
   
   - The scaling action is slow.
   - There are too many jobs and not enough threads, so the last round never 
finishes before the new round starts.
   
   So, I think we should ignore unfinished jobs in the new round.
   
   WDYT?
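   
   A rough sketch of what I mean (hypothetical helper, not the PR code; it assumes 
the job context exposes its `JobID`):

```java
// Jobs whose previous scaling hasn't finished yet are skipped in the new round.
private final Set<JobID> scalingJobs = ConcurrentHashMap.newKeySet();

void scheduleScalingRound(Collection<Context> jobList) {
    for (Context jobContext : jobList) {
        JobID jobId = jobContext.getJobID();
        if (!scalingJobs.add(jobId)) {
            // Previous scaling of this job is still running; ignore this round.
            continue;
        }
        scalingThreadPool.submit(
                () -> {
                    try {
                        scalingSingleJob(jobContext);
                    } finally {
                        scalingJobs.remove(jobId);
                    }
                });
    }
}
```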



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33490) Validate the name conflicts when creating view

2024-01-03 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802136#comment-17802136
 ] 

Shengkai Fang commented on FLINK-33490:
---

Hi, [~martijnvisser]. I think this issue is just a bug fix rather than a feature, 
and the scope of the fix is clear (in previous discussions, this fix is just 
about aligning with the SQL standard and the behavior of other databases). 
Introducing a FLIP to fix a bug would not only consume a lot of our time, but 
it would also make the whole fixing process very lengthy. 

I think [~libenchao]'s proposal is just to sort out the current behavior 
supported by Flink SQL, to make Flink SQL easier for users to understand and 
more in line with the SQL standard. However, this does not affect our bug fix, 
because the purpose of this bug fix itself is to make Flink SQL more 
standard-compliant.

WDYT? cc [~lincoln.86xy] [~xuyangzhong] [~libenchao] 

 

> Validate the name conflicts when creating view
> --
>
> Key: FLINK-33490
> URL: https://issues.apache.org/jira/browse/FLINK-33490
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> We should forbid 
> ```
> CREATE VIEW id_view AS
> SELECT id, uid AS id FROM id_table
> ```
> As the SQL standards states,
> If  is specified, then:
> i) If any two columns in the table specified by the  have 
> equivalent s, or if any column of that table has an 
> implementation-dependent name, then a  shall be specified.
> ii) Equivalent s shall not be specified more than once in the 
> .
> Many databases also throw exception when view name conflicts, e.g. mysql, 
> postgres.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33972) Enhance and synchronize Sink API to match the Source API

2024-01-03 Thread Peter Vary (Jira)
Peter Vary created FLINK-33972:
--

 Summary: Enhance and synchronize Sink API to match the Source API
 Key: FLINK-33972
 URL: https://issues.apache.org/jira/browse/FLINK-33972
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Peter Vary


Umbrella jira for the implementation of 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33973) Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API

2024-01-03 Thread Peter Vary (Jira)
Peter Vary created FLINK-33973:
--

 Summary: Add new interfaces for SinkV2 to synchronize the API with 
the SourceV2 API
 Key: FLINK-33973
 URL: https://issues.apache.org/jira/browse/FLINK-33973
 Project: Flink
  Issue Type: Sub-task
Reporter: Peter Vary


Create the new interfaces, and set inheritance and deprecation to finalize the 
interface.
After this change the new interfaces will exist, but they will not be 
functional yet.

The existing interfaces and tests should keep working without issue, to verify 
that adding the API is backward compatible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33974) Implement the Sink transformation depending on the new SinkV2 interfaces

2024-01-03 Thread Peter Vary (Jira)
Peter Vary created FLINK-33974:
--

 Summary: Implement the Sink transformation depending on the new 
SinkV2 interfaces
 Key: FLINK-33974
 URL: https://issues.apache.org/jira/browse/FLINK-33974
 Project: Flink
  Issue Type: Sub-task
Reporter: Peter Vary


Implement the changes to the Sink transformation which should depend only on 
the new API interfaces. The tests should remain the same, to ensure backward 
compatibility.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440384519


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -75,29 +98,38 @@ public void start() {
 @Override
 public void close() {
 scheduledExecutorService.shutdownNow();
+scalingThreadPool.shutdownNow();
 }
 
 @VisibleForTesting
 protected void scaling() {
 LOG.info("Standalone autoscaler starts scaling.");
 try {
 var jobList = jobListFetcher.fetch();
+Collection> futures = new LinkedList<>();
 for (var jobContext : jobList) {
-try {
-autoScaler.scale(jobContext);
-} catch (Throwable e) {
-LOG.error("Error while scaling job", e);
-eventHandler.handleEvent(
-jobContext,
-AutoScalerEventHandler.Type.Warning,
-AUTOSCALER_ERROR,
-e.getMessage(),
-null,
-null);
-}
+futures.add(scalingThreadPool.submit(() -> 
scalingSingleJob(jobContext)));
+}
+for (Future future : futures) {
+future.get();
 }
 } catch (Throwable e) {
 LOG.error("Error while fetch job list.", e);
 }

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33975) Tests for the new Sink V2 transformations

2024-01-03 Thread Peter Vary (Jira)
Peter Vary created FLINK-33975:
--

 Summary: Tests for the new Sink V2 transformations
 Key: FLINK-33975
 URL: https://issues.apache.org/jira/browse/FLINK-33975
 Project: Flink
  Issue Type: Sub-task
Reporter: Peter Vary


Create new tests for the SinkV2 API transformations, and migrate some of the 
tests to use the new API. Some of the old tests should keep using the old API 
to make sure that backward compatibility is tested until the deprecation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33976) AdaptiveScheduler cooldown period is taken from a wrong configuration

2024-01-03 Thread Jira
David Morávek created FLINK-33976:
-

 Summary: AdaptiveScheduler cooldown period is taken from a wrong 
configuration
 Key: FLINK-33976
 URL: https://issues.apache.org/jira/browse/FLINK-33976
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration, Runtime / Coordination
Reporter: David Morávek


The new JobManager options introduced in FLINK-21883: 
`scaling-interval.\{min,max}` of AdaptiveScheduler are resolved from the 
per-Job configuration instead of JobManager's configuration, which is not 
correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33976) AdaptiveScheduler cooldown period is taken from a wrong configuration

2024-01-03 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-33976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek reassigned FLINK-33976:
-

Assignee: David Morávek

> AdaptiveScheduler cooldown period is taken from a wrong configuration
> -
>
> Key: FLINK-33976
> URL: https://issues.apache.org/jira/browse/FLINK-33976
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / Coordination
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>
> The new JobManager options introduced in FLINK-21883: 
> `scaling-interval.\{min,max}` of AdaptiveScheduler are resolved from the 
> per-Job configuration instead of JobManager's configuration, which is not 
> correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33976) AdaptiveScheduler cooldown period is taken from a wrong configuration

2024-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33976:
---
Labels: pull-request-available  (was: )

> AdaptiveScheduler cooldown period is taken from a wrong configuration
> -
>
> Key: FLINK-33976
> URL: https://issues.apache.org/jira/browse/FLINK-33976
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / Coordination
>Reporter: David Morávek
>Assignee: David Morávek
>Priority: Major
>  Labels: pull-request-available
>
> The new JobManager options introduced in FLINK-21883: 
> `scaling-interval.\{min,max}` of AdaptiveScheduler are resolved from the 
> per-Job configuration instead of JobManager's configuration, which is not 
> correct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-33490) Validate the name conflicts when creating view

2024-01-03 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reopened FLINK-33490:
---

> Validate the name conflicts when creating view
> --
>
> Key: FLINK-33490
> URL: https://issues.apache.org/jira/browse/FLINK-33490
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> We should forbid 
> ```
> CREATE VIEW id_view AS
> SELECT id, uid AS id FROM id_table
> ```
> As the SQL standards states,
> If  is specified, then:
> i) If any two columns in the table specified by the  have 
> equivalent s, or if any column of that table has an 
> implementation-dependent name, then a  shall be specified.
> ii) Equivalent s shall not be specified more than once in the 
> .
> Many databases also throw exception when view name conflicts, e.g. mysql, 
> postgres.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33976] AdaptiveScheduler cooldown period should be taken from JobManager's configuration [flink]

2024-01-03 Thread via GitHub


flinkbot commented on PR #24021:
URL: https://github.com/apache/flink/pull/24021#issuecomment-1875300830

   
   ## CI report:
   
   * 4cd91bc6028e0a5e78e4a7984201fa6eca84232f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33490) Validate the name conflicts when creating view

2024-01-03 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802146#comment-17802146
 ] 

Martijn Visser commented on FLINK-33490:


[~fsk119] I'm not suggesting to have a FLIP for a bugfix, I'm only trying to 
understand the process. If the feature and scope of the fix is clear, and it 
doesn't require breaking anything, then it's fine to go ahead and fix it. But 
in that case, FLINK-33740 is not needed and not a helpful ticket, since a FLIP 
isn't meant to list the supported capabilities. If we need to list the 
supported capabilities, we should just improve the documentation and that 
should be sufficient. 

> Validate the name conflicts when creating view
> --
>
> Key: FLINK-33490
> URL: https://issues.apache.org/jira/browse/FLINK-33490
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> We should forbid 
> ```
> CREATE VIEW id_view AS
> SELECT id, uid AS id FROM id_table
> ```
> As the SQL standards states,
> If  is specified, then:
> i) If any two columns in the table specified by the  have 
> equivalent s, or if any column of that table has an 
> implementation-dependent name, then a  shall be specified.
> ii) Equivalent s shall not be specified more than once in the 
> .
> Many databases also throw exception when view name conflicts, e.g. mysql, 
> postgres.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-03 Thread Jufang He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802147#comment-17802147
 ] 

Jufang He commented on FLINK-33856:
---

[~pnowojski] It seems reasonable to use TraceReporter to report 
checkpoint-related task-level metrics; could this issue be a subtask of 
FLIP-384? WDYT?
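
For context, the metrics described in the issue roughly amount to a thin wrapper 
around the checkpoint output stream, independent of the storage type (a 
hypothetical sketch only, not the actual FsCheckpointStreamFactory code):

{code:java}
import java.io.IOException;
import java.io.OutputStream;

/** Sketch: record write/close latency and bytes written for one checkpoint stream. */
class MeasuredCheckpointStream extends OutputStream {
    private final OutputStream delegate;
    private long bytesWritten;
    private long writeNanos;
    private long closeNanos;

    MeasuredCheckpointStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeNanos += System.nanoTime() - start;
        bytesWritten++;
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        delegate.close();
        closeNanos = System.nanoTime() - start;
        // write rate ~= bytesWritten / (writeNanos / 1e9); report these values
        // to the task's metric group here.
    }
}
{code}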

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall duration. Therefore, it becomes 
> easy to observe the bottleneck by adding performance indicators where the task 
> interacts with the external file storage system. These include: the file write 
> rate, the latency to write the file, and the latency to close the file.
> Adding the above metrics on the Flink side has the following advantages: it is 
> convenient for comparing the E2E time of different tasks, and there is no need 
> to distinguish the type of external storage system, since this can be unified 
> in the FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33973] Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API [flink]

2024-01-03 Thread via GitHub


pvary opened a new pull request, #24022:
URL: https://github.com/apache/flink/pull/24022

   ## What is the purpose of the change
   
   Add new SinkV2 API interfaces, set inheritance and deprecation for the old 
interfaces.
   
   ## Brief change log
   
   All the new APIs defined in 
[FLIP-372](https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API)
   
   One minor additional change:
   - Added `map` to `CommittableSummary` and `CommittableWithLineage` so it 
will be easier to map when the types are changed.
   
   ## Verifying this change
   
   The existing tests should run without issue, to verify the backward 
compatibility.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33973) Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API

2024-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33973:
---
Labels: pull-request-available  (was: )

> Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API
> --
>
> Key: FLINK-33973
> URL: https://issues.apache.org/jira/browse/FLINK-33973
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Peter Vary
>Priority: Major
>  Labels: pull-request-available
>
> Create the new interfaces, and set inheritance and deprecation to finalize the 
> interface.
> After this change the new interfaces will exist, but they will not be 
> functional yet.
> The existing interfaces and tests should keep working without issue, to verify 
> that adding the API is backward compatible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33973] Add new interfaces for SinkV2 to synchronize the API with the SourceV2 API [flink]

2024-01-03 Thread via GitHub


flinkbot commented on PR #24022:
URL: https://github.com/apache/flink/pull/24022#issuecomment-1875319278

   
   ## CI report:
   
   * 922c6c60c3b8d66f925d9bc84855b4beaaa90359 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [docs] Typo: add missing word "start" [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


gvauvert opened a new pull request, #745:
URL: https://github.com/apache/flink-kubernetes-operator/pull/745

   Typo: add missing word "start"


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling

2024-01-03 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33977:
-

 Summary: Adaptive scheduler may not minimize the number of TMs 
during downscaling
 Key: FLINK-33977
 URL: https://issues.apache.org/jira/browse/FLINK-33977
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.18.0
Reporter: Zhanghao Chen


Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing 
groups. Currently, there are two implementations of SlotAssigner available: the 
DefaultSlotAssigner, which treats all slots and slot sharing groups equally, and 
the StateLocalitySlotAssigner, which assigns slots based on the number of local 
key groups to utilize local state recovery. The scheduler will use the 
DefaultSlotAssigner when no key group assignment info is available and the 
StateLocalitySlotAssigner otherwise.
 
However, neither SlotAssigner aims at minimizing the number of TMs, which may 
produce suboptimal slot assignments under the Application Mode. For example, 
when a job with 8 slot sharing groups and 2 TMs (each with 4 slots) is 
downscaled through the FLIP-291 API to have 4 slot sharing groups instead, the 
cluster may still have 2 TMs, one with 1 free slot and the other with 3 free 
slots. For end-users, this implies an ineffective downscaling as the total 
cluster resources are not reduced.
 
We should take minimizing the number of TMs into consideration as well. A 
possible approach is to enhance the StateLocalitySlotAssigner: when the number 
of free slots exceeds the need, sort all the TMs by a score summed from the 
allocation scores of all slots on them, remove slots from the excess TMs with 
the lowest scores, and proceed with the remaining slot assignment.
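
A small illustration of this scoring idea (hypothetical names, plain Java, not 
the actual SlotAssigner code):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TaskManagerScoring {

    /** One entry per free slot: TM id -> allocation score of that slot. */
    static List<String> taskManagersByAscendingScore(List<Map.Entry<String, Double>> slotScores) {
        Map<String, Double> tmScore =
                slotScores.stream()
                        .collect(Collectors.groupingBy(
                                Map.Entry::getKey,
                                Collectors.summingDouble(Map.Entry::getValue)));
        // Excess slots are removed from the lowest-scored TMs first, so those TMs
        // can be released entirely and the cluster actually shrinks after downscaling.
        return tmScore.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
{code}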



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33977) Adaptive scheduler may not minimize the number of TMs during downscaling

2024-01-03 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-33977:
--
Component/s: Runtime / Coordination

> Adaptive scheduler may not minimize the number of TMs during downscaling
> 
>
> Key: FLINK-33977
> URL: https://issues.apache.org/jira/browse/FLINK-33977
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Zhanghao Chen
>Priority: Major
>
> Adaptive Scheduler uses SlotAssigner to assign free slots to slot sharing 
> groups. Currently, there are two implementations of SlotAssigner available: the 
> DefaultSlotAssigner, which treats all slots and slot sharing groups equally, 
> and the StateLocalitySlotAssigner, which assigns slots based on the number of 
> local key groups to utilize local state recovery. The scheduler will use the 
> DefaultSlotAssigner when no key group assignment info is available and the 
> StateLocalitySlotAssigner otherwise.
>  
> However, neither SlotAssigner aims at minimizing the number of TMs, which may 
> produce suboptimal slot assignments under the Application Mode. For example, 
> when a job with 8 slot sharing groups and 2 TMs (each with 4 slots) is 
> downscaled through the FLIP-291 API to have 4 slot sharing groups instead, 
> the cluster may still have 2 TMs, one with 1 free slot and the other with 3 
> free slots. For end-users, this implies an ineffective downscaling as the 
> total cluster resources are not reduced.
>  
> We should take minimizing the number of TMs into consideration as well. A 
> possible approach is to enhance the StateLocalitySlotAssigner: when the number 
> of free slots exceeds the need, sort all the TMs by a score summed from the 
> allocation scores of all slots on them, remove slots from the excess TMs with 
> the lowest scores, and proceed with the remaining slot assignment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#issuecomment-1875354667

   > Looks good! I think this logic needs some comments because it is hard to 
understand the reasoning behind the logic. I added some additional suggestions.
   
   Thanks @mxm I have added your suggestion :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25421] Add JdbcSink with new format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


MartijnVisser commented on PR #2:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/2#issuecomment-1875367844

   @wanglijie95 Do you have bandwidth to review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP][FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on code in PR #79:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/79#discussion_r1440472270


##
.github/workflows/ci.yml:
##
@@ -50,3 +50,13 @@ jobs:
   flink_version: ${{ matrix.flink }}
   timeout_global: 120
   timeout_test: 80
+  CheckDeadLinks:

Review Comment:
   @tisonkun I just checked this branch and there is no dead link. I have dropped this commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33976] AdaptiveScheduler cooldown period should be taken from JobManager's configuration [flink]

2024-01-03 Thread via GitHub


echauchot commented on PR #24021:
URL: https://github.com/apache/flink/pull/24021#issuecomment-1875401922

   Hi David, thanks for this PR, I'll take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33970] Add necessary checks for connector document [flink-connector-pulsar]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #78:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/78#issuecomment-1875406878

   > CI broken. PTAL.
   
   I have rebased onto the latest master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core package api-common [flink]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #23960:
URL: https://github.com/apache/flink/pull/23960#issuecomment-1875416513

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33814][autoscaler] Autoscaler Standalone control loop supports multiple threads [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440505443


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor eventHandler;
 private final JobAutoScaler autoScaler;
 private final ScheduledExecutorService scheduledExecutorService;
+private final ExecutorService scalingThreadPool;
 
 public StandaloneAutoscalerExecutor(
-@Nonnull Duration scalingInterval,
+@Nonnull Configuration conf,
 @Nonnull JobListFetcher jobListFetcher,
 @Nonnull AutoScalerEventHandler eventHandler,
 @Nonnull JobAutoScaler autoScaler) {
-this.scalingInterval = scalingInterval;
+this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
 this.jobListFetcher = jobListFetcher;
 this.eventHandler = eventHandler;
 this.autoScaler = autoScaler;
 this.scheduledExecutorService =
 Executors.newSingleThreadScheduledExecutor(
 new ThreadFactoryBuilder()
-
.setNameFormat("StandaloneAutoscalerControlLoop")
+
.setNameFormat("autoscaler-standalone-control-loop")
 .setDaemon(false)
 .build());
+
+int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+this.scalingThreadPool =
+new ThreadPoolExecutor(
+parallelism,
+parallelism,
+0L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   >Based on your suggestion, I think the control loop scheduler thread cannot 
wait for all scalings to finish. When the scaling of one job is slow, it 
shouldn't affect other jobs.
   
   Yes, I agree.
   
   >Also, if a new scaling round is started while the last scaling of some jobs 
hasn't finished, should we ignore this round for these jobs?
   
   Yes, that would be great. We just wait until they are ready. In the 
Kubernetes operator, there is currently no timeout for single jobs but we could 
add one in the future if this poses a problem.
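   
   If we ever need that timeout, a minimal sketch could look like this 
(hypothetical `timeoutScheduler` and `scalingTimeout`, not part of the current PR):

```java
// Cancel a scaling attempt that runs too long, without blocking other jobs.
Future<?> future = scalingThreadPool.submit(() -> scalingSingleJob(jobContext));
timeoutScheduler.schedule(
        () -> {
            if (!future.isDone()) {
                future.cancel(true); // interrupts the slow scaling attempt
            }
        },
        scalingTimeout.toMillis(),
        TimeUnit.MILLISECONDS);
```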



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs] Typo: add missing word "start" [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm commented on PR #745:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/745#issuecomment-1875460950

   Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs] Typo: add missing word "start" [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm merged PR #745:
URL: https://github.com/apache/flink-kubernetes-operator/pull/745


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


mxm commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440519499


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,21 @@ private static double getNumRecordsInPerSecond(
 Map flinkMetrics,
 JobVertexID jobVertexID,
 boolean isSource) {
+// Generate numRecordsInPerSecond from 3 metrics:
+// 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task 
metric.
 var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+// 2. If the former is unavailable and the vertex contains a source 
operator, use the corresponding source operator metric.

Review Comment:
   We might have to run `mvn spotless:apply`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core package api-common [flink]

2024-01-03 Thread via GitHub


GOODBOY008 commented on PR #23960:
URL: https://github.com/apache/flink/pull/23960#issuecomment-1875472008

   > 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=55925&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=7264
   > 
   > The CI fails due to the change of this PR, right?
   
   @1996fanrui Yes, the CI failure has just been fixed. PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


Yang-LI-CS commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1440549106


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,21 @@ private static double getNumRecordsInPerSecond(
 Map flinkMetrics,
 JobVertexID jobVertexID,
 boolean isSource) {
+// Generate numRecordsInPerSecond from 3 metrics:
+// 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task 
metric.
 var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+// 2. If the former is unavailable and the vertex contains a source 
operator, use the corresponding source operator metric.

Review Comment:
   @mxm done! 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-03 Thread via GitHub


pnowojski opened a new pull request, #24023:
URL: https://github.com/apache/flink/pull/24023

   This PR builds on top of https://github.com/apache/flink/pull/23908
   
   ## Brief change log
   
   Please check individual commit messages
   
   ## Verifying this change
   
   This change has been manually tested, and a unit test was added to verify that 
the metric is created.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33697) FLIP-386: Support adding custom metrics in Recovery Spans

2024-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33697:
---
Labels: pull-request-available  (was: )

> FLIP-386: Support adding custom metrics in Recovery Spans
> -
>
> Key: FLINK-33697
> URL: https://issues.apache.org/jira/browse/FLINK-33697
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> h1. Motivation
> [FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of 
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces].
>  The intention here is to add a capability for state backends to attach 
> custom attributes during recovery to recovery spans. For example 
> RocksDBIncrementalRestoreOperation could report both remote download time and 
> time to actually clip/ingest the RocksDB instances after rescaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-03 Thread via GitHub


flinkbot commented on PR #24023:
URL: https://github.com/apache/flink/pull/24023#issuecomment-1875528953

   
   ## CI report:
   
   * e7b15627f0fe1823eb893fff103e8c0f715bc223 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33775] Report JobInitialization traces [flink]

2024-01-03 Thread via GitHub


StefanRRichter commented on code in PR #23908:
URL: https://github.com/apache/flink/pull/23908#discussion_r1440576635


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -155,7 +167,12 @@ public CheckpointStatsSnapshot createSnapshot() {
 counts.createSnapshot(),
 summary.createSnapshot(),
 history.createSnapshot(),
-latestRestoredCheckpoint);
+jobInitializationMetricsBuilder
+.map(
+JobInitializationMetricsBuilder
+
::buildRestoredCheckpointStats)
+.orElse(Optional.empty())
+.orElse(null));

Review Comment:
   Why 2x orElse?
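   
   A single `flatMap` would avoid it, assuming `buildRestoredCheckpointStats` 
returns an `Optional` (sketch only):

```java
jobInitializationMetricsBuilder
        .flatMap(JobInitializationMetricsBuilder::buildRestoredCheckpointStats)
        .orElse(null)
```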



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -86,9 +88,10 @@ public class CheckpointStatsTracker {
 
 private final JobID jobID;
 private final MetricGroup metricGroup;
+private int totalNumberOfSubTasks;
 
-/** The latest restored checkpoint. */
-@Nullable private RestoredCheckpointStats latestRestoredCheckpoint;
+private Optional 
jobInitializationMetricsBuilder =

Review Comment:
   Why this? I think optional isn't even intended to be used for fields.
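   
   The usual alternative is a nullable field, with `Optional` only at the access 
site (a sketch):

```java
@Nullable private JobInitializationMetricsBuilder jobInitializationMetricsBuilder;

Optional<JobInitializationMetricsBuilder> getJobInitializationMetricsBuilder() {
    return Optional.ofNullable(jobInitializationMetricsBuilder);
}
```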



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##
@@ -730,57 +739,87 @@ void restoreInternal() throws Exception {
 
getEnvironment().getMetricGroup().getIOMetricGroup().markTaskInitializationStarted();
 LOG.debug("Initializing {}.", getName());
 
-operatorChain =
-
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
-? new FinishedOperatorChain<>(this, recordWriter)
-: new RegularOperatorChain<>(this, recordWriter);
-mainOperator = operatorChain.getMainOperator();
+SubTaskInitializationMetricsBuilder initializationMetrics =
+new SubTaskInitializationMetricsBuilder(
+SystemClock.getInstance().absoluteTimeMillis());
+try {
+operatorChain =
+
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
+? new FinishedOperatorChain<>(this, recordWriter)
+: new RegularOperatorChain<>(this, recordWriter);
+mainOperator = operatorChain.getMainOperator();
 
-getEnvironment()
-.getTaskStateManager()
-.getRestoreCheckpointId()
-.ifPresent(restoreId -> latestReportCheckpointId = restoreId);
+getEnvironment()

Review Comment:
   Revert formatting changes in this file?



##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobInitializationMetricsBuilder.java:
##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import 
org.apache.flink.runtime.checkpoint.JobInitializationMetrics.SumMaxDuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.runtime.checkpoint.JobInitializationMetrics.UNSET;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+class JobInitializationMetricsBuilder {
+private static final Logger LOG =
+LoggerFactory.getLogger(JobInitializationMetricsBuilder.class);
+
+private final List reportedMetrics = new 
ArrayList<>();
+private final int totalNumberOfSubTasks;
+private final long startTs;
+private Optional stateSize = Optional.empty();

Review Comment:
   nit: Optional abused for field.



##
flink-runtime/src/main/java/org/apache/flink/

Re: [PR] [FLINK-33697][state][metrics] Trace RocksDBIncremental remote files download time [flink]

2024-01-03 Thread via GitHub


StefanRRichter commented on code in PR #24023:
URL: https://github.com/apache/flink/pull/24023#discussion_r1440607428


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackendParametersImpl.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.StateBackend.CustomInitializationMetrics;
+import org.apache.flink.runtime.state.StateBackend.KeyedStateBackendParameters;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+
+/**
+ * Internal POJO implementing {@link KeyedStateBackendParameters}
+ *
+ * @param 
+ */
+@Internal
+public class KeyedStateBackendParametersImpl implements 
KeyedStateBackendParameters {

Review Comment:
   What's the reason to have an interface for this class? It looks like a 
simple POJO that gives more meaning to a bunch of parameters. And there is only 
one impl.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java:
##
@@ -201,4 +278,75 @@ default boolean supportsNoClaimRestoreMode() {
 default boolean supportsSavepointFormat(SavepointFormatType formatType) {
 return formatType == SavepointFormatType.CANONICAL;
 }
+
+/**
+ * Parameters passed to {@link
+ * StateBackend#createKeyedStateBackend(KeyedStateBackendParameters)}.
+ *
+ * @param  The type of the keys by which the state is organized.
+ */
+@PublicEvolving
+interface KeyedStateBackendParameters {
+/** @return The runtime environment of the executing task. */
+Environment getEnv();
+
+JobID getJobID();
+
+String getOperatorIdentifier();
+
+TypeSerializer getKeySerializer();
+
+int getNumberOfKeyGroups();
+
+/** @return Range of key-groups for which the to-be-created backend is 
responsible. */
+KeyGroupRange getKeyGroupRange();
+
+TaskKvStateRegistry getKvStateRegistry();
+
+/** @return Provider for TTL logic to judge about state expiration. */
+TtlTimeProvider getTtlTimeProvider();
+
+MetricGroup getMetricGroup();
+
+@Nonnull
+Collection getStateHandles();
+
+/**
+ * @return The registry to which created closeable objects will be * 
registered during
+ * restore.
+ */
+CloseableRegistry getCancelStreamRegistry();
+
+double getManagedMemoryFraction();
+
+CustomInitializationMetrics getCustomInitializationMetrics();
+}
+
+/**
+ * Parameters passed to {@link
+ * 
StateBackend#createOperatorStateBackend(OperatorStateBackendParameters)}.
+ */
+@PublicEvolving
+interface OperatorStateBackendParameters {
+/** @return The runtime environment of the executing task. */
+Environment getEnv();

Review Comment:
   As said in a previous comment, I'm not a fan of the extra interfaces; I'd simply 
turn them into classes. Then they could have a common superclass holding env, 
operator id, etc.
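   
   Something along these lines, as a rough sketch of the suggestion (names and 
fields are illustrative only):

```java
abstract class StateBackendParameters {
    private final Environment env;
    private final JobID jobID;
    private final String operatorIdentifier;

    StateBackendParameters(Environment env, JobID jobID, String operatorIdentifier) {
        this.env = env;
        this.jobID = jobID;
        this.operatorIdentifier = operatorIdentifier;
    }

    Environment getEnv() { return env; }
    JobID getJobID() { return jobID; }
    String getOperatorIdentifier() { return operatorIdentifier; }
}

class OperatorStateBackendParametersImpl extends StateBackendParameters {
    OperatorStateBackendParametersImpl(Environment env, JobID jobID, String operatorIdentifier) {
        super(env, jobID, operatorIdentifier);
        // plus the operator-state-specific parameters ...
    }
}
```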



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-20454][format][debezium] Allow to read metadata for debezium-avro-confluent format [flink]

2024-01-03 Thread via GitHub


MartijnVisser commented on PR #18744:
URL: https://github.com/apache/flink/pull/18744#issuecomment-1875613760

   If the PR gets rebased and the CI passes, someone will most likely take a 
look. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33970] Add necessary checks for connector document [flink-connector-pulsar]

2024-01-03 Thread via GitHub


tisonkun merged PR #78:
URL: https://github.com/apache/flink-connector-pulsar/pull/78


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33970) Add necessary checks for connector document

2024-01-03 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-33970:
--
Fix Version/s: pulsar-4.2.0
   pulsar-4.1.1

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
> Fix For: pulsar-4.2.0, pulsar-4.1.1
>
>
> In FLINK-33964, we found that the documentation files in independent connector 
> repos lack basic checks such as broken-URL detection; this ticket aims to add 
> the necessary checks and avoid similar issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33970) Add necessary checks for connector document

2024-01-03 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen closed FLINK-33970.
-
Resolution: Fixed

master via 5b70da8f88e21057a5c590d139eab558f87e5dca

Thanks a lot [~gongzhongqiang]!

> Add necessary checks for connector document
> ---
>
> Key: FLINK-33970
> URL: https://issues.apache.org/jira/browse/FLINK-33970
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Assignee: Zhongqiang Gong
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-33964, we found that the documentation files in independent connector 
> repos lack basic checks such as broken-URL detection; this ticket aims to add 
> the necessary checks and avoid similar issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP][FLINK-33964][pulsar][docs] Remove dead link [flink-connector-pulsar]

2024-01-03 Thread via GitHub


tisonkun merged PR #79:
URL: https://github.com/apache/flink-connector-pulsar/pull/79


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33970] Add necessary checks for connector document [flink-connector-pulsar]

2024-01-03 Thread via GitHub


tisonkun commented on PR #78:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/78#issuecomment-1875647916

   Now we can pick this patch to v4.1 >_<


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Lab 1 Training [flink-training]

2024-01-03 Thread via GitHub


ness-senthilRamamoorthy opened a new pull request, #71:
URL: https://github.com/apache/flink-training/pull/71

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Lab 1 Training [flink-training]

2024-01-03 Thread via GitHub


ness-senthilRamamoorthy closed pull request #71:  Lab 1 Training
URL: https://github.com/apache/flink-training/pull/71


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2024-01-03 Thread Samuel Brotherton (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802295#comment-17802295
 ] 

Samuel Brotherton commented on FLINK-32212:
---

We are seeing this issue as well; it breaks our pipelines whenever the 
JobManager pod gets redeployed due to a spot instance scale down, etc.

> Job restarting indefinitely after an IllegalStateException from 
> BlobLibraryCacheManager
> ---
>
> Key: FLINK-32212
> URL: https://issues.apache.org/jira/browse/FLINK-32212
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.1
> Environment: Apache Flink Kubernetes Operator 1.4
>Reporter: Matheus Felisberto
>Priority: Major
>
> After running for a few hours, the job starts to throw an IllegalStateException 
> and I can't figure out why. To restore the job, I need to manually delete the 
> FlinkDeployment so that it is recreated, and redeploy everything.
> The jar is built into the Docker image, hence it is defined according to the 
> Operator's documentation:
> {code:java}
> // jarURI: local:///opt/flink/usrlib/my-job.jar {code}
> I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work 
> either. 
>  
> {code:java}
> // Source: my-topic (1/2)#30587 
> (b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
> switched from DEPLOYING to FAILED with failure cause: 
> java.lang.IllegalStateException: The library registration references a 
> different set of library BLOBs than previous registrations for this job:
> old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
> new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
>     at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Unknown Source) {code}
> If there is any other information that can help to identify the problem, 
> please let me know.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32212) Job restarting indefinitely after an IllegalStateException from BlobLibraryCacheManager

2024-01-03 Thread Samuel Brotherton (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802295#comment-17802295
 ] 

Samuel Brotherton edited comment on FLINK-32212 at 1/3/24 6:30 PM:
---

We are seeing this issue as well; it breaks our pipelines whenever the 
JobManager pod gets redeployed due to a spot instance scale down, etc.

As a workaround, we are currently manually restarting with `kubectl rollout 
restart deployment/our-flink-deployment`, which fixes it temporarily.


was (Author: sbrother):
We are seeing this issue as well; it breaks our pipelines whenever the 
JobManager pod gets redeployed due to a spot instance scale down, etc.

> Job restarting indefinitely after an IllegalStateException from 
> BlobLibraryCacheManager
> ---
>
> Key: FLINK-32212
> URL: https://issues.apache.org/jira/browse/FLINK-32212
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Task
>Affects Versions: 1.16.1
> Environment: Apache Flink Kubernetes Operator 1.4
>Reporter: Matheus Felisberto
>Priority: Major
>
> After running for a few hours, the job starts to throw an IllegalStateException 
> and I can't figure out why. To restore the job, I need to manually delete the 
> FlinkDeployment so that it is recreated, and redeploy everything.
> The jar is built into the Docker image, hence it is defined according to the 
> Operator's documentation:
> {code:java}
> // jarURI: local:///opt/flink/usrlib/my-job.jar {code}
> I've tried to move it into /opt/flink/lib/my-job.jar but it didn't work 
> either. 
>  
> {code:java}
> // Source: my-topic (1/2)#30587 
> (b82d2c7f9696449a2d9f4dc298c0a008_bc764cd8ddf7a0cff126f51c16239658_0_30587) 
> switched from DEPLOYING to FAILED with failure cause: 
> java.lang.IllegalStateException: The library registration references a 
> different set of library BLOBs than previous registrations for this job:
> old:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-7237ecbb12b0b021934b0c81aef78396]
> new:[p-5d91888083d38a3ff0b6c350f05a3013632137c6-943737c6790a3ec6870cecd652b956c2]
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
>     at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
>     at 
> org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Unknown Source) {code}
> If there is any other information that can help to identify the problem, 
> please let me know.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLIP 400] Introduces AsyncScalarFunction as a new UDF type [flink]

2024-01-03 Thread via GitHub


AlanConfluent commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1440888375


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSyncModeRule.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.trait.AsyncOperatorModeTrait;
+import org.apache.flink.table.planner.plan.utils.AsyncUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**

Review Comment:
   Removed



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/AsyncOperatorModeTrait.java:
##
@@ -0,0 +1,47 @@
+package org.apache.flink.table.planner.plan.trait;

Review Comment:
   Removed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33611) Support Large Protobuf Schemas

2024-01-03 Thread Sai Sharath Dandi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802335#comment-17802335
 ] 

Sai Sharath Dandi commented on FLINK-33611:
---

[~libenchao] All identifier names in the code are part of the constant pool, 
including local variable names. You can use the javap tool on a simple class 
file to examine the constant pool contents - 
[ref|https://blogs.oracle.com/javamagazine/post/java-class-file-constant-pool].

 

Here's an example class and its constant pool content obtained with javap - 

 

 
{code:java}
public class Hello {

  public void sayHello1() {
Integer a1;
int b;
String c;
  }

  public void sayHello2() {
Integer a2;
int b;
String c;
  }
} {code}
{code:java}
Constant pool:
   #1 = Methodref          #6.#25         // java/lang/Object."<init>":()V
   #2 = Methodref          #26.#27        // 
java/lang/Integer.valueOf:(I)Ljava/lang/Integer;
   #3 = String             #28            // hi
   #4 = String             #29            // hello
   #5 = Class              #30            // 
com/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello
   #6 = Class              #31            // java/lang/Object
   #7 = Utf8               <init>
   #8 = Utf8               ()V
   #9 = Utf8               Code
  #10 = Utf8               LineNumberTable
  #11 = Utf8               LocalVariableTable
  #12 = Utf8               this
  #13 = Utf8               
Lcom/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello;
  #14 = Utf8               sayHello1
  #15 = Utf8               a1
  #16 = Utf8               Ljava/lang/Integer;
  #17 = Utf8               b
  #18 = Utf8               I
  #19 = Utf8               c
  #20 = Utf8               Ljava/lang/String;
  #21 = Utf8               sayHello2
  #22 = Utf8               a2
  #23 = Utf8               SourceFile
  #24 = Utf8               Hello.java
  #25 = NameAndType        #7:#8          // "<init>":()V
  #26 = Class              #32            // java/lang/Integer
  #27 = NameAndType        #33:#34        // valueOf:(I)Ljava/lang/Integer;
  #28 = Utf8               hi
  #29 = Utf8               hello
  #30 = Utf8               
com/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello
  #31 = Utf8               java/lang/Object
  #32 = Utf8               java/lang/Integer
  #33 = Utf8               valueOf
  #34 = Utf8               (I)Ljava/lang/Integer; {code}
 

 

As we can see from the above example, local variable names are part of the 
constant pool
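
A minimal sketch of the reuse idea described in the ticket, assuming the 
generated split methods share the same local variable names (the class below is 
purely illustrative, not the actual generated code):

{code:java}
public class HelloReused {

  // Both methods reuse the identifiers "a" and "c", so the constant pool only
  // needs one Utf8 entry per name, unlike a1/a2 in the Hello example above
  // where every split method contributes its own entries.
  public void sayHello1() {
    Integer a = 1;
    String c = "hi";
  }

  public void sayHello2() {
    Integer a = 3;
    String c = "hello";
  }
}
{code}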

 

> Support Large Protobuf Schemas
> --
>
> Key: FLINK-33611
> URL: https://issues.apache.org/jira/browse/FLINK-33611
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: Sai Sharath Dandi
>Assignee: Sai Sharath Dandi
>Priority: Major
>  Labels: pull-request-available
>
> h3. Background
> Flink serializes and deserializes protobuf format data by calling the decode 
> or encode method in GeneratedProtoToRow_XXX.java generated by codegen to 
> parse byte[] data into Protobuf Java objects. FLINK-32650 has introduced the 
> ability to split the generated code to improve the performance for large 
> Protobuf schemas. However, this is still not sufficient to support some 
> larger protobuf schemas as the generated code exceeds the java constant pool 
> size [limit|https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool] 
> and we can see errors like "Too many constants" when trying to compile the 
> generated code. 
> *Solution*
> Since we already have the split code functionality introduced, the 
> main proposal here is to reuse the variable names across different split 
> method scopes. This will greatly reduce the constant pool size. One more 
> optimization is to split the last code segment only when its size 
> exceeds the split threshold limit. Currently, the last segment of the generated 
> code is always split, which can lead to too many split methods and thus 
> exceed the constant pool size limit.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33611) Support Large Protobuf Schemas

2024-01-03 Thread Sai Sharath Dandi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802335#comment-17802335
 ] 

Sai Sharath Dandi edited comment on FLINK-33611 at 1/3/24 10:15 PM:


[~libenchao] All identifier names in the code are part of the constant pool, 
including local variable names. You can use the javap tool on a simple class 
file to examine the constant pool contents - 
[ref|https://blogs.oracle.com/javamagazine/post/java-class-file-constant-pool].

 

Here's an example class and its constant pool content obtained with javap - 

 

 
{code:java}
public class Hello {

  public void sayHello1() {
Integer a1 = 1;
int b = 2;
String c = "hi";
  }

  public void sayHello2() {
Integer a2 = 3;
int b = 2;
String c = "hello";
  }
}{code}
{code:java}
Constant pool:
   #1 = Methodref          #6.#25         // java/lang/Object."<init>":()V
   #2 = Methodref          #26.#27        // 
java/lang/Integer.valueOf:(I)Ljava/lang/Integer;
   #3 = String             #28            // hi
   #4 = String             #29            // hello
   #5 = Class              #30            // 
com/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello
   #6 = Class              #31            // java/lang/Object
   #7 = Utf8               <init>
   #8 = Utf8               ()V
   #9 = Utf8               Code
  #10 = Utf8               LineNumberTable
  #11 = Utf8               LocalVariableTable
  #12 = Utf8               this
  #13 = Utf8               
Lcom/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello;
  #14 = Utf8               sayHello1
  #15 = Utf8               a1
  #16 = Utf8               Ljava/lang/Integer;
  #17 = Utf8               b
  #18 = Utf8               I
  #19 = Utf8               c
  #20 = Utf8               Ljava/lang/String;
  #21 = Utf8               sayHello2
  #22 = Utf8               a2
  #23 = Utf8               SourceFile
  #24 = Utf8               Hello.java
  #25 = NameAndType        #7:#8          // "<init>":()V
  #26 = Class              #32            // java/lang/Integer
  #27 = NameAndType        #33:#34        // valueOf:(I)Ljava/lang/Integer;
  #28 = Utf8               hi
  #29 = Utf8               hello
  #30 = Utf8               
com/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello
  #31 = Utf8               java/lang/Object
  #32 = Utf8               java/lang/Integer
  #33 = Utf8               valueOf
  #34 = Utf8               (I)Ljava/lang/Integer; {code}
 

 

As we can see from the above example, local variable names are part of the 
constant pool
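
For reference, a small sketch of how such a constant pool dump can be produced 
programmatically (assuming a JDK 9+ runtime, where javap is exposed through the 
ToolProvider SPI; the class and file names are just placeholders):

{code:java}
import java.util.spi.ToolProvider;

public class DumpConstantPool {
  public static void main(String[] args) {
    // javap has been available through the ToolProvider SPI since JDK 9
    ToolProvider javap =
        ToolProvider.findFirst("javap")
            .orElseThrow(() -> new IllegalStateException("javap not found"));
    // -v (verbose) prints the full constant pool, as quoted above
    javap.run(System.out, System.err, "-v", "Hello.class");
  }
}
{code}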

 


was (Author: JIRAUSER298466):
[~libenchao] All identifier names in the code are part of the constant pool 
including local variable names. You can use the javap tool on a simple class 
file to examine the constant pool contents - 
[ref|[https://blogs.oracle.com/javamagazine/post/java-class-file-constant-pool].]

 

Here's an example class and it's constant pool content obtained with javap - 

 

 
{code:java}
public class Hello {

  public void sayHello1() {
Integer a1;
int b;
String c;
  }

  public void sayHello2() {
Integer a2;
int b;
String c;
  }
} {code}
{code:java}
Constant pool:
   #1 = Methodref          #6.#25         // java/lang/Object."<init>":()V
   #2 = Methodref          #26.#27        // 
java/lang/Integer.valueOf:(I)Ljava/lang/Integer;
   #3 = String             #28            // hi
   #4 = String             #29            // hello
   #5 = Class              #30            // 
com/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello
   #6 = Class              #31            // java/lang/Object
   #7 = Utf8               <init>
   #8 = Utf8               ()V
   #9 = Utf8               Code
  #10 = Utf8               LineNumberTable
  #11 = Utf8               LocalVariableTable
  #12 = Utf8               this
  #13 = Utf8               
Lcom/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello;
  #14 = Utf8               sayHello1
  #15 = Utf8               a1
  #16 = Utf8               Ljava/lang/Integer;
  #17 = Utf8               b
  #18 = Utf8               I
  #19 = Utf8               c
  #20 = Utf8               Ljava/lang/String;
  #21 = Utf8               sayHello2
  #22 = Utf8               a2
  #23 = Utf8               SourceFile
  #24 = Utf8               Hello.java
  #25 = NameAndType        #7:#8          // "<init>":()V
  #26 = Class              #32            // java/lang/Integer
  #27 = NameAndType        #33:#34        // valueOf:(I)Ljava/lang/Integer;
  #28 = Utf8               hi
  #29 = Utf8               hello
  #30 = Utf8               
com/uber/athena/athenax/connector/kafka/formats/protobuf/deserialize/Hello
  #31 = Utf8               java/lang/Object
  #32 = Utf8               java/lang/Integer
  #33 = Utf8               valueOf
  #34 = Utf8        

[jira] [Created] (FLINK-33978) FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2024-01-03 Thread Alan Sheinberg (Jira)
Alan Sheinberg created FLINK-33978:
--

 Summary: FLIP-400: AsyncScalarFunction for asynchronous scalar 
function support
 Key: FLINK-33978
 URL: https://issues.apache.org/jira/browse/FLINK-33978
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Alan Sheinberg


https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33979) Implement restore tests for TableSink node

2024-01-03 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-33979:
---

 Summary: Implement restore tests for TableSink node
 Key: FLINK-33979
 URL: https://issues.apache.org/jira/browse/FLINK-33979
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bonnie Varghese
Assignee: Bonnie Varghese






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33979] Implement restore tests for TableSink node [flink]

2024-01-03 Thread via GitHub


bvarghese1 opened a new pull request, #24024:
URL: https://github.com/apache/flink/pull/24024

   
   
   ## What is the purpose of the change
   
   *Add restore tests for TableSink node*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added restore tests for TableSink node which verifies the generated 
compiled plan with the saved compiled plan.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33979) Implement restore tests for TableSink node

2024-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33979:
---
Labels: pull-request-available  (was: )

> Implement restore tests for TableSink node
> --
>
> Key: FLINK-33979
> URL: https://issues.apache.org/jira/browse/FLINK-33979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Bonnie Varghese
>Assignee: Bonnie Varghese
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33979] Implement restore tests for TableSink node [flink]

2024-01-03 Thread via GitHub


flinkbot commented on PR #24024:
URL: https://github.com/apache/flink/pull/24024#issuecomment-1876189607

   
   ## CI report:
   
   * 20da3f4a949949c419f4e8f989312c43172ef4c5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33971) Specifies whether to use HBase table that supports dynamic columns.

2024-01-03 Thread MOBIN (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MOBIN updated FLINK-33971:
--
Description: 
Specifies whether to use HBase table that supports dynamic columns.

Refer to the dynamic.table parameter in this document: 
[https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv]

Sample code for a result table that supports dynamic columns

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='hbase-2.2',
  'table-name'='',
  'zookeeper.quorum'='',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
If dynamic.table is set to true, an HBase table that supports dynamic columns is 
used.
Two fields must be declared in the rows that correspond to each column family. 
The value of the first field indicates the dynamic column, and the value of the 
second field indicates the value of the dynamic column.

For example, the datagen_source table contains a row of data. The row of data 
indicates that the ID of the commodity is 1, the transaction amount of the 
commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
inserted into the HBase table: f1:10 is 100, and f2:2020-7-26 is 1.

  was:
Specifies whether to use HBase table that supports dynamic columns.

Refer to the dynamic.table parameter in this document: 
[[https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv|http://example.com]|https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv]

Sample code for a result table that supports dynamic columns

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='hbase-2.2',
  'table-name'='',
  'zookeeper.quorum'='',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
If dynamic.table is set to true, HBase table that supports dynamic columns is 
used.
Two fields must be declared in the rows that correspond to each column family. 
The value of the first field indicates the dynamic column, and the value of the 
second field indicates the value of the dynamic column.

For example, the datagen_source table contains a row of data The row of data 
indicates that the ID of the commodity is 1, the transaction amount of the 
commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
inserted into the ApsaraDB for HBase table. f1:10 is 100, and f2:2020-7-26 is 
1.


> Specifies whether to use HBase table that supports dynamic columns.
> ---
>
> Key: FLINK-33971
> URL: https://issues.apache.org/jira/browse/FLINK-33971
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / HBase
>Reporter: MOBIN
>Priority: Minor
>
> Specifies whether to use HBase table that supports dynamic columns.
> Refer to the dynamic.table parameter in this document: 
> [https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv]
> Sample code for a result table that supports dynamic columns
> CREATE TEMPORARY TABLE datagen_source (
>   id INT,
>   f1hour STRING,
>   f1deal BIGINT,
>   f2day STRING,
>   f2deal BIGINT
> ) WITH (
>   'connector'='datagen'
> );
> CREATE TEMPORARY TABLE hbase_sink (
>   rowkey INT,
>   f1 ROW<`hour` STRING, deal BIGINT>,
>   f2 ROW<`day` STRING, deal BIGINT>
> ) WITH (
>   'connector'='hbase-2.2',
>   'table-name'='',
>   'zookeeper.quorum'='',
>   'dynamic.table'='true'
> );
> INSERT INTO hbase_sink
> SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
> If dynamic.table is set to true, an HBase table that supports dynamic columns is 
> used.
> Two fields must be declared in the rows that correspond to each column 
> family. The value of the first field indicates the dynamic column, and the 
> value of the second field indicates the value of the dynamic column.
> For example, the datagen_source table contains a row of data. The row of data 
> indicates that the ID of

Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-03 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440105816


##
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##
@@ -166,7 +169,10 @@ public class Task
 /** ID which identifies the slot in which the task is supposed to run. */
 private final AllocationID allocationId;
 
-/** TaskInfo object for this task. */
+/** The meta information of current job. */
+private final JobInfo jobInfo;

Review Comment:
   These code changes are prerequisite for the commits that unify the provision 
of metadata. I've moved these code changes to the commit `[FLINK-33905][core] 
Add getJobInfo() method to Environment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33905][core] Unify the Provision of Diverse Metadata for Context-like APIs [flink]

2024-01-03 Thread via GitHub


WencongLiu commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1440107273


##
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java:
##
@@ -119,7 +120,14 @@ public interface Environment {
 Configuration getJobConfiguration();
 
 /**
- * Returns the {@link TaskInfo} object associated with this subtask
+ * Returns the {@link JobInfo} object associated with current job.
+ *
+ * @return JobInfo for current job
+ */
+JobInfo getJobInfo();

Review Comment:
   These code changes are prerequisite for the commits that unify the provision 
of metadata. I've moved these code changes to the commit `[FLINK-33905][core] 
Add getJobInfo() method to Environment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33971) Specifies whether to use HBase table that supports dynamic columns.

2024-01-03 Thread MOBIN (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

MOBIN updated FLINK-33971:
--
Description: 
Specifies whether to use HBase table that supports dynamic columns.

Refer to the dynamic.table parameter in this document: 
[https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv]

Sample code for a result table that supports dynamic columns
{code:java}
CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='hbase-2.2',
  'table-name'='',
  'zookeeper.quorum'='',
  'dynamic.table'='true'
);
INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source; {code}

If dynamic.table is set to true, an HBase table that supports dynamic columns is 
used.
Two fields must be declared in the rows that correspond to each column family. 
The value of the first field indicates the dynamic column, and the value of the 
second field indicates the value of the dynamic column.

For example, the datagen_source table contains a row of data. The row of data 
indicates that the ID of the commodity is 1, the transaction amount of the 
commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
inserted into the HBase table: f1:10 is 100, and f2:2020-7-26 is 1.

  was:
Specifies whether to use HBase table that supports dynamic columns.

Refer to the dynamic.table parameter in this document: 
[[https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv|http://example.com]]

Sample code for a result table that supports dynamic columns

CREATE TEMPORARY TABLE datagen_source (
  id INT,
  f1hour STRING,
  f1deal BIGINT,
  f2day STRING,
  f2deal BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  f1 ROW<`hour` STRING, deal BIGINT>,
  f2 ROW<`day` STRING, deal BIGINT>
) WITH (
  'connector'='hbase-2.2',
  'table-name'='',
  'zookeeper.quorum'='',
  'dynamic.table'='true'
);

INSERT INTO hbase_sink
SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source;
If dynamic.table is set to true, HBase table that supports dynamic columns is 
used.
Two fields must be declared in the rows that correspond to each column family. 
The value of the first field indicates the dynamic column, and the value of the 
second field indicates the value of the dynamic column.

For example, the datagen_source table contains a row of data The row of data 
indicates that the ID of the commodity is 1, the transaction amount of the 
commodity between 10:00 and 11:00 is 100, and the transaction amount of the 
commodity on July 26, 2020 is 1. In this case, a row whose rowkey is 1 is 
inserted into the HBase table. f1:10 is 100, and f2:2020-7-26 is 1.


> Specifies whether to use HBase table that supports dynamic columns.
> ---
>
> Key: FLINK-33971
> URL: https://issues.apache.org/jira/browse/FLINK-33971
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / HBase
>Reporter: MOBIN
>Priority: Minor
>
> Specifies whether to use HBase table that supports dynamic columns.
> Refer to the dynamic.table parameter in this document: 
> [https://www.alibabacloud.com/help/en/flink/developer-reference/apsaradb-for-hbase-connector#section-ltp-3fy-9qv]
> Sample code for a result table that supports dynamic columns
> {code:java}
> CREATE TEMPORARY TABLE datagen_source (
>   id INT,
>   f1hour STRING,
>   f1deal BIGINT,
>   f2day STRING,
>   f2deal BIGINT
> ) WITH (
>   'connector'='datagen'
> );
> CREATE TEMPORARY TABLE hbase_sink (
>   rowkey INT,
>   f1 ROW<`hour` STRING, deal BIGINT>,
>   f2 ROW<`day` STRING, deal BIGINT>
> ) WITH (
>   'connector'='hbase-2.2',
>   'table-name'='',
>   'zookeeper.quorum'='',
>   'dynamic.table'='true'
> );
> INSERT INTO hbase_sink
> SELECT id, ROW(f1hour, f1deal), ROW(f2day, f2deal) FROM datagen_source; {code}
> If dynamic.table is set to true, an HBase table that supports dynamic columns is 
> used.
> Two fields must be declared in the rows that correspond to each column 
> family. The value of the first field indicates the dynamic column, and the 
> value of the second field indicates the value of the dynamic column.
> For example, the datagen_source table contains a row of data. The row of data 
> indicates that the ID of the commodity is 1, the transaction amount of the 
> commodity between 10:00 and 11:00 

Re: [PR] [FLINK-33966] Fix the getNumRecordsInPerSecond Utility Function [flink-kubernetes-operator]

2024-01-03 Thread via GitHub


1996fanrui commented on code in PR #743:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/743#discussion_r1441197747


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -216,15 +216,23 @@ private static double getNumRecordsInPerSecond(
 Map flinkMetrics,
 JobVertexID jobVertexID,
 boolean isSource) {
+// Generate numRecordsInPerSecond from 3 metrics:
+// 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task 
metric.
 var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+// 2. If the former is unavailable and the vertex contains a source 
operator, use the
+// corresponding source operator metric.
 if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
 numRecordsInPerSecond =
 
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
 }
+// 3. If the vertex contains a source operator which does not emit 
input metrics, use output
+// metrics instead.
+// then use SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC metric

Review Comment:
   ```suggestion
   ```
   
   Is this leftover?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33941][table-planner] use field index instead of field name about window time column [flink]

2024-01-03 Thread via GitHub


LadyForest merged PR #23991:
URL: https://github.com/apache/flink/pull/23991


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33941) Use field reference index to compute window aggregate time attribute column

2024-01-03 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802374#comment-17802374
 ] 

Jane Chan commented on FLINK-33941:
---

Fixed in master: b957480112c00d9d777247fc48b602e9908652a2

> Use field reference index to compute window aggregate time attribute column
> ---
>
> Key: FLINK-33941
> URL: https://issues.apache.org/jira/browse/FLINK-33941
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>  Labels: pull-request-available
>
> In some exec nodes like StreamExecGroupWindowAggregate and some rules like 
> BatchPhysicalWindowAggregateRule, planner uses "AggregateUtil#timeFieldIndex" 
> to access the actual time field index, instead of using the time field index 
> in LogicalWindow#timeAttribute directly. However, it would be more formal to 
> use the field index instead of the column field.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33941) Use field reference index to compute window aggregate time attribute column

2024-01-03 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-33941:
--
Summary: Use field reference index to compute window aggregate time 
attribute column  (was: Use field index instead of field name about window time 
column)

> Use field reference index to compute window aggregate time attribute column
> ---
>
> Key: FLINK-33941
> URL: https://issues.apache.org/jira/browse/FLINK-33941
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>  Labels: pull-request-available
>
> In some exec nodes like StreamExecGroupWindowAggregate and some rules like 
> BatchPhysicalWindowAggregateRule, planner uses "AggregateUtil#timeFieldIndex" 
> to access the actual time field index, instead of using the time field index 
> in LogicalWindow#timeAttribute directly. However, it would be more formal to 
> use the field index instead of the column field.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33941) Use field reference index to compute window aggregate time attribute column

2024-01-03 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-33941.
---
Resolution: Fixed

> Use field reference index to compute window aggregate time attribute column
> ---
>
> Key: FLINK-33941
> URL: https://issues.apache.org/jira/browse/FLINK-33941
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>  Labels: pull-request-available
>
> In some exec nodes like StreamExecGroupWindowAggregate and some rules like 
> BatchPhysicalWindowAggregateRule, planner uses "AggregateUtil#timeFieldIndex" 
> to access the actual time field index, instead of using the time field index 
> in LogicalWindow#timeAttribute directly. However, it would be more formal to 
> use the field index instead of the column field.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33941) Use field reference index to compute window aggregate time attribute column

2024-01-03 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-33941.
-

> Use field reference index to compute window aggregate time attribute column
> ---
>
> Key: FLINK-33941
> URL: https://issues.apache.org/jira/browse/FLINK-33941
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Minor
>  Labels: pull-request-available
>
> In some exec nodes like StreamExecGroupWindowAggregate and some rules like 
> BatchPhysicalWindowAggregateRule, planner uses "AggregateUtil#timeFieldIndex" 
> to access the actual time field index, instead of using the time field index 
> in LogicalWindow#timeAttribute directly. However, it would be more formal to 
> use the field index instead of the column field.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33939][runtime-web] Remove the husky dependency to make sure the global hooks work as expected [flink]

2024-01-03 Thread via GitHub


xintongsong commented on code in PR #23992:
URL: https://github.com/apache/flink/pull/23992#discussion_r1441248487


##
flink-runtime-web/README.md:
##
@@ -102,6 +102,8 @@ re-compilation:
 
 ### CodeStyle & Lint
 
+> Tips: Before committing, make sure you've run npm run lint-staged and that 
no errors are reported.

Review Comment:
   I'll fix this during merging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33939][runtime-web] Remove the husky dependency to make sure the global hooks work as expected [flink]

2024-01-03 Thread via GitHub


xintongsong closed pull request #23992: [FLINK-33939][runtime-web] Remove the 
husky dependency to make sure the global hooks work as expected
URL: https://github.com/apache/flink/pull/23992


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33939) Make husky in runtime-web no longer affect git global hooks

2024-01-03 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-33939:
-
Priority: Major  (was: Minor)

> Make husky in runtime-web no longer affect git global hooks
> ---
>
> Key: FLINK-33939
> URL: https://issues.apache.org/jira/browse/FLINK-33939
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Jason TANG
>Assignee: Jason TANG
>Priority: Major
>  Labels: pull-request-available
>
> Since runtime-web relies on husky to ensure that front-end code changes are 
> checked before `git commit`, and husky modifies the global git hooks 
> (core.hooksPath) so that a globally configured core.hooksPath won't take 
> effect, I thought it would be a good idea to make the front-end code check an 
> optional command execution, which ensures that the globally configured hooks 
> are executed correctly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

