[GitHub] [flink] flinkbot commented on pull request #22634: [FLINK-32172][kafka] KafkaExampleUtils incorrect check of the minimum number of parameters

2023-05-23 Thread via GitHub


flinkbot commented on PR #22634:
URL: https://github.com/apache/flink/pull/22634#issuecomment-1560498226

   
   ## CI report:
   
   * 5254b522b39097420b445eab58cfa4ab6e934c9a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] reswqa opened a new pull request, #22634: [FLINK-32172][kafka] KafkaExampleUtils incorrect check of the minimum number of parameters

2023-05-23 Thread via GitHub


reswqa opened a new pull request, #22634:
URL: https://github.com/apache/flink/pull/22634

   ## What is the purpose of the change
   
   *Fix the incorrect check of the minimum number of parameters in KafkaExampleUtils.*
   
   
   ## Brief change log
   
 - *Change the minimum number of parameters from `5` to `4`, aligning it with the error message (see the sketch below).*
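
   A minimal sketch of the corrected guard, assuming the usual `ParameterTool`-based check in `KafkaExampleUtil` (method body and message text here are illustrative, not the exact source):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaExampleUtilSketch {
    // The example takes 4 key/value arguments (--input-topic, --output-topic,
    // --bootstrap.servers, --group.id), so the guard must compare against 4, not 5.
    public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool parameterTool)
            throws Exception {
        if (parameterTool.getNumberOfParameters() < 4) {
            throw new Exception(
                    "Missing parameters!\n"
                            + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
                            + "--bootstrap.servers <kafka brokers> --group.id <some id>");
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameterTool);
        return env;
    }
}
```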
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





[GitHub] [flink] reswqa commented on pull request #22615: [hotfix][doc] Fix typo in analyze.md

2023-05-23 Thread via GitHub


reswqa commented on PR #22615:
URL: https://github.com/apache/flink/pull/22615#issuecomment-1560493433

   > BTW, you should rename your commit title to [hotfix][doc] Fix typo in analyze.md.
   
   Sorry, I may not have expressed myself clearly. What needs to be modified is not only the title of this pull request but also the title of the commit message. Currently it is: [fix: analyze.md SQL syntax error](https://github.com/apache/flink/pull/22615/commits/5e50d9b947ddb87cd8e7e700fce8c8af750ec2b9)





[GitHub] [flink] jasonyuan-cn commented on a diff in pull request #22615: [hotfix][doc] Fix typo in analyze.md

2023-05-23 Thread via GitHub


jasonyuan-cn commented on code in PR #22615:
URL: https://github.com/apache/flink/pull/22615#discussion_r1203463401


##
docs/content/docs/dev/table/sql/analyze.md:
##
@@ -379,4 +379,4 @@ ANALYZE TABLE [catalog_name.][db_name.]table_name 
PARTITION(partcol1[=val1] [, p
 
 *NOTE:* For the fix length types (like `BOOLEAN`, `INTEGER`, `DOUBLE` etc.), 
we need not collect the `avgLen` and `maxLen` from the original records.
 
-{{< top >}}
\ No newline at end of file
+{{< top >}}

Review Comment:
   This line of code should not be changed, sorry for my carelessness. I will 
undo this line.






[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-23 Thread via GitHub


1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1203434702


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,43 @@ public class ExecutionOptions {
                 .withDescription(
                         "Tells if we should use compression for the state snapshot data or not");
 
+    public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+            ConfigOptions.key("execution.buffer-timeout.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If disabled, the config execution.buffer-timeout will not take effect and the flushing will be triggered only when the output "
+                                                    + "buffer is full thus maximizing throughput")
+                                    .build());
+
+    public static final ConfigOption<Duration> BUFFER_TIMEOUT_INTERVAL =
+            ConfigOptions.key("execution.buffer-timeout.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(100))
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum time frequency (milliseconds) for the flushing of the output buffers. By default "
+                                                    + "the output buffers flush frequently to provide low latency and to aid smooth developer "
+                                                    + "experience. Setting the parameter can result in three logical modes:")
+                                    .list(
+                                            text("A positive value triggers flushing periodically by that interval"),
+                                            text(FLUSH_AFTER_EVERY_RECORD + " triggers flushing after every record thus minimizing latency"),
+                                            text("If the config " + BUFFER_TIMEOUT_ENABLED.key() + " is false, trigger flushing only when the output buffer is full thus maximizing throughput"))
+                                    .build());
+
+    /** @deprecated Use {@link #BUFFER_TIMEOUT_INTERVAL} instead. */
+    @Deprecated
     public static final ConfigOption<Duration> BUFFER_TIMEOUT =
             ConfigOptions.key("execution.buffer-timeout")
Review Comment:
   Don't need to create a new `ConfigOption`; you can take a look at this [PR](https://github.com/apache/flink/pull/20867/files#diff-7d4e333f055cb785073ea9bf5a53fcc7f8f527ba4230e153dd9e59212093aec4).
   
   (screenshot: https://github.com/apache/flink/assets/38427477/82328ecc-536f-43b9-a9cd-c3c18ce04a4a)
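
   A hedged sketch of that pattern (not the final code of either PR): keep a single option and route the legacy key to it via a deprecated key, so no second `ConfigOption` is needed. Written as a field in `ExecutionOptions`:

```java
// Sketch only: one option, with the old key registered as deprecated so
// existing configurations keep working while new ones use the new key.
public static final ConfigOption<Duration> BUFFER_TIMEOUT =
        ConfigOptions.key("execution.buffer-timeout.interval")
                .durationType()
                .defaultValue(Duration.ofMillis(100))
                .withDeprecatedKeys("execution.buffer-timeout")
                .withDescription(
                        "The maximum time buffered output may wait before it is flushed.");
```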
   
   








[GitHub] [flink] fsk119 commented on a diff in pull request #22599: [FLINK-31967][table-runner] Solving the Initial Value Problem of LagA…

2023-05-23 Thread via GitHub


fsk119 commented on code in PR #22599:
URL: https://github.com/apache/flink/pull/22599#discussion_r1203384646


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala:
##
@@ -1184,6 +1184,34 @@ abstract class AggregateITCaseBase(testName: String) 
extends BatchTestBase {
 checkResult("select count(*) from src", Seq(row(3)))
   }
 
+  @Test
+  def testLeadAggFunction(): Unit = {
+val data =
+  List(rowOf(2L, 15, "Hello"), rowOf(8L, 11, "Hello world"), rowOf(9L, 12, 
"Hello world!"))
+val dataId = TestValuesTableFactory.registerData(data)
+tEnv.executeSql(s"""
+   |CREATE TABLE src(
+   |  `id` BIGINT,
+   |  len INT NOT NULL,

Review Comment:
   nit: len -> `len`



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala:
##
@@ -1765,4 +1765,35 @@ class AggregateITCase(aggMode: AggMode, miniBatch: 
MiniBatchMode, backend: State
 val expected = List("3")
 assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
+
+  @Test
+  def testLagAggFunction(): Unit = {
+val data =
+  List(rowOf(2L, 15, "Hello"), rowOf(8L, 11, "Hello world"), rowOf(9L, 12, 
"Hello world!"))
+val dataId = TestValuesTableFactory.registerData(data)
+tEnv.executeSql(s"""
+   |CREATE TABLE src(
+   |  `id` BIGINT,
+   |  len INT,

Review Comment:
   ditto



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/LagAggFunction.java:
##
@@ -44,6 +44,15 @@ public LagAggFunction(LogicalType[] valueTypes) {
         Arrays.stream(valueTypes)
                 .map(DataTypeUtils::toInternalDataType)
                 .toArray(DataType[]::new);
+        // When the initial value has not been initialized and is not allowed to be null or
+        // default value allowed to be empty, output should be changed to allow nulls

Review Comment:
   The output value can only be non-null if the default input arguments include a non-null default value.
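
   A hedged sketch of that rule (helper and names are illustrative; `LogicalType#copy(true)` yields a nullable copy):

```java
import org.apache.flink.table.types.logical.LogicalType;

class LagOutputTypeSketch {
    // The result may stay non-null only when a third LAG argument (the default
    // value) is present and itself non-null; otherwise LAG can run out of
    // preceding rows and must be able to return NULL.
    static LogicalType deriveOutputType(LogicalType[] valueTypes) {
        boolean hasNonNullDefault = valueTypes.length == 3 && !valueTypes[2].isNullable();
        return hasNonNullDefault ? valueTypes[0] : valueTypes[0].copy(true);
    }
}
```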
   
   






[GitHub] [flink] flinkbot commented on pull request #22633: [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state…

2023-05-23 Thread via GitHub


flinkbot commented on PR #22633:
URL: https://github.com/apache/flink/pull/22633#issuecomment-1560424440

   
   ## CI report:
   
   * c9cdbadf7b65278f3cfc1761cf29cf67f4158efc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] fsk119 opened a new pull request, #22633: [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state…

2023-05-23 Thread via GitHub


fsk119 opened a new pull request, #22633:
URL: https://github.com/apache/flink/pull/22633

   …ment
   
   
   
   ## What is the purpose of the change
   
   Fix the SQL Gateway not validating whether a statement is legal.





[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725624#comment-17725624
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks [~roman] , I will prepare a pr.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 





[jira] [Assigned] (FLINK-32043) SqlClient session unrecoverable once one wrong setting occurred

2023-05-23 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-32043:
-

Assignee: Shengkai Fang

> SqlClient session unrecoverable once one wrong setting occurred
> ---
>
> Key: FLINK-32043
> URL: https://issues.apache.org/jira/browse/FLINK-32043
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: lincoln lee
>Assignee: Shengkai Fang
>Priority: Critical
>
> In the SQL client, the session cannot work normally once a wrong setting has occurred:
> {code:java}
> // wrong setting here
> Flink SQL> SET table.sql-dialect = flink;
> [INFO] Execute statement succeed.
> Flink SQL> select '' AS f1, a from t1;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> SET table.sql-dialect = default;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET table.sql-dialect;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK 
> {code}
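
A minimal sketch of the fix direction (all names hypothetical, not the actual gateway API): validate a SET against a scratch copy of the session options, so a bad value never poisons the live session and RESET keeps working.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: validate a SET statement before mutating the session. */
final class SessionOptions {
    enum SqlDialect { DEFAULT, HIVE }

    private final Map<String, String> options = new HashMap<>();

    void set(String key, String value) {
        Map<String, String> trial = new HashMap<>(options);
        trial.put(key, value);
        validate(trial); // throws IllegalArgumentException on a bad value
        options.put(key, value); // commit only after validation succeeds
    }

    private static void validate(Map<String, String> opts) {
        // Parsing eagerly surfaces "No enum constant" at SET time instead of
        // on every subsequent statement.
        SqlDialect.valueOf(opts.getOrDefault("table.sql-dialect", "default").toUpperCase());
    }
}
{code}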





[GitHub] [flink] Hexiaoqiao commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM

2023-05-23 Thread via GitHub


Hexiaoqiao commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1203358593


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingCheckpointUtils.SnapshotFileSystemInfo;
+import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(SnapshotFileSystemInfo fileSystemInfo) throws IOException {
+        initFileSystem(
+                fileSystemInfo.fs,
+                fileSystemInfo.checkpointBaseDirectory,
+                fileSystemInfo.sharedStateDirectory,
+                fileSystemInfo.taskOwnedStateDirectory);
+    }
+
+    @Override
+    public void addSubtask(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------
+
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, subtaskKey);
+    }
+
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath
[jira] [Comment Edited] (FLINK-32172) KafkaExample can not run with args

2023-05-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725622#comment-17725622
 ] 

Weijie Guo edited comment on FLINK-32172 at 5/24/23 3:32 AM:
-

Thanks for the report, this should be an accidental mistake (the minimum parameter count should be 4 instead of 5). It will be fixed ASAP.


was (Author: weijie guo):
Thanks for the report, this should be an accidental typo (the minimum parameter count should be 4 instead of 5). It will be fixed ASAP.

> KafkaExample can not run with args
> --
>
> Key: FLINK-32172
> URL: https://issues.apache.org/jira/browse/FLINK-32172
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
> Environment: * win11
>  * Git
>  * Maven (we recommend version 3.8.6)
>  * Java 11
>Reporter: xulongfeng
>Assignee: Weijie Guo
>Priority: Not a Priority
> Attachments: args.png, kafkaexample.png
>
>
> I forked and cloned the flink-connector-kafka repo. After building and packaging, I ran
> org/apache/flink/streaming/kafka/test/KafkaExample.java main(), but it failed.
> The comment says:
> Example usage: --input-topic test-input --output-topic test-output
> --bootstrap.servers
> * localhost:9092 --group.id myconsumer
>  
> but the console prints "Missing parameters!" from KafkaExampleUtil, which requires 5
> parameters while we have 4.
>  
> Thank you for your attention to this matter.





[jira] [Commented] (FLINK-32172) KafkaExample can not run with args

2023-05-23 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725622#comment-17725622
 ] 

Weijie Guo commented on FLINK-32172:


Thanks for the report, this should be an accidental typo (the minimum parameter count should be 4 instead of 5). It will be fixed ASAP.

> KafkaExample can not run with args
> --
>
> Key: FLINK-32172
> URL: https://issues.apache.org/jira/browse/FLINK-32172
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
> Environment: * win11
>  * Git
>  * Maven (we recommend version 3.8.6)
>  * Java 11
>Reporter: xulongfeng
>Assignee: Weijie Guo
>Priority: Not a Priority
> Attachments: args.png, kafkaexample.png
>
>
> I forked and cloned the flink-connector-kafka repo. After building and packaging, I ran
> org/apache/flink/streaming/kafka/test/KafkaExample.java main(), but it failed.
> The comment says:
> Example usage: --input-topic test-input --output-topic test-output
> --bootstrap.servers
> * localhost:9092 --group.id myconsumer
>  
> but the console prints "Missing parameters!" from KafkaExampleUtil, which requires 5
> parameters while we have 4.
>  
> Thank you for your attention to this matter.





[jira] [Assigned] (FLINK-32172) KafkaExample can not run with args

2023-05-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-32172:
--

Assignee: Weijie Guo

> KafkaExample can not run with args
> --
>
> Key: FLINK-32172
> URL: https://issues.apache.org/jira/browse/FLINK-32172
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0
> Environment: * win11
>  * Git
>  * Maven (we recommend version 3.8.6)
>  * Java 11
>Reporter: xulongfeng
>Assignee: Weijie Guo
>Priority: Not a Priority
> Attachments: args.png, kafkaexample.png
>
>
> I forked and cloned the flink-connector-kafka repo. After building and packaging, I ran
> org/apache/flink/streaming/kafka/test/KafkaExample.java main(), but it failed.
> The comment says:
> Example usage: --input-topic test-input --output-topic test-output
> --bootstrap.servers
> * localhost:9092 --group.id myconsumer
>  
> but the console prints "Missing parameters!" from KafkaExampleUtil, which requires 5
> parameters while we have 4.
>  
> Thank you for your attention to this matter.





[GitHub] [flink] flinkbot commented on pull request #22632: [FLINK-32161][test] Migrate and remove some legacy ExternalResource

2023-05-23 Thread via GitHub


flinkbot commented on PR #22632:
URL: https://github.com/apache/flink/pull/22632#issuecomment-1560404949

   
   ## CI report:
   
   * 3446f19686348fdd7af3898f81cc8e501c4d56ac UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] reswqa opened a new pull request, #22632: [FLINK-32161][test] Migrate and remove some legacy ExternalResource

2023-05-23 Thread via GitHub


reswqa opened a new pull request, #22632:
URL: https://github.com/apache/flink/pull/22632

   ## What is the purpose of the change
   
   *Migrate and remove some legacy ExternalResource.*
   
   
   ## Brief change log
   
 - *Auto injects TestLoggerExtension for flink-tests (see the sketch after this list).*
 - *Migrate and remove some legacy ExternalResource*
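
   Regarding the first item, a hedged illustration of what auto-injection removes (assuming the extension lives at `org.apache.flink.util.TestLoggerExtension`; the test class is made up): without auto-detection, every migrated JUnit 5 class must register the extension by hand.

```java
import org.apache.flink.util.TestLoggerExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

// Auto-injection makes this per-class annotation unnecessary.
@ExtendWith(TestLoggerExtension.class)
class ExampleMigratedTest {
    @Test
    void smokeTest() {
        // body irrelevant here; the extension logs test progress and failures
    }
}
```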
   
   ## Verifying this change
   
   
   *This change is already covered by existing tests.*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





[jira] [Created] (FLINK-32173) Flink Job Metrics returns stale values in the first request after an update in the values

2023-05-23 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-32173:
-

 Summary: Flink Job Metrics returns stale values in the first 
request after an update in the values
 Key: FLINK-32173
 URL: https://issues.apache.org/jira/browse/FLINK-32173
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.17.0
Reporter: Prabhu Joseph


Flink Job Metrics returns stale values in the first request after an update in 
the values.

*Repro:*

1. Run a Flink job with a fixed-delay restart strategy and multiple attempts:
{code}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1


flink run -Dexecution.checkpointing.interval="10s" -d -c 
org.apache.flink.streaming.examples.wordcount.WordCount 
/usr/lib/flink/examples/streaming/WordCount.jar
{code}

2. Kill one of the TaskManager which will initiate job restart.

3. After the job has restarted, fetch any job metric. The first request returns the stale (older) value 48.

{code}
[hadoop@ip-172-31-44-70 ~]$ curl 
http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts
  | jq .
[
  {
"id": "numRestarts",
"value": "48"
  }
]
{code}

4. On subsequent runs, it returns the correct value.
{code}
[hadoop@ip-172-31-44-70 ~]$ curl 
http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts
  | jq .
[
  {
"id": "numRestarts",
"value": "49"
  }
]
{code}

5. Repeat steps 2 to 4; the first request after a metric update returns the pre-update value, and only the next request returns the correct one.
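
A hedged illustration of the suspected pattern (assumed mechanism, not traced to the Flink source): if the REST layer serves a cached snapshot and only triggers a refresh upon access, the triggering request still sees the old value and the refreshed one only arrives for the next request.

{code:java}
import java.util.function.LongSupplier;

// Illustration only: read-then-refresh caching returns one stale value.
final class CachedMetricStore {
    private long cached = 48; // last fetched snapshot
    private final LongSupplier source;

    CachedMetricStore(LongSupplier source) {
        this.source = source;
    }

    long get() {
        long served = cached;          // serve the current snapshot...
        cached = source.getAsLong();   // ...then refresh for later readers
        return served;                 // first caller after an update sees 48, not 49
    }
}
{code}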









[GitHub] [flink] xishuaidelin commented on a diff in pull request #22539: [FLINK-31956][table] Extend the CompiledPlan to read from/write to Fl…

2023-05-23 Thread via GitHub


xishuaidelin commented on code in PR #22539:
URL: https://github.com/apache/flink/pull/22539#discussion_r1203332236


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -731,9 +731,16 @@ public TableResultInternal executePlan(InternalPlan plan) {
 }
 
 private CompiledPlan compilePlanAndWrite(
-String filePath, boolean ifNotExists, Operation operation) {
-File file = Paths.get(filePath).toFile();
-if (file.exists()) {
+Path filePath, boolean ifNotExists, Operation operation) {
+FileSystem fs;
+boolean exists;
+try {
+fs = filePath.getFileSystem();
+exists = fs.exists(filePath);
+} catch (IOException e) {
+throw new RuntimeException(e);

Review Comment:
   Okay. I have modified this in the latest commit.
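
   For context, a sketch of the shape such a change typically takes (assumed to reflect the review request; the exception type and message are illustrative):

```java
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.TableException;

class PlanFileChecks {
    /** Fail with a descriptive TableException instead of a bare RuntimeException. */
    static boolean planFileExists(Path filePath) {
        try {
            FileSystem fs = filePath.getFileSystem();
            return fs.exists(filePath);
        } catch (IOException e) {
            throw new TableException(
                    String.format("Cannot check the existence of plan file '%s'.", filePath), e);
        }
    }
}
```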






[GitHub] [flink] reswqa commented on pull request #22615: [hotfix][doc] Fix typo in analyze.md - SQL sample syntax error

2023-05-23 Thread via GitHub


reswqa commented on PR #22615:
URL: https://github.com/apache/flink/pull/22615#issuecomment-1560379814

   BTW, you should rename your commit title to `[hotfix][doc] Fix typo in analyze.md`.





[GitHub] [flink] reswqa commented on a diff in pull request #22615: [hotfix][doc] Fix typo in analyze.md - SQL sample syntax error

2023-05-23 Thread via GitHub


reswqa commented on code in PR #22615:
URL: https://github.com/apache/flink/pull/22615#discussion_r1203329260


##
docs/content/docs/dev/table/sql/analyze.md:
##
@@ -81,8 +81,8 @@ tableEnv.executeSql(
 " `id` BIGINT NOT NULl," +
 " `product` VARCHAR(32)," +
 " `amount` INT," +
-" `sold_year` BIGINT", +
-" `sold_month` BIGINT", +
+" `sold_year` BIGINT," +
+" `sold_month` BIGINT," +

Review Comment:
   Please also check the Chinese document to see if there are any similar 
issues.



##
docs/content/docs/dev/table/sql/analyze.md:
##
@@ -379,4 +379,4 @@ ANALYZE TABLE [catalog_name.][db_name.]table_name 
PARTITION(partcol1[=val1] [, p
 
 *NOTE:* For the fix length types (like `BOOLEAN`, `INTEGER`, `DOUBLE` etc.), 
we need not collect the `avgLen` and `maxLen` from the original records.
 
-{{< top >}}
\ No newline at end of file
+{{< top >}}

Review Comment:
   What is the purpose of this change?






[jira] (FLINK-32115) json_value support cache

2023-05-23 Thread xiaogang zhou (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32115 ]


xiaogang zhou deleted comment on FLINK-32115:
---

was (Author: zhoujira86):
[~luoyuxia] Hi yuxia, can you please help review this?

> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> Hive supports caching the previously deserialized JSON object; could we
> consider using a cached object in JsonValueCallGen?
>  
> This optimization can improve the performance a lot for SQL like:
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
>  
> I added a static LRU cache into SqlJsonUtils and refactored
> jsonValueExpression1 like:
> {code:java}
> private static JsonValueContext jsonValueExpression1(String input) {
> JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
> if (parsedJsonContext != null) {
> return parsedJsonContext;
> }
> try {
> parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
> } catch (Exception e) {
> parsedJsonContext = JsonValueContext.withException(e);
> }
> EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
> return parsedJsonContext;
> } {code}
>  
> and benchmarked it like this:
> {code:java}
> public static void main(String[] args) {
> String input =
>         "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}";
> Long start = System.currentTimeMillis();
> for (int i = 0; i < 100; i++) {
> Object dejsonize = jsonValueExpression1(input);
> }
> System.err.println(System.currentTimeMillis() - start);
> } {code}
>  
> The time the two benchmark runs take is:
> ||case||milli second taken||
> |cache|33|
> |no cache|1591|
>  
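
For illustration, one possible shape of the bounded LRU cache the reporter mentions (assumed, not taken from any patch): a `LinkedHashMap` in access order whose `removeEldestEntry` enforces the bound.

{code:java}
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative bounded LRU cache (assumed shape of EXTRACT_OBJECT_CACHE).
final class LruCache<K, V> {
    private final Map<K, V> map;

    LruCache(int maxEntries) {
        // accessOrder=true keeps the least-recently-used entry first.
        this.map = Collections.synchronizedMap(
                new LinkedHashMap<K, V>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                        return size() > maxEntries;
                    }
                });
    }

    V get(K key) { return map.get(key); }

    void put(K key, V value) { map.put(key, value); }
}
{code}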





[jira] [Updated] (FLINK-31894) ExceptionHistory and REST API failure label integration

2023-05-23 Thread Panagiotis Garefalakis (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Panagiotis Garefalakis updated FLINK-31894:
---
Component/s: Runtime / REST

> ExceptionHistory and REST API failure label integration
> ---
>
> Key: FLINK-31894
> URL: https://issues.apache.org/jira/browse/FLINK-31894
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: Panagiotis Garefalakis
>Priority: Major
>






[GitHub] [flink] flinkbot commented on pull request #22631: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard

2023-05-23 Thread via GitHub


flinkbot commented on PR #22631:
URL: https://github.com/apache/flink/pull/22631#issuecomment-1560255418

   
   ## CI report:
   
   * 4f66c765cda0fbbc84d2e8ed3883ebcde73b9b22 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] dependabot[bot] opened a new pull request, #22631: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard

2023-05-23 Thread via GitHub


dependabot[bot] opened a new pull request, #22631:
URL: https://github.com/apache/flink/pull/22631

   Bumps [socket.io-parser](https://github.com/socketio/socket.io-parser) and [socket.io](https://github.com/socketio/socket.io). These dependencies needed to be updated together.
   Updates `socket.io-parser` from 4.0.5 to 4.2.3

   Release notes

   Sourced from [socket.io-parser's releases](https://github.com/socketio/socket.io-parser/releases).

   4.2.3
   :warning: This release contains an important security fix :warning:
   A malicious client could send a specially crafted HTTP request, triggering an uncaught exception and killing the Node.js process:

   TypeError: Cannot convert object to primitive value
  at Socket.emit (node:events:507:25)
  at .../node_modules/socket.io/lib/socket.js:531:14

   Please upgrade as soon as possible.

   Bug Fixes
   - check the format of the event name ([3b78117](https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3))

   Links
   - Diff: https://github.com/socketio/socket.io-parser/compare/4.2.2...4.2.3

   4.2.2

   Bug Fixes
   - calling destroy() should clear all internal state ([22c42e3](https://github.com/socketio/socket.io-parser/commit/22c42e3545e4adbc5931276c378f5d62c8b3854a))
   - do not modify the input packet upon encoding ([ae8dd88](https://github.com/socketio/socket.io-parser/commit/ae8dd88995dbd7f89c97e5cc15e5b489fa0efece))

   Links
   - Diff: https://github.com/socketio/socket.io-parser/compare/4.2.1...4.2.2

   4.2.1

   Bug Fixes
   - check the format of the index of each attachment ([b5d0cb7](https://github.com/socketio/socket.io-parser/commit/b5d0cb7dc56a0601a09b056beaeeb0e43b160050))

   Links
   - Diff: https://github.com/socketio/socket.io-parser/compare/4.2.0...4.2.1

   4.2.0

   Features
   - allow the usage of custom replacer and reviver ([#112](https://redirect.github.com/socketio/socket.io-parser/issues/112)) ([b08bc1a](https://github.com/socketio/socket.io-parser/commit/b08bc1a93e8e3194b776c8a0bdedee1e29333680))

   Links
   - Diff: https://github.com/socketio/socket.io-parser/compare/4.1.2...4.2.0

   ... (truncated)

   Changelog

   Sourced from [socket.io-parser's changelog](https://github.com/socketio/socket.io-parser/blob/main/CHANGELOG.md).

   [4.2.3](https://github.com/socketio/socket.io-parser/compare/4.2.2...4.2.3) (2023-05-22)

   Bug Fixes
   - check the format of the event name ([3b78117](https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3))

   [4.2.2](https://github.com/socketio/socket.io-parser/compare/4.2.1...4.2.2) (2023-01-19)

   Bug Fixes
   - calling destroy() should clear all internal state ([22c42e3](https://github.com/socketio/socket.io-parser/commit/22c42e3545e4adbc5931276c378f5d62c8b3854a))
   - do not modify the input packet upon encoding ([ae8dd88](https://github.com/socketio/socket.io-parser/commit/ae8dd88995dbd7f89c97e5cc15e5b489fa0efece))

   [3.3.3](https://github.com/Automattic/socket.io-parser/compare/3.3.2...3.3.3) (2022-11-09)

   Bug Fixes
   - check the format of the index of each attachment ([fb21e42](https://github.com/Automattic/socket.io-parser/commit/fb21e422fc193b34347395a33e0f625bebc09983))

   [3.4.2](https://github.com/socketio/socket.io-parser/compare/3.4.1...3.4.2) (2022-11-09)

   Bug Fixes
   - check the format of the index of each attachment ([04d23ce](https://github.com/socketio/socket.io-parser/commit/04d23cecafe1b859fb03e0cbf6ba3b74dff56d14))

   [4.2.1](https://github.com/socketio/socket.io-parser/compare/4.2.0...4.2.1) (2022-06-27)

   Bug Fixes
   - check the format of the index of each attachment ([b5d0cb7](https://github.com/socketio/socket.io-parser/commit/b5d0cb7dc56a0601a09b056beaeeb0e43b160050))

   Commits
   - [b6c824f](https://github.com/socketio/socket.io-parser/commit/b6c824f82421aa44dfd5ef395f5132866543de59) chore(release): 4.2.3
   - [dcc70d9](https://github.com/socketio/socket.io-parser/commit/dcc70d9678ac896de08294d6e8d668be6a68680a) refactor: export typescript declarations for the commonjs build
   - [3b78117](https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3) fix: check the format of the event name
   - [0841bd5](https://github.com/socketio/socket.io-parser/commit/0841bd562351c3d45a5288e2adf9707cc8a3131d) chore: bump ua-parser-js from 1.0.32 to 1.0.33 ([#121](https://redirect.github.com/socketio/socket.io-parser/issues/121))
   - [28dd668](https://github.com/socketio/socket.io-parser/commit/28dd6685021353b26a4b022e25b453c627d0a7e8) chore(release): 4.2.2

[jira] [Created] (FLINK-32172) KafkaExample can not run with args

2023-05-23 Thread xulongfeng (Jira)
xulongfeng created FLINK-32172:
--

 Summary: KafkaExample can not run with args
 Key: FLINK-32172
 URL: https://issues.apache.org/jira/browse/FLINK-32172
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.0
 Environment: * win11
 * Git
 * Maven (we recommend version 3.8.6)
 * Java 11
Reporter: xulongfeng
 Attachments: args.png, kafkaexample.png

I forked and cloned the flink-connector-kafka repo. After building and packaging, I ran
org/apache/flink/streaming/kafka/test/KafkaExample.java main(), but it failed.

The comment says:
Example usage: --input-topic test-input --output-topic test-output
--bootstrap.servers
* localhost:9092 --group.id myconsumer
 
but the console prints "Missing parameters!" from KafkaExampleUtil, which requires 5
parameters while we have 4.
 
Thank you for your attention to this matter.





[jira] [Commented] (FLINK-32171) Add PostStart hook to flink k8s operator helm

2023-05-23 Thread Xingcan Cui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725588#comment-17725588
 ] 

Xingcan Cui commented on FLINK-32171:
-

Hi [~gyfora], would like to get your thoughts on this. I can work on it if you 
think this feature is reasonable. Thanks!

> Add PostStart hook to flink k8s operator helm
> -
>
> Key: FLINK-32171
> URL: https://issues.apache.org/jira/browse/FLINK-32171
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Xingcan Cui
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1
>
>
> I feel it would be convenient to add an optional PostStart hook config to the Flink
> K8s operator Helm chart (e.g. when users need to download some Flink plugins).





[jira] [Created] (FLINK-32171) Add PostStart hook to flink k8s operator helm

2023-05-23 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-32171:
---

 Summary: Add PostStart hook to flink k8s operator helm
 Key: FLINK-32171
 URL: https://issues.apache.org/jira/browse/FLINK-32171
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Xingcan Cui
 Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1


I feel it would be convenient to add an optional PostStart hook config to the Flink K8s operator Helm chart (e.g. when users need to download some Flink plugins).





[GitHub] [flink] pgaref commented on pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling

2023-05-23 Thread via GitHub


pgaref commented on PR #22608:
URL: https://github.com/apache/flink/pull/22608#issuecomment-1560231799

   > Looks good overall, I'm just not a big fan of introducing a new impl. of 
the execution graph. Can we maybe do something along these lines instead? 
https://gist.github.com/dmvk/7481082e5584c9a1eff377282d6b93df
   
   Thanks @dmvk ! Was also looking for a smarter way to throw on ExecutionGraph 
init -- Using the same intermediate dataset id on initializeJobVertex is 
perfect! Updated :) 





[GitHub] [flink] pgaref commented on pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling

2023-05-23 Thread via GitHub


pgaref commented on PR #22564:
URL: https://github.com/apache/flink/pull/22564#issuecomment-1560181396

   Thanks for the thorough review @dmvk ! PR is now updated :) 





[GitHub] [flink] pgaref commented on a diff in pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling

2023-05-23 Thread via GitHub


pgaref commented on code in PR #22564:
URL: https://github.com/apache/flink/pull/22564#discussion_r1203063467


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1992,4 +2035,120 @@ public DummyState getState() {
 }
 }
 }
+
+private static class ExceptionHistoryTester {

Review Comment:
   Totally, I just missed that! Thanks! 






[GitHub] [flink] kurtostfeld closed pull request #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti…

2023-05-23 Thread via GitHub


kurtostfeld closed pull request #22586: [FLINK-31880][tests] Fixes unit test so 
it will pass regardless of ti…
URL: https://github.com/apache/flink/pull/22586





[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725569#comment-17725569
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Got it, thanks for clarifying. 

However, I think that using 
"Integer.toString(System.identityHashCode(stateHandle))" can easily lead to 
collisions, thereby causing checkpoint corruption.

But this seems to me an (important) implementation detail, that can be 
discussed in the PR. WDYT?
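
To illustrate the concern (not code from the PR): System.identityHashCode is a 32-bit value, so two distinct state handles can produce the same key, whereas a random UUID makes collisions practically impossible.

{code:java}
import java.util.UUID;

class UniqueKeyChoice {
    public static void main(String[] args) {
        // identityHashCode is only 32 bits and not unique: distinct handles may
        // collide, so a registry keyed on it could merge two entries and later
        // discard shared state that is still in use.
        String riskyKey = Integer.toString(System.identityHashCode(new Object()));

        // A random suffix such as a UUID avoids collisions for practical purposes.
        String safeKey = UUID.randomUUID().toString();

        System.out.println(riskyKey + " vs " + safeKey);
    }
}
{code}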

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 





[jira] [Closed] (FLINK-31095) FileSink doesn't work with s3a on EKS

2023-05-23 Thread Sylvia Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvia Lin closed FLINK-31095.
--
Resolution: Not A Bug

> FileSink doesn't work with s3a on EKS
> -
>
> Key: FLINK-31095
> URL: https://issues.apache.org/jira/browse/FLINK-31095
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: Sylvia Lin
>Priority: Major
>
> FileSink gives below exception on AWS EKS cluster:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException: This s3 file system 
> implementation does not support recoverable writers.
>   at 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
>  ~[?:?]
>   at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
>  ~[flink-dist-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]{code}
> [https://github.com/apache/flink/blob/278dc7b793303d228f7816585054629708983af6/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#LL136C16-L136C16]
> And this may be related to 
> https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
>  





[jira] [Commented] (FLINK-31095) FileSink doesn't work with s3a on EKS

2023-05-23 Thread Sylvia Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725539#comment-17725539
 ] 

Sylvia Lin commented on FLINK-31095:


Yeah, it's working! Thanks! I can close the ticket.

> FileSink doesn't work with s3a on EKS
> -
>
> Key: FLINK-31095
> URL: https://issues.apache.org/jira/browse/FLINK-31095
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.1
>Reporter: Sylvia Lin
>Priority: Major
>
> FileSink gives below exception on AWS EKS cluster:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException: This s3 file system 
> implementation does not support recoverable writers.
>   at 
> org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
>  ~[?:?]
>   at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
>  ~[flink-dist-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
>  ~[flink-connector-files-1.16.1.jar:1.16.1]{code}
> [https://github.com/apache/flink/blob/278dc7b793303d228f7816585054629708983af6/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#LL136C16-L136C16]
> And this may be related to 
> https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
>  





[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #29: [FLINK-32021][Connectors/Kafka] Improvement the Javadoc for SpecifiedOffsetsInitializer and TimestampOffsetsInitial

2023-05-23 Thread via GitHub


RamanVerma commented on code in PR #29:
URL: 
https://github.com/apache/flink-connector-kafka/pull/29#discussion_r1202797216


##
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java:
##
@@ -38,6 +38,10 @@
  * An implementation of {@link OffsetsInitializer} which initializes the 
offsets of the partition
  * according to the user specified offsets.
  *
+ * Use specified offsets for specified partitions while use commit offsets 
or earliest for

Review Comment:
   Thanks for the explanation @loserwang1024.
   As you said, you can change that comment to "use offsetResetStrategy which is default earliest".






[GitHub] [flink] flinkbot commented on pull request #22630: [hotfix] Use https://repo.maven.apache.org in maven-utils.sh

2023-05-23 Thread via GitHub


flinkbot commented on PR #22630:
URL: https://github.com/apache/flink/pull/22630#issuecomment-1559869076

   
   ## CI report:
   
   * abba63af11465e74c8b5ffc08cbaa622e8666064 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] snuyanzin opened a new pull request, #22630: [hotfix] Use https://repo.maven.apache.org in maven-utils.sh

2023-05-23 Thread via GitHub


snuyanzin opened a new pull request, #22630:
URL: https://github.com/apache/flink/pull/22630

   
   ## What is the purpose of the change
   
   The idea is to use `https://repo.maven.apache.org` (same as 
`maven-wrapper.properties`) to download maven via `maven-utils.sh` since 
connection to `https://archive.apache.org` seems to be unstable sometimes like 
e.g. at https://issues.apache.org/jira/browse/FLINK-32106
   
   ## Brief change log
   
   `maven-utils.sh`
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no )
 - The S3 file system connector: ( no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (not applicable )
   





[jira] [Created] (FLINK-32170) Continue metric collection on intermittent job restarts

2023-05-23 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-32170:
--

 Summary: Continue metric collection on intermittent job restarts
 Key: FLINK-32170
 URL: https://issues.apache.org/jira/browse/FLINK-32170
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels


If the underlying infrastructure is not stable, e.g. Kubernetes pod eviction, 
the jobs will sometimes restart. This will restart the metric collection 
process for the autoscaler and discard any existing metrics. If the 
interruption time is short, e.g. less than one minute, we could consider 
resuming metric collection after the job goes back into RUNNING state.





[GitHub] [flink] lzshlzsh commented on a diff in pull request #22612: [FLINK-32139][connector/hbase] Using strongly increasing nanosecond timestamp and DeleteColumn type to fix data accidental deletio

2023-05-23 Thread via GitHub


lzshlzsh commented on code in PR #22612:
URL: https://github.com/apache/flink/pull/22612#discussion_r1202716866


##
flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseTimestampGeneratorTest.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link HBaseTimestampGenerator}. */
+public class HBaseTimestampGeneratorTest {
+@Test
+public void testStronglyIncreasingTimestampGenerator() {
+HBaseTimestampGenerator timestampGenerator = 
HBaseTimestampGenerator.stronglyIncreasing();
+long lastTimestamp = 0;
+for (int i = 0; i < 100_000_000; i++) {

Review Comment:
   > Please have a look at 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing
 - Having a test run 100 million times isn't an effective test
   
   @MartijnVisser 
   Thank you for your review and valuable feedback. I think I may need to use 
JUnit 5 to write the test cases and replace the 100-million-iteration loop with 
a small, bounded test. Can you give more suggestions on how to write this test?
   
   Actually, I have another question: how should we write test cases for the 
probability of errors occurring in this data-loss situation, given that we 
cannot assert a definite state?
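
   For illustration, a small bounded JUnit 5 test could look like the sketch below (the next() accessor is an assumption for the example, not the actual method name in the PR):

{code:java}
// Sketch of a bounded JUnit 5 test for a strictly increasing timestamp generator.
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class HBaseTimestampGeneratorBoundedTest {

    @Test
    void timestampsAreStrictlyIncreasing() {
        HBaseTimestampGenerator generator = HBaseTimestampGenerator.stronglyIncreasing();
        long last = generator.next(); // assumed accessor name
        for (int i = 0; i < 1_000; i++) { // small, deterministic iteration count
            long current = generator.next();
            assertThat(current).isGreaterThan(last);
            last = current;
        }
    }
}
{code}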
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lzshlzsh commented on a diff in pull request #22612: [FLINK-32139][connector/hbase] Using strongly increasing nanosecond timestamp and DeleteColumn type to fix data accidental deletio

2023-05-23 Thread via GitHub


lzshlzsh commented on code in PR #22612:
URL: https://github.com/apache/flink/pull/22612#discussion_r1202716866


##
flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseTimestampGeneratorTest.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link HBaseTimestampGenerator}. */
+public class HBaseTimestampGeneratorTest {
+@Test
+public void testStronglyIncreasingTimestampGenerator() {
+HBaseTimestampGenerator timestampGenerator = 
HBaseTimestampGenerator.stronglyIncreasing();
+long lastTimestamp = 0;
+for (int i = 0; i < 100_000_000; i++) {

Review Comment:
   > Please have a look at 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing
 - Having a test run 100 million times isn't an effective test
   
   Thank you for your review and valuable feedback. I think I may need to use 
JUnit 5 to write the test cases and replace the 100-million-iteration loop with 
a small, bounded test. Can you give more suggestions on how to write this test?
   
   Actually, I have another question: how should we write test cases for the 
probability of errors occurring in this data-loss situation, given that we 
cannot assert a definite state?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-31684) Autoscaler metrics are only visible after metric window is full

2023-05-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31684:
---
Labels: pull-request-available  (was: )

> Autoscaler metrics are only visible after metric window is full
> ---
>
> Key: FLINK-31684
> URL: https://issues.apache.org/jira/browse/FLINK-31684
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
>
> The metrics get reported only after the metric window is full. This is not 
> helpful for observability after rescaling. We need to make sure that metrics 
> are reported even when the metric window is not yet full.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] mxm opened a new pull request, #606: [FLINK-31684] Report autoscaling metrics before metric window is full

2023-05-23 Thread via GitHub


mxm opened a new pull request, #606:
URL: https://github.com/apache/flink-kubernetes-operator/pull/606

   The metrics get reported only after the metric window is full. This is not 
helpful for observability after rescaling. We need to make sure that metrics 
are reported even when the metric window is not yet full.
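
   A minimal sketch of the idea (the class and names are illustrative, not the operator's actual metric window implementation):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalDouble;

class MetricWindowSketch {
    private final Deque<Double> samples = new ArrayDeque<>();

    void add(double sample) {
        samples.addLast(sample);
    }

    // Report the average of whatever has been collected so far,
    // instead of reporting nothing until the window is full.
    OptionalDouble averageSoFar() {
        return samples.stream().mapToDouble(Double::doubleValue).average();
    }
}
{code}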


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

2023-05-23 Thread via GitHub


XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1202637964


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code TestingLeaderElection} implements simple leader election for test 
cases where no {@code
+ * LeaderElectionService} is required.
+ */
+public class TestingLeaderElection implements LeaderElection {
+
+/**
+ * Is {@code null} if the {@code LeaderElection} isn't started.
+ *
+ * @see LeaderElection#startLeaderElection(LeaderContender)
+ */
+@Nullable private LeaderContender contender = null;
+
+@Nullable private CompletableFuture confirmationFuture 
= null;

Review Comment:
   hm, that made me start thinking. I guess, you're right: The 
`confirmationFuture` is bound to the `LeaderContender`. Resetting the contender 
should, indeed, also cause the `confirmationFuture` to be cancelled.
   
   But I did another round of digging: The `triggerContenderCleanup` was 
actually only exposed because of 
[DispatcherCleanupITCase:309](https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java#L309)
 where it should have been used instead of calling stop (which felt unnatural 
in this case because we didn't actually want to stop the leader election but 
just wanted to reset the instance).
   
   The test itself seems to be odd: We actually don't need to reset the leader 
election because the following code would just start a cleanup process which 
doesn't rely on leader election anymore. This change happened in FLINK-25432 
(in https://github.com/apache/flink/commit/cc5d321d). The test wasn't properly 
cleaned up/refactored to reflect the new behavior. Therefore, we could just 
remove the leader election reset. As a consequence, there wouldn't be a need to 
expose the `triggerContenderCleanup`. 
   
   I'm going to provide a hotfix commit to clean the test up and revert the 
`triggerContenderCleanup` method exposure. The test code shouldn't be able to 
reset the leader election because it might make it possible to work around 
badly structured code.
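
   A sketch of the reset semantics discussed here, based on the fields shown in the snippet above (the method name and the cancellation behavior are assumptions):

{code:java}
// Sketch: resetting the contender should also invalidate the pending confirmation.
private void reset() {
    if (confirmationFuture != null) {
        // The contender is gone, so a leadership confirmation can no longer arrive.
        confirmationFuture.cancel(false);
        confirmationFuture = null;
    }
    contender = null;
}
{code}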



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725470#comment-17725470
 ] 

Feifan Wang edited comment on FLINK-29913 at 5/23/23 3:45 PM:
--

Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in 
_*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when 
generating the SharedStateRegistryKey. Changes in 
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

private final Map sharedState;  // still use 
local file name as key of this map, corresponding to the “never change” I 
mentioned above

...


public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
...
for (Map.Entry sharedStateHandle :
sharedState.entrySet()) {

SharedStateRegistryKey registryKey = 
generateRegisterKey(sharedStateHandle.getValue());  // changed line


StreamStateHandle reference =
stateRegistry.registerReference(
registryKey, sharedStateHandle.getValue(), 
checkpointID);

sharedStateHandle.setValue(reference);
}
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle 
stateHandle) {
String keyString = null;
if (stateHandle instanceof FileStateHandle) {
keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
} else if (stateHandle instanceof ByteStreamStateHandle) {
keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
} else {
keyString = Integer.toString(System.identityHashCode(stateHandle));
}
return new SharedStateRegistryKey(md5sum(keyString)); // may be other 
digest algorithm
}

{code}
 

And we can use only normal handles (not PlaceholderStreamStateHandle) in 
IncrementalRemoteKeyedStateHandle to make sure the 
IncrementalRemoteKeyedStateHandle#generateRegisterKey() method never gets a 
PlaceholderStreamStateHandle.


was (Author: feifan wang):
Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in 
_*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when 
generating the SharedStateRegistryKey. Changes in 
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

private final Map sharedState;  // still use 
local file name as key of this map, corresponding to the “never change” I 
mentioned above

...


public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
...
for (Map.Entry sharedStateHandle :
sharedState.entrySet()) {

SharedStateRegistryKey registryKey = 
generateRegisterKey(sharedStateHandle.getValue());  // changed line


StreamStateHandle reference =
stateRegistry.registerReference(
registryKey, sharedStateHandle.getValue(), 
checkpointID);

sharedStateHandle.setValue(reference);
}
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle 
stateHandle) {
String keyString = null;
if (stateHandle instanceof FileStateHandle) {
keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
} else if (stateHandle instanceof ByteStreamStateHandle) {
keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
} else {
keyString = Integer.toString(System.identityHashCode(stateHandle));
}
return new SharedStateRegistryKey(md5sum(keyString)); // may be other 
digest algorithm
}

{code}

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: 

[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725470#comment-17725470
 ] 

Feifan Wang commented on FLINK-29913:
-

Thanks for the clarification [~roman] ! 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in 
_*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when 
generating the SharedStateRegistryKey. Changes in 
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

private final Map sharedState;  // still use 
local file name as key of this map, corresponding to the “never change” I 
mentioned above

...


public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
...
for (Map.Entry sharedStateHandle :
sharedState.entrySet()) {

SharedStateRegistryKey registryKey = 
generateRegisterKey(sharedStateHandle.getValue());  // changed line


StreamStateHandle reference =
stateRegistry.registerReference(
registryKey, sharedStateHandle.getValue(), 
checkpointID);

sharedStateHandle.setValue(reference);
}
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle 
stateHandle) {
String keyString = null;
if (stateHandle instanceof FileStateHandle) {
keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
} else if (stateHandle instanceof ByteStreamStateHandle) {
keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
} else {
keyString = Integer.toString(System.identityHashCode(stateHandle));
}
return new SharedStateRegistryKey(md5sum(keyString)); // may be other 
digest algorithm
}

{code}
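
For completeness, one possible md5sum(String) helper using only the JDK (a sketch; as noted in the snippet, the digest algorithm is still an open choice):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

private static String md5sum(String keyString) {
    try {
        MessageDigest digest = MessageDigest.getInstance("MD5");
        byte[] hash = digest.digest(keyString.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(String.format("%02x", b)); // lower-case hex encoding
        }
        return hex.toString();
    } catch (NoSuchAlgorithmException e) {
        // Every JDK is required to ship an MD5 implementation.
        throw new IllegalStateException("MD5 not available", e);
    }
}
{code}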

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725466#comment-17725466
 ] 

Gyula Fora commented on FLINK-32012:


I think you are on the right track. But we should ensure that in the 
lastReconciledSpec we record the upgradeMode that was used during the last 
deployment/rollback, otherwise subsequent upgrades can be problematic.

If we used a savepoint we should record savepoint; if HA metadata was used, it 
should be last-state.

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the 
> rollback, but the rollback failed due to `Rollback is not possible due to 
> missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator performs a set of actions that also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running rollback the operator is looking for HA data even if we 
> rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]
>  Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it 
> doesn't look to be linked to a bug with ZooKeeper.
>  
> From https://issues.apache.org/jira/browse/FLINK-30305 it looks to be 
> expected that the HA data has been deleted (as it is also performed by flink 
> when relying on savepoint upgrade mode).
> Still the use case seems to differ from 
> https://issues.apache.org/jira/browse/FLINK-30305 as the operator is aware of 
> the failure and treats a specific rollback event.
> So I'm wondering why we enforce such a check when performing rollback if we 
> rely on savepoint upgrade mode. Would it be fine to not rely on the HA data 
> and rollback from the last savepoint (the one we used in the deployment step)?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725453#comment-17725453
 ] 

Gyula Fora commented on FLINK-32012:


Yes you are right. But this is already handled in the getAvailableUpgradeModes 
logic during a normal upgrade. If your job failed to start after a savepoint 
upgrade you can send in a new spec and it will be upgraded using the previous 
savepoint.

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the 
> rollback, but the rollback failed due to `Rollback is not possible due to 
> missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator performs a set of actions that also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running rollback the operator is looking for HA data even if we 
> rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]
>  Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it 
> doesn't look to be linked to a bug with ZooKeeper.
>  
> From https://issues.apache.org/jira/browse/FLINK-30305 it looks to be 
> expected that the HA data has been deleted (as it is also performed by flink 
> when relying on savepoint upgrade mode).
> Still the use case seems to differ from 
> https://issues.apache.org/jira/browse/FLINK-30305 as the operator is aware of 
> the failure and treats a specific rollback event.
> So I'm wondering why we enforce such a check when performing rollback if we 
> rely on savepoint upgrade mode. Would it be fine to not rely on the HA data 
> and rollback from the last savepoint (the one we used in the deployment step)?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-23 Thread Nicolas Fraison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725450#comment-17725450
 ] 

Nicolas Fraison edited comment on FLINK-32012 at 5/23/23 3:05 PM:
--

But the whole point of this request was that when the JobManager failed to 
start, the HA metadata was not available during rollback, while the savepoint 
taken for the upgrade was available.

So relying on SAVEPOINT for rollback will ensure that Flink is aware of the 
availability of a savepoint.

From my understanding of 
[tryRestoreExecutionGraphFromSavepoint|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L198], 
Flink will rely on the savepoint if no HA metadata exists; otherwise it will 
load the checkpoint.

 

Forgot to mention it, but indeed when calling 
[cancelJob|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L353] 
it falls back to LAST_STATE in order to ensure no savepoint is created and the 
HA metadata is not deleted.

 


was (Author: JIRAUSER299678):
But the whole point of this request was that when the JobManager failed to 
start, the HA metadata was not available during rollback, while the savepoint 
taken for the upgrade was available.

So relying on SAVEPOINT for rollback will ensure that Flink is aware of the 
availability of a savepoint.

From my understanding of 
[tryRestoreExecutionGraphFromSavepoint|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L198], 
Flink will rely on the savepoint if no HA metadata exists; otherwise it will 
load the checkpoint.

 

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the 
> rollback, but the rollback failed due to `Rollback is not possible due to 
> missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator performs a set of actions that also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running rollback the operator is looking for HA data even if we 
> rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> 

[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-23 Thread Nicolas Fraison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725450#comment-17725450
 ] 

Nicolas Fraison commented on FLINK-32012:
-

But the whole point of this request was that when the JobManager failed to 
start, the HA metadata was not available during rollback, while the savepoint 
taken for the upgrade was available.

So relying on SAVEPOINT for rollback will ensure that Flink is aware of the 
availability of a savepoint.

From my understanding of 
[tryRestoreExecutionGraphFromSavepoint|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L198], 
Flink will rely on the savepoint if no HA metadata exists; otherwise it will 
load the checkpoint.

 

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the 
> rollback, but the rollback failed due to `Rollback is not possible due to 
> missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator performs a set of actions that also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running rollback the operator is looking for HA data even if we 
> rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]
>  Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it 
> doesn't look to be linked to a bug with ZooKeeper.
>  
> From https://issues.apache.org/jira/browse/FLINK-30305 it looks to be 
> expected that the HA data has been deleted (as it is also performed by flink 
> when relying on savepoint upgrade mode).
> Still the use case seems to differ from 
> https://issues.apache.org/jira/browse/FLINK-30305 as the operator is aware of 
> the failure and treats a specific rollback event.
> So I'm wondering why we enforce such a check when performing rollback if we 
> rely on savepoint upgrade mode. Would it be fine to not rely on the HA data 
> and rollback from the last savepoint (the one we used in the deployment step)?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32162) Misleading log message due to missing null check

2023-05-23 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-32162:
-
Priority: Minor  (was: Major)

> Misleading log message due to missing null check
> 
>
> Key: FLINK-32162
> URL: https://issues.apache.org/jira/browse/FLINK-32162
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Updating the job requirements always logs "Failed to update requirements for 
> job {}." because we don't check whether the error is not null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32162) Misleading log message due to missing null check

2023-05-23 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-32162.

Resolution: Fixed

master: fadde2a378aac4293676944dd513291919a481e3

> Misleading log message due to missing null check
> 
>
> Key: FLINK-32162
> URL: https://issues.apache.org/jira/browse/FLINK-32162
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Updating the job requirements always logs "Failed to update requirements for 
> job {}." because we don't check whether the error is not null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol merged pull request #22628: [FLINK-32162] Only log error if an error occurred

2023-05-23 Thread via GitHub


zentol merged PR #22628:
URL: https://github.com/apache/flink/pull/22628


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #22628: [FLINK-32162] Only log error if an error occurred

2023-05-23 Thread via GitHub


zentol commented on code in PR #22628:
URL: https://github.com/apache/flink/pull/22628#discussion_r1202484203


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -1182,7 +1182,10 @@ public CompletableFuture 
updateJobResourceRequirements(
 getMainThreadExecutor())
 .whenComplete(
 (ack, error) -> {
-log.debug("Failed to update requirements for job 
{}.", jobId, error);
+if (error != null) {
+log.debug(

Review Comment:
   This will already be logged on error by the rest handler, so I think debug 
is fine here because it's only relevant for devs, not users.
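
   For reference, the guarded pattern in full (a sketch; the log and jobId names are taken from the diff above, the rest is illustrative):

{code:java}
// Only log when the future completed exceptionally; error is null on success.
resultFuture.whenComplete(
        (ack, error) -> {
            if (error != null) {
                log.debug("Failed to update requirements for job {}.", jobId, error);
            }
        });
{code}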



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 closed pull request #22627: [hotfix][doc] Use the flink-connector-jdbc v3.1 branch docs

2023-05-23 Thread via GitHub


wanglijie95 closed pull request #22627: [hotfix][doc] Use the 
flink-connector-jdbc v3.1 branch docs
URL: https://github.com/apache/flink/pull/22627


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32169) Show allocated slots on TM page

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32169:


 Summary: Show allocated slots on TM page
 Key: FLINK-32169
 URL: https://issues.apache.org/jira/browse/FLINK-32169
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Show the allocated slots on the TM page, so that you can better understand 
which job is consuming what resources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dmvk commented on a diff in pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling

2023-05-23 Thread via GitHub


dmvk commented on code in PR #22608:
URL: https://github.com/apache/flink/pull/22608#discussion_r1202427606


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java:
##
@@ -91,6 +96,28 @@ void setUp() {
 taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
 }
 
+@Test
+void testExceptionHistoryWithGlobalFailureLabels() throws Exception {

Review Comment:
   for the sake of completeness, it would be nice to explicitly configure the 
restart strategy



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling

2023-05-23 Thread via GitHub


dmvk commented on code in PR #22608:
URL: https://github.com/apache/flink/pull/22608#discussion_r1202413078


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java:
##
@@ -91,6 +96,28 @@ void setUp() {
 taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
 }
 
+@Test
+void testExceptionHistoryWithGlobalFailureLabels() throws Exception {

Review Comment:
   Maybe this should state something along the lines of "failure during vertex 
initialization"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] boring-cyborg[bot] commented on pull request #50: [hotfix] Backport some docs-related fixes to v3.1

2023-05-23 Thread via GitHub


boring-cyborg[bot] commented on PR #50:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/50#issuecomment-1559480963

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-23 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725423#comment-17725423
 ] 

Gyula Fora commented on FLINK-32012:


[~nfraison.datadog], I don't think it's possible to use "SAVEPOINT" upgrade 
mode when rolling back. That would imply that we can take a savepoint, which 
would mean that the job is running.

Also, regarding your comment about what to set in the status for upgradeMode: we 
have to set what we actually ended up using, as there is some logic that relies 
on this. The first time a job is started from an empty state we record stateless; 
if you started with initialSavepointPath then it would be savepoint. All these 
are key pieces of logic that allow us to upgrade safely without accidentally 
losing state.

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the 
> rollback, but the rollback failed due to `Rollback is not possible due to 
> missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator performs a set of actions that also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running rollback the operator is looking for HA data even if we 
> rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]
>  Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it 
> doesn't look to be linked to a bug with ZooKeeper.
>  
> From https://issues.apache.org/jira/browse/FLINK-30305 it looks to be 
> expected that the HA data has been deleted (as it is also performed by flink 
> when relying on savepoint upgrade mode).
> Still the use case seems to differ from 
> https://issues.apache.org/jira/browse/FLINK-30305 as the operator is aware of 
> the failure and treats a specific rollback event.
> So I'm wondering why we enforce such a check when performing rollback if we 
> rely on savepoint upgrade mode. Would it be fine to not rely on the HA data 
> and rollback from the last savepoint (the one we used in the deployment step)?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] Thesharing commented on pull request #652: [FLINK-32051] Fix broken documentation links in Flink blogs

2023-05-23 Thread via GitHub


Thesharing commented on PR #652:
URL: https://github.com/apache/flink-web/pull/652#issuecomment-1559436745

   > +1, I met this same issue at flink blog. But why insert some blanks in 
some one doc?
   
   Thanks for the reply. It's because without these blanks, the links in the 
corresponding paragraph will be broken. You could find the broken links in this 
[blog](https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/#reducing-complexity-with-groups).
   
   https://github.com/apache/flink-web/assets/6576831/a86e98b1-7aa0-44e6-a2bf-1b41ab7c57fe


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan reassigned FLINK-29913:
-

Assignee: Feifan Wang

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-29913:
--
Fix Version/s: 1.16.2
   1.17.2

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-29913:
--
Affects Version/s: 1.17.0

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0, 1.17.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-29913:
--
Priority: Major  (was: Minor)

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Assignee: Feifan Wang
>Priority: Major
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725418#comment-17725418
 ] 

Roman Khachatryan commented on FLINK-29913:
---

{quote}On a 64-bit system, about 7.63 MB more memory will be used for every one 
million entries.
Is there any other runtime overhead I missed?
{quote}
I was more concerned about the additional time required to traverse the (mostly 
single-element) lists. When a checkpoint is subsumed, *all* entries need to be 
scanned. Adding pointer dereference(s) might break any optimizations that JVM 
and CPU would otherwise employ.

 
{quote}As for the complexity, this approach will indeed increase the operation 
of the linked list in the registerReference() method and 
unregisterUnusedState() method.
But given that this is easy to implement, and the implementation is cohesive, I 
think the complexity is acceptable.
{quote}
In my view, simplicity in this part is worth the effort. The problem that 
SharedStateRegistry solves is already tricky, and we shouldn't complicate it 
further (higher complexity potentially leads to more bugs and more maintenance 
effort). 

 
{quote}Further, regarding the approach of using unique registry key, I agree 
with Congxian Qiu , we can just choose a stable register key generation method 
based on remote file name (such as use md5 digest of remote file name) , which 
can replace of 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() .
The mapping of local sst file name to StreamStateHandle never changed , so the 
part of RocksDB recovery does not need to be changed.
{quote}
I don't fully understand what "never changed" means here.
[Here|https://github.com/apache/flink/blob/fbf7b91424ec626ae56dd2477347a7759db6d5fe/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L105],
 the ID is used to create the local path. If we change the ID to the remote 
path, the local path will change too. Per my understanding, we cannot change 
the local path without updating the metadata files.
Or am I missing something?

 
{quote}Whichever approach is chosen, I am happy to implement it. Can you 
assign this ticket to me, Roman Khachatryan? Looking forward to hearing from 
you.
{quote}
Sure, thanks for volunteering!
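For illustration, a minimal sketch of the stable-key idea under discussion (the 
class and method are hypothetical, not existing Flink API):
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derive a stable shared-state registry key from the
// remote file name, so handles pointing at the same remote file always map
// to the same key, independent of the local sst file name.
final class StableRegistryKeys {

    private StableRegistryKeys() {}

    static String fromRemoteFileName(String remoteFileName) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(remoteFileName.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every compliant JRE ships an MD5 implementation.
            throw new IllegalStateException(e);
        }
    }
}
{code}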

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32168) Log required/available resources in RM

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32168:


 Summary: Log required/available resources in RM
 Key: FLINK-32168
 URL: https://issues.apache.org/jira/browse/FLINK-32168
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


When matching requirements against available resources, the RM currently 
doesn't log anything apart from whether it could fulfill the requirements or 
not.

We can make the system easier to audit by logging the current requirements, 
available resources, and how many resources are left after the matching.
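A sketch of the kind of log line this could produce (names and format are 
illustrative only, not the actual RM code):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative shape of the proposed audit logging.
class RequirementsMatchLogger {
    private static final Logger LOG = LoggerFactory.getLogger(RequirementsMatchLogger.class);

    static void logMatch(String jobId, int requiredSlots, int availableSlots, int remainingSlots) {
        LOG.info(
                "Matched requirements for job {}: required={}, available={}, remaining after matching={}",
                jobId, requiredSlots, availableSlots, remainingSlots);
    }
}
{code}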



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32167) Log dynamic slot creation on task manager

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32167:


 Summary: Log dynamic slot creation on task manager
 Key: FLINK-32167
 URL: https://issues.apache.org/jira/browse/FLINK-32167
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


When a slot is dynamically allocated on the TM, we should log that this 
happens, what resources it consumes, and what the remaining resources are.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32166) Show unassigned/total TM resources in web ui

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32166:


 Summary: Show unassigned/total TM resources in web ui
 Key: FLINK-32166
 URL: https://issues.apache.org/jira/browse/FLINK-32166
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


It is important to know how many resources of a TM are currently _assigned_ to 
jobs.
This is different from what resources are currently _used_, since you can have 
assigned 1gb of memory to a job while it is only using 10mb at this time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32165) Improve observability ofd fine-grained resource management

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32165:


 Summary: Improve observability ofd fine-grained resource management
 Key: FLINK-32165
 URL: https://issues.apache.org/jira/browse/FLINK-32165
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Right now fine-grained resource management is way too much of a black-box, with 
the only source of information being the taskmanager rest endpoints.

While this is fine-ish for services built around it, the developer experience 
suffers greatly and it becomes impossible to reason about the system afterwards 
(because we don't even log anything).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32165) Improve observability of fine-grained resource management

2023-05-23 Thread Chesnay Schepler (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-32165:
-
Summary: Improve observability of fine-grained resource management  (was: 
Improve observability ofd fine-grained resource management)

> Improve observability of fine-grained resource management
> -
>
> Key: FLINK-32165
> URL: https://issues.apache.org/jira/browse/FLINK-32165
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination, Runtime / Web Frontend
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.18.0
>
>
> Right now fine-grained resource management is way too much of a black-box, 
> with the only source of information being the taskmanager rest endpoints.
> While this is fine-ish for services built around it, the developer experience 
> suffers greatly and it becomes impossible to reason about the system 
> afterwards (because we don't even log anything).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata

2023-05-23 Thread Nicolas Fraison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725413#comment-17725413
 ] 

Nicolas Fraison commented on FLINK-32012:
-

[~gyfora], I started an implementation which does not modify the upgradeMode to 
last-state, as the 
[restoreJob|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L143]
 for the LAST_STATE upgrade mode will enforce the requirement for HA metadata, 
which is not what we want when relying on SAVEPOINT.

Also, when restoring the last stable spec, there is a case where the 
UpgradeMode is set to STATELESS in this spec even if the chosen mode is 
SAVEPOINT 
([updateStatusBeforeFirstDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L193])

In order to avoid restoring this bad state, which would lead to a rollback not 
taking the savepoint into account, I enforce the upgrade mode of the restored 
spec to be the one currently set on the job.

But I'm wondering why we decided not to persist the actually used upgrade mode 
in the last stable spec for the first deployment?

Here is the diff of my 
[WIP|https://github.com/apache/flink-kubernetes-operator/compare/main...ashangit:flink-kubernetes-operator:nfraison/FLINK-32012?expand=1]
 in case the approach is not clear (not meant for review...)
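For reference, a FlinkDeployment relying on the savepoint upgrade mode looks 
roughly like this (a sketch; all values are illustrative):
{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job
spec:
  job:
    jarURI: local:///opt/flink/usrlib/job.jar
    upgradeMode: savepoint  # rollback should not require HA metadata in this mode
    state: running
{code}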

> Operator failed to rollback due to missing HA metadata
> --
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> The operator has well detected that the job was failing and initiate the 
> rollback but this rollback has failed due to `Rollback is not possible due to 
> missing HA metadata`
> We are relying on savepoint upgrade mode and zookeeper HA.
> The operator is performing a set of action to also delete this HA data in 
> savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]
>  : Suspend job with savepoint and deleteClusterDeployment
>  * [flink-kubernetes-operator/StandaloneFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]
>  : Remove JM + TM deployment and delete HA data
>  * [flink-kubernetes-operator/AbstractFlinkService.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]
>  : Wait cluster shutdown and delete zookeeper HA data
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]
>  : Remove all child znode
> Then when running the rollback, the operator is looking for HA data even if 
> we rely on savepoint upgrade mode:
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]
>  Perform reconcile of rollback if it should rollback
>  * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]
>  Rollback failed as HA data is not available
>  * [flink-kubernetes-operator/FlinkUtils.java at main · 
> apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]
>  Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it 
> doesn't look to be linked 

[jira] [Comment Edited] (FLINK-32163) Support the same application run multiple jobs in HA mode

2023-05-23 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725412#comment-17725412
 ] 

Martijn Visser edited comment on FLINK-32163 at 5/23/23 1:33 PM:
-

[~melin] The ticket is unclear for me. Looking at 
https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus
 I think this first needs to be resolved on the Dev mailing list, before 
opening a Jira ticket.


was (Author: martijnvisser):
[~melin] The ticket is unclear for me. Looking at 
https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus
 I think this first needs to be resolved on a mailing list, before opening a 
Jira ticket.

> Support the same application run multiple jobs in HA mode
> -
>
> Key: FLINK-32163
> URL: https://issues.apache.org/jira/browse/FLINK-32163
> Project: Flink
>  Issue Type: New Feature
>Reporter: melin
>Priority: Major
>
> Support the same application run multiple jobs in HA mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32163) Support the same application run multiple jobs in HA mode

2023-05-23 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725412#comment-17725412
 ] 

Martijn Visser commented on FLINK-32163:


[~melin] The ticket is unclear for me. Looking at 
https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus
 I think this first needs to be resolved on a mailing list, before opening a 
Jira ticket.

> Support the same application run multiple jobs in HA mode
> -
>
> Key: FLINK-32163
> URL: https://issues.apache.org/jira/browse/FLINK-32163
> Project: Flink
>  Issue Type: New Feature
>Reporter: melin
>Priority: Major
>
> Support the same application run multiple jobs in HA mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on pull request #22618: [FLINK-32153][build] Limit powermock to core/runtime

2023-05-23 Thread via GitHub


zentol commented on PR #22618:
URL: https://github.com/apache/flink/pull/22618#issuecomment-1559362927

   yeah I've noticed it already; my guess is the mockito-core exclusion isn't 
working as it should. Will fix that later.
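   For context, the kind of exclusion in question looks roughly like this (a 
sketch; the actual coordinates and versions live in the Flink poms):
   ```xml
   <dependency>
     <groupId>org.powermock</groupId>
     <artifactId>powermock-module-junit4</artifactId>
     <version>2.0.9</version>
     <scope>test</scope>
     <exclusions>
       <exclusion>
         <!-- Keep the mockito version managed by the project itself. -->
         <groupId>org.mockito</groupId>
         <artifactId>mockito-core</artifactId>
       </exclusion>
     </exclusions>
   </dependency>
   ```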


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32164) LifecycleState count metrics are not reported correctly by namespace

2023-05-23 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-32164:
---
Issue Type: Bug  (was: Improvement)

> LifecycleState count metrics are not reported correctly by namespace
> 
>
> Key: FLINK-32164
> URL: https://issues.apache.org/jira/browse/FLINK-32164
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.4.0, kubernetes-operator-1.5.0
>Reporter: Gyula Fora
>Priority: Major
>
> The per-namespace lifecycle state count metrics incorrectly show a global 
> count:
> https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java#L145



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32164) LifecycleState count metrics are not reported correctly by namespace

2023-05-23 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-32164:
--

 Summary: LifecycleState count metrics are not reported correctly 
by namespace
 Key: FLINK-32164
 URL: https://issues.apache.org/jira/browse/FLINK-32164
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.4.0
Reporter: Gyula Fora


The per-namespace lifecycle state count metrics incorrectly show a global 
count:

https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java#L145



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dmvk commented on a diff in pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling

2023-05-23 Thread via GitHub


dmvk commented on code in PR #22564:
URL: https://github.com/apache/flink/pull/22564#discussion_r1202308510


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1992,4 +2035,120 @@ public DummyState getState() {
 }
 }
 }
+
+private static class ExceptionHistoryTester {

Review Comment:
   Would it be possible to use this as a replacement for 
`org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest#runExceptionHistoryTests(...)` 
as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling

2023-05-23 Thread via GitHub


dmvk commented on code in PR #22564:
URL: https://github.com/apache/flink/pull/22564#discussion_r1202299569


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java:
##
@@ -132,6 +132,22 @@ private ExceptionHistoryEntry(
 this.failureLabels = 
Collections.unmodifiableMap(labelMap));
 }
 
+/**
+ * Creates a new ExceptionHistoryEntry copy of the original but with the 
provided labels
+ * (currently used for Failures).
+ *
+ * @param failureLabels to be passed to the new ExceptionHistoryEntry
+ * @return new ExceptionHistoryEntry with the associated labels
+ */
+public ExceptionHistoryEntry withLabels(CompletableFuture<Map<String, String>> failureLabels) {

Review Comment:
   is this unused?



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1992,4 +2035,120 @@ public DummyState getState() {
 }
 }
 }
+
+private static class ExceptionHistoryTester {

Review Comment:
   Would it be possible to use this as a replacement for 
`org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest#runExceptionHistoryTests(...)` 
as well?



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##
@@ -1295,23 +1329,25 @@ void testExceptionHistoryWithTaskFailure() throws 
Exception {
 @Test
 void testExceptionHistoryWithTaskFailureWithRestart() throws Exception {
 final Exception expectedException = new Exception("Expected Local 
Exception");
-Consumer<AdaptiveSchedulerBuilder> setupScheduler =
+final Consumer<AdaptiveSchedulerBuilder> setupScheduler =
 builder ->
 builder.setRestartBackoffTimeStrategy(
 new FixedDelayRestartBackoffTimeStrategy
 .FixedDelayRestartBackoffTimeStrategyFactory(1, 100)
 .create());
-BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic =
+final BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic =
 (scheduler, attemptIds) -> {
 final ExecutionAttemptID attemptId = attemptIds.get(1);
 scheduler.updateTaskExecutionState(
 new TaskExecutionStateTransition(
 new TaskExecutionState(
 attemptId, ExecutionState.FAILED, expectedException)));
 };
-
 final Iterable<RootExceptionHistoryEntry> actualExceptionHistory =
-runExceptionHistoryTests(testLogic, setupScheduler);

Review Comment:
   this method is no longer used 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32163) Support the same application run multiple jobs in HA mode

2023-05-23 Thread melin (Jira)
melin created FLINK-32163:
-

 Summary: Support the same application run multiple jobs in HA mode
 Key: FLINK-32163
 URL: https://issues.apache.org/jira/browse/FLINK-32163
 Project: Flink
  Issue Type: New Feature
Reporter: melin


Support the same application run multiple jobs in HA mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-jdbc] eskabetxe commented on pull request #22: [FLINK-30790] Add DatabaseExtension with TableManaged for testing

2023-05-23 Thread via GitHub


eskabetxe commented on PR #22:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/22#issuecomment-1559224399

   @MartijnVisser any news on merging this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725387#comment-17725387
 ] 

Feifan Wang commented on FLINK-29913:
-

One overhead I can see is that it will use more memory for storing the next 
pointer. On a 64-bit system, about 7.63MB more memory will be used for every 
one million entries (one 8-byte reference each: 8 × 1,000,000 bytes ≈ 7.63MiB); 
I think that is acceptable. Is there any other runtime overhead I missed?

As for the complexity, this approach will indeed add linked-list handling in 
the _registerReference()_ and _unregisterUnusedState()_ methods. But given that 
this is easy to implement, and the implementation is cohesive, I think the 
complexity is acceptable.
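To make the structure concrete, a rough sketch (names are illustrative only; 
the ~8 bytes per entry above correspond to the additional next reference):
{code:java}
// One registry key may map to a short chain of entries instead of a single one.
final class SharedEntrySketch {
    final Object stateHandle;   // the registered state object
    SharedEntrySketch next;     // next entry under the same key; null for most entries
    long lastUsedCheckpointID;  // drives discarding once a checkpoint is subsumed

    SharedEntrySketch(Object stateHandle, long lastUsedCheckpointID) {
        this.stateHandle = stateHandle;
        this.lastUsedCheckpointID = lastUsedCheckpointID;
    }
}
{code}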

 

Just to clarify, I think using a unique ID is also a valid approach, but I want 
to learn how you made the selection. Further, regarding the approach of using a 
unique registry key, I agree with [~klion26]: we can just choose a stable 
registry key generation method based on the remote file name (such as using the 
md5 digest of the remote file name), which can replace 
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). 
The mapping of local sst file name to StreamStateHandle never changes, so the 
RocksDB recovery part does not need to be changed.

 

Whichever approach is chosen, I am happy to implement it. Can you assign this 
ticket to me, [~roman]? Looking forward to hearing from you.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function

2023-05-23 Thread via GitHub


flinkbot commented on PR #22629:
URL: https://github.com/apache/flink/pull/22629#issuecomment-1559168847

   
   ## CI report:
   
   * c9a4a54f5ac5b6dccdd762ac13955f8bea8dd577 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32056) Update the used Pulsar connector in flink-python to 4.0.0

2023-05-23 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-32056:
---
Priority: Critical  (was: Major)

> Update the used Pulsar connector in flink-python to 4.0.0
> -
>
> Key: FLINK-32056
> URL: https://issues.apache.org/jira/browse/FLINK-32056
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Pulsar
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>
> flink-python still references and tests flink-connector-pulsar:3.0.0, while 
> it should be using flink-connector-pulsar:4.0.0. That's because the newer 
> version is the only version compatible with Flink 1.17 and it doesn't rely on 
> flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] liuyongvs commented on pull request #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function

2023-05-23 Thread via GitHub


liuyongvs commented on PR #22629:
URL: https://github.com/apache/flink/pull/22629#issuecomment-1559161153

   hi @dawidwys do you have time to help review it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] liuyongvs opened a new pull request, #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function

2023-05-23 Thread via GitHub


liuyongvs opened a new pull request, #22629:
URL: https://github.com/apache/flink/pull/22629

   - What is the purpose of the change
   This is an implementation of ARRAY_INTERSECT
   
   - Brief change log
   ARRAY_INTERSECT for Table API and SQL
   
   ```
   Returns an array of the elements in the intersection of array1 and array2, 
without duplicates.
   
   Syntax:
   array_intersect(array1, array2)
   
   Arguments:
   array: An ARRAY to be handled.
   
   Returns:
   An ARRAY. If any of the input arrays is null, the function returns null.
   Examples:
   
   > SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));
[1,3]
   
   ```
   
   
   See also
   spark https://spark.apache.org/docs/latest/api/sql/index.html#array_intersect
   presto https://prestodb.io/docs/current/functions/array.html
   
   - Verifying this change
   This change added tests in CollectionFunctionsITCase.
   
   - Does this pull request potentially affect one of the following parts:
   Dependencies (does it add or upgrade a dependency): ( no)
   The public API, i.e., is any changed class annotated with @Public(Evolving): 
(yes )
   The serializers: (no)
   The runtime per-record code paths (performance sensitive): ( no)
   Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
   The S3 file system connector: ( no)
   - Documentation
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? (docs)
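   A usage illustration in Flink SQL syntax (a sketch; the exact result 
rendering depends on the final implementation):
   ```sql
   -- hypothetical usage once the function is available
   SELECT ARRAY_INTERSECT(ARRAY[1, 2, 3], ARRAY[1, 3, 5]);
   -- expected: [1, 3]
   ```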


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-32056) Update the used Pulsar connector in flink-python to 4.0.0

2023-05-23 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-32056.
---
Fix Version/s: 1.18.0
   1.17.2
   Resolution: Fixed

Fixed in:
- master via fbf7b91424ec626ae56dd2477347a7759db6d5fe
- release-1.17 via d3a3755a7eef5708871580671169fd6bd2babf28

> Update the used Pulsar connector in flink-python to 4.0.0
> -
>
> Key: FLINK-32056
> URL: https://issues.apache.org/jira/browse/FLINK-32056
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Connectors / Pulsar
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>
> flink-python still references and tests flink-connector-pulsar:3.0.0, while 
> it should be using flink-connector-pulsar:4.0.0. That's because the newer 
> version is the only version compatible with Flink 1.17 and it doesn't rely on 
> flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #22628: [FLINK-32162] Only log error if an error occurred

2023-05-23 Thread via GitHub


flinkbot commented on PR #22628:
URL: https://github.com/apache/flink/pull/22628#issuecomment-1559117082

   
   ## CI report:
   
   * 437fc7e4ecb949686e6ebd3ce50de7f57594d493 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] dmvk commented on a diff in pull request #22628: [FLINK-32162] Only log error if an error occurred

2023-05-23 Thread via GitHub


dmvk commented on code in PR #22628:
URL: https://github.com/apache/flink/pull/22628#discussion_r1202135895


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -1182,7 +1182,10 @@ public CompletableFuture<Acknowledge> updateJobResourceRequirements(
 getMainThreadExecutor())
 .whenComplete(
 (ack, error) -> {
-log.debug("Failed to update requirements for job 
{}.", jobId, error);
+if (error != null) {
+log.debug(

Review Comment:
    should the log level be different as well?
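    For reference, one possible shape of the guarded logging (a sketch; the 
names follow the diff above, and the level is exactly what is being asked):
    ```java
    import java.util.concurrent.CompletableFuture;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: only log a failure message when an error actually occurred.
    public class GuardedCompletionLogging {
        private static final Logger LOG = LoggerFactory.getLogger(GuardedCompletionLogging.class);

        static void logCompletion(CompletableFuture<Void> update, String jobId) {
            update.whenComplete(
                    (ack, error) -> {
                        if (error != null) {
                            // A real failure arguably warrants warn/error rather than debug.
                            LOG.warn("Failed to update requirements for job {}.", jobId, error);
                        } else {
                            LOG.debug("Updated requirements for job {}.", jobId);
                        }
                    });
        }
    }
    ```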



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-31639) Introduce tiered store memory manager

2023-05-23 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-31639.

Fix Version/s: 1.18.0
   Resolution: Done

master (1.18): c6d7747eaef166fb7577de55cb2943fa5408d54e

> Introduce tiered store memory manager
> -
>
> Key: FLINK-31639
> URL: https://issues.apache.org/jira/browse/FLINK-31639
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Affects Versions: 1.18.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong closed pull request #22352: [FLINK-31639][network] Introduce tiered store memory manager

2023-05-23 Thread via GitHub


xintongsong closed pull request #22352: [FLINK-31639][network] Introduce tiered 
store memory manager
URL: https://github.com/apache/flink/pull/22352


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32162) Misleading log message due to missing null check

2023-05-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32162:
---
Labels: pull-request-available  (was: )

> Misleading log message due to missing null check
> 
>
> Key: FLINK-32162
> URL: https://issues.apache.org/jira/browse/FLINK-32162
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Updating the job requirements always logs "Failed to update requirements for 
> job {}." because we don't check whether the error is not null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32162) Misleading log message due to missing null check

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32162:


 Summary: Misleading log message due to missing null check
 Key: FLINK-32162
 URL: https://issues.apache.org/jira/browse/FLINK-32162
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Updating the job requirements always logs "Failed to update requirements for 
job {}." because we don't check whether the error is not null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

2023-05-23 Thread via GitHub


XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1202068871


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code AbstractLeaderElectionService} provides a generic implementation of 
the {@link
+ * LeaderElection} handling.
+ */
+public abstract class AbstractLeaderElectionService implements 
LeaderElectionService {
+@Override
+public LeaderElection createLeaderElection() {
+return new LeaderElectionImpl(this);
+}
+
+/**
+ * Registers the given {@link LeaderContender} with the underlying {@code
+ * LeaderElectionService}. Leadership changes are starting to be reported 
to the {@code
+ * LeaderContender}.
+ */
+protected abstract void register(LeaderContender contender) throws 
Exception;
+
+/** Confirms the leadership with the given session ID and address. */
+protected abstract void confirmLeadership(UUID leaderSessionID, String 
leaderAddress);
+
+/**
+ * Checks whether the {@code LeaderElectionService} has the leadership 
acquired for the given
+ * session ID.
+ *
+ * @return {@code true} if the service has leadership with the passed 
session ID acquired;
+ * {@code false} otherwise.
+ */
+protected abstract boolean hasLeadership(UUID leaderSessionId);
+
+/** {@code LeaderElectionImpl} is the default implementation of {@link 
LeaderElection}. */
+private static class LeaderElectionImpl implements LeaderElection {

Review Comment:
   > and you ended up doing exactly what I asked for?...
   I'm confused by the question mark here. :innocent: I found your proposal 
reasonable in terms of testability and went ahead with the refactoring. My 
[previous 
comment](https://github.com/apache/flink/pull/22384#discussion_r1200710974) 
just explained my initial reasoning. :-D




[jira] [Created] (FLINK-32161) Migrate and remove some legacy ExternalResource

2023-05-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-32161:
--

 Summary: Migrate and remove some legacy ExternalResource
 Key: FLINK-32161
 URL: https://issues.apache.org/jira/browse/FLINK-32161
 Project: Flink
  Issue Type: Technical Debt
Reporter: Weijie Guo
Assignee: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32161) Migrate and remove some legacy ExternalResource

2023-05-23 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-32161:
---
Affects Version/s: 1.18.0

> Migrate and remove some legacy ExternalResource
> ---
>
> Key: FLINK-32161
> URL: https://issues.apache.org/jira/browse/FLINK-32161
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.18.0
>Reporter: Weijie Guo
>Assignee: Weijie Guo
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-23 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725336#comment-17725336
 ] 

Roman Khachatryan commented on FLINK-29913:
---

Thanks for the proposal [~Feifan Wang]. I think it's easier to implement, 
compared to always-unique state IDs. OTOH, it complicates an already complex 
part of the system and has some runtime overhead. So I'd rather go with unique 
state IDs.

WDYT?

 

As for the priority, I'd change it to Major if there are no objections.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.15.0, 1.16.0
>Reporter: Yanfei Lei
>Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state 
> backend would be discarded by registering the same name handle. See 
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32160) CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException

2023-05-23 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725319#comment-17725319
 ] 

Martijn Visser commented on FLINK-32160:


[~lzljs3620320] Do you have any insights on this?
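For reference, the guard the reporter asks about (quoted below) would look 
roughly like this (a sketch, not the actual operator code):
{code:java}
import java.util.Iterator;

final class RestoreSketch {
    /** Returns the first restored element, or null when the restored state is empty. */
    static <T> T firstOrNull(Iterable<T> restoredState) {
        Iterator<T> it = restoredState.iterator();
        // Guard with hasNext() instead of calling next() unconditionally, which
        // throws NoSuchElementException when the restored state is empty.
        return it.hasNext() ? it.next() : null;
    }
}
{code}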

> CompactOperator cannot continue from checkpoint because of 
> java.util.NoSuchElementException
> ---
>
> Key: FLINK-32160
> URL: https://issues.apache.org/jira/browse/FLINK-32160
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.16.0, 1.17.0
> Environment: Flink 1.16/1.17 on k8s (flink-kubernetes-operator 
> v.1.4.0), s3
>Reporter: Michał Fijołek
>Priority: Major
>
> Hello :) We have a flink job (v 1.17) on k8s (using the official 
> flink-k8s-operator) that reads data from kafka and writes it to s3 using 
> flink-sql with compaction. The job sometimes fails and continues from a 
> checkpoint just fine, but once every couple of days we experience a crash 
> loop: the job cannot continue from the latest checkpoint and fails with this 
> exception:
> {noformat}
> java.util.NoSuchElementException at 
> java.base/java.util.ArrayList$Itr.next(Unknown Source)
>  at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
>  at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
>  at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
>  at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>  at java.base/java.lang.Thread.run(Unknown Source){noformat}
> Here’s the relevant code: 
> [https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]
> It looks like `CompactOperator` is calling `next()` on the iterator without 
> checking `hasNext()` first - why's that? Is it a bug? Why does 
> `context.getOperatorStateStore().getListState(metaDescriptor)` return an 
> empty iterator? Is the latest checkpoint broken in such a case? 
> We have an identical job, but without compaction, and it has been working 
> smoothly for a couple of weeks now. 
> The whole job is just `select` from kafka and `insert` to s3.
> {noformat}
> CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (  `foo_bar1` STRING,
>   `foo_bar2` STRING,
>   `foo_bar3` STRING,
>   `foo_bar4` STRING
>   )
>   PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
>   STORED AS parquet
>   LOCATION 's3a://my/bucket/'
>   TBLPROPERTIES (
> 'auto-compaction' = 'true',
> 'compaction.file-size' = '128MB',
> 'sink.parallelism' = '8',
> 'format' = 'parquet',
> 'parquet.compression' = 'SNAPPY',
> 'sink.rolling-policy.rollover-interval' = '1 h',
> 'sink.partition-commit.policy.kind' = 'metastore'
>   ){noformat}
> Checkpoint configuration:
> {noformat}
> Checkpointing Mode Exactly Once
> Checkpoint Storage FileSystemCheckpointStorage
> State Backend HashMapStateBackend
> Interval 20m 0s
> Timeout 10m 0s
> Minimum Pause Between Checkpoints 0ms
> Maximum Concurrent Checkpoints 1
> Unaligned Checkpoints Disabled
> Persist Checkpoints Externally Enabled (retain on cancellation)
> Tolerable Failed Checkpoints 0
> Checkpoints With Finished Tasks Enabled
> State Changelog Disabled{noformat}
> Is there something wrong with the given config or is this some unhandled 
> edge case? 
> Currently our workaround is to restart the job without using the checkpoint 
> - it uses the state from Kafka, which in this case is fine



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure

2023-05-23 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725311#comment-17725311
 ] 

Aitozi commented on FLINK-31828:


Can any SQL folks help verify this fix?

> List field in a POJO data stream results in table program compilation failure
> -
>
> Key: FLINK-31828
> URL: https://issues.apache.org/jira/browse/FLINK-31828
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
> Environment: Java 11
> Flink 1.16.1
>Reporter: Vladimir Matveev
>Priority: Major
>  Labels: pull-request-available
> Attachments: MainPojo.java, generated-code.txt, stacktrace.txt
>
>
> Suppose I have a POJO class like this:
> {code:java}
> public class Example {
> private String key;
> private List> values;
> // getters, setters, equals+hashCode omitted
> }
> {code}
> When a DataStream with this class is converted to a table, and some 
> operations are performed on it, it results in an exception which explicitly 
> says that I should file a ticket:
> {noformat}
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
> {noformat}
> Please find the example Java code and the full stack trace attached.
> From the exception and generated code it seems that Flink is upset with the 
> list field being treated as an array - but I cannot have an array type there 
> in the real code.
> Also note that if I _don't_ specify the schema explicitly, it then maps the 
> {{values}} field to a `RAW('java.util.List', '...')` type, which also does 
> not work correctly and fails the job for even the simplest operations like 
> printing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32043) SqlClient session unrecoverable once one wrong setting occurred

2023-05-23 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-32043:

Priority: Critical  (was: Major)

> SqlClient session unrecoverable once one wrong setting occurred
> ---
>
> Key: FLINK-32043
> URL: https://issues.apache.org/jira/browse/FLINK-32043
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.17.0
>Reporter: lincoln lee
>Priority: Critical
>
> In the sql client, it cannot work normally once one wrong setting has occurred
> {code:java}
> // wrong setting here
> Flink SQL> SET table.sql-dialect = flink;
> [INFO] Execute statement succeed.
> Flink SQL> select '' AS f1, a from t1;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> SET table.sql-dialect = default;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET table.sql-dialect;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.table.api.SqlDialect.FLINK 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >