[GitHub] [flink] flinkbot commented on pull request #22634: [FLINK-32172][kafka] KafkaExampleUtils incorrect check of the minimum number of parameters
flinkbot commented on PR #22634: URL: https://github.com/apache/flink/pull/22634#issuecomment-1560498226 ## CI report: * 5254b522b39097420b445eab58cfa4ab6e934c9a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa opened a new pull request, #22634: [FLINK-32172][kafka] KafkaExampleUtils incorrect check of the minimum number of parameters
reswqa opened a new pull request, #22634: URL: https://github.com/apache/flink/pull/22634 ## What is the purpose of the change *KafkaExampleUtils incorrect check of the minimum number of parameters.* ## Brief change log - *Change the minimum number of parameters from `5` to `4`, aligning with error message.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
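The off-by-one described in the change log above can be illustrated with a small, self-contained sketch. It does not reproduce the real `KafkaExampleUtils` code: the class `ParamCheckSketch`, its methods, and the constant `MIN_PARAMETERS` are hypothetical, and the four option names are taken from the example usage quoted in FLINK-32172.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a minimum-parameter check; not the real KafkaExampleUtils. */
class ParamCheckSketch {
    // Assumption: the example usage lists four "--key value" options,
    // so the threshold has to be 4. A threshold of 5 rejects valid input.
    static final int MIN_PARAMETERS = 4;

    /** Parses "--key value" pairs; a key without a following value maps to "". */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
                params.put(key, hasValue ? args[++i] : "");
            }
        }
        return params;
    }

    /** Returns true when at least MIN_PARAMETERS distinct options were supplied. */
    static boolean hasEnoughParameters(String[] args) {
        return parse(args).size() >= MIN_PARAMETERS;
    }
}
```

With the documented arguments (`--input-topic`, `--output-topic`, `--bootstrap.servers`, `--group.id`), `parse` yields four entries, so a check against 5 would print "Missing parameters!" even though the usage message was followed exactly.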
[GitHub] [flink] reswqa commented on pull request #22615: [hotfix][doc] Fix typo in analyze.md
reswqa commented on PR #22615: URL: https://github.com/apache/flink/pull/22615#issuecomment-1560493433 > BTW, you should rename your commit title to [hotfix][doc] Fix typo in analyze.md. Sorry, I may not have expressed myself clearly. Not only the title of this pull request needs to change, but also the title of the commit message. It is currently: [fix: analyze.md SQL syntax error](https://github.com/apache/flink/pull/22615/commits/5e50d9b947ddb87cd8e7e700fce8c8af750ec2b9)
[GitHub] [flink] jasonyuan-cn commented on a diff in pull request #22615: [hotfix][doc] Fix typo in analyze.md
jasonyuan-cn commented on code in PR #22615:
URL: https://github.com/apache/flink/pull/22615#discussion_r1203463401

## docs/content/docs/dev/table/sql/analyze.md: ##
@@ -379,4 +379,4 @@ ANALYZE TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [, p
 *NOTE:* For the fix length types (like `BOOLEAN`, `INTEGER`, `DOUBLE` etc.), we need not collect the `avgLen` and `maxLen` from the original records.
-{{< top >}}
\ No newline at end of file
+{{< top >}}

Review Comment: This line of code should not be changed, sorry for my carelessness. I will undo this line.
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
1996fanrui commented on code in PR #22560: URL: https://github.com/apache/flink/pull/22560#discussion_r1203434702 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -92,6 +92,43 @@ public class ExecutionOptions { .withDescription( "Tells if we should use compression for the state snapshot data or not"); +public static final ConfigOption BUFFER_TIMEOUT_ENABLED = +ConfigOptions.key("execution.buffer-timeout.enabled") +.booleanType() +.defaultValue(true) +.withDescription( +Description.builder() +.text( +"If disabled, the config execution.buffer-timeout will not take effect and the flushing will be triggered only when the output " ++ "buffer is full thus maximizing throughput") +.build()); + +public static final ConfigOption BUFFER_TIMEOUT_INTERVAL = +ConfigOptions.key("execution.buffer-timeout.interval") +.durationType() +.defaultValue(Duration.ofMillis(100)) +.withDescription( +Description.builder() +.text( +"The maximum time frequency (milliseconds) for the flushing of the output buffers. By default " ++ "the output buffers flush frequently to provide low latency and to aid smooth developer " ++ "experience. Setting the parameter can result in three logical modes:") +.list( +text( +"A positive value triggers flushing periodically by that interval"), +text( +FLUSH_AFTER_EVERY_RECORD ++ " triggers flushing after every record thus minimizing latency"), +text( +"If the config " ++ BUFFER_TIMEOUT_ENABLED.key() ++ " is false," ++ " trigger flushing only when the output buffer is full thus maximizing " ++ "throughput")) +.build()); + +/** @deprecated Use {@link #BUFFER_TIMEOUT_INTERVAL} instead. */ +@Deprecated public static final ConfigOption BUFFER_TIMEOUT = ConfigOptions.key("execution.buffer-timeout") Review Comment: Don't need to create a new `ConfigOption`, you can take a look this [PR](https://github.com/apache/flink/pull/20867/files#diff-7d4e333f055cb785073ea9bf5a53fcc7f8f527ba4230e153dd9e59212093aec4). 
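The three logical flushing modes named in the option description quoted above can be sketched as a small decision function. This is only an illustration of the variant proposed in the diff (a boolean switch plus an interval), which the reviewer suggests is not needed; `BufferTimeoutSketch`, `FlushMode`, and `resolve` are hypothetical names, and treating 0 ms as "flush after every record" is an assumption.

```java
import java.time.Duration;

/** Hypothetical sketch of the three buffer-flushing modes; not Flink's implementation. */
class BufferTimeoutSketch {
    enum FlushMode { PERIODIC, AFTER_EVERY_RECORD, ONLY_WHEN_BUFFER_FULL }

    // Assumption: an interval of 0 ms means "flush after every record".
    static final Duration FLUSH_AFTER_EVERY_RECORD = Duration.ZERO;

    /** Maps the (enabled, interval) pair from the proposed options to a flush mode. */
    static FlushMode resolve(boolean enabled, Duration interval) {
        if (!enabled) {
            // Timeout disabled: flush only when the output buffer is full (max throughput).
            return FlushMode.ONLY_WHEN_BUFFER_FULL;
        }
        if (interval.equals(FLUSH_AFTER_EVERY_RECORD)) {
            // Minimal latency: every record is flushed immediately.
            return FlushMode.AFTER_EVERY_RECORD;
        }
        if (interval.isNegative()) {
            throw new IllegalArgumentException("interval must not be negative: " + interval);
        }
        // Positive interval: flush periodically.
        return FlushMode.PERIODIC;
    }
}
```

The PR title proposes the alternative of allowing a negative `Duration` such as -1ms as a sentinel on the existing option, which would replace the boolean switch in this sketch.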
[GitHub] [flink] fsk119 commented on a diff in pull request #22599: [FLINK-31967][table-runner] Solving the Initial Value Problem of LagA…
fsk119 commented on code in PR #22599: URL: https://github.com/apache/flink/pull/22599#discussion_r1203384646 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/AggregateITCaseBase.scala: ## @@ -1184,6 +1184,34 @@ abstract class AggregateITCaseBase(testName: String) extends BatchTestBase { checkResult("select count(*) from src", Seq(row(3))) } + @Test + def testLeadAggFunction(): Unit = { +val data = + List(rowOf(2L, 15, "Hello"), rowOf(8L, 11, "Hello world"), rowOf(9L, 12, "Hello world!")) +val dataId = TestValuesTableFactory.registerData(data) +tEnv.executeSql(s""" + |CREATE TABLE src( + | `id` BIGINT, + | len INT NOT NULL, Review Comment: nit: len -> `len` ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala: ## @@ -1765,4 +1765,35 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State val expected = List("3") assertEquals(expected.sorted, sink.getRetractResults.sorted) } + + @Test + def testLagAggFunction(): Unit = { +val data = + List(rowOf(2L, 15, "Hello"), rowOf(8L, 11, "Hello world"), rowOf(9L, 12, "Hello world!")) +val dataId = TestValuesTableFactory.registerData(data) +tEnv.executeSql(s""" + |CREATE TABLE src( + | `id` BIGINT, + | len INT, Review Comment: ditto ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/LagAggFunction.java: ## @@ -44,6 +44,15 @@ public LagAggFunction(LogicalType[] valueTypes) { Arrays.stream(valueTypes) .map(DataTypeUtils::toInternalDataType) .toArray(DataType[]::new); +// When the initial value has not been initialized and is not allowed to be null or +// default value allowed to be empty,output should be changed to allow nulls Review Comment: The output value can only be not null if the default input arguments include a non-null default value. -- This is an automated message from the Apache Git Service. 
[GitHub] [flink] flinkbot commented on pull request #22633: [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state…
flinkbot commented on PR #22633: URL: https://github.com/apache/flink/pull/22633#issuecomment-1560424440 ## CI report: * c9cdbadf7b65278f3cfc1761cf29cf67f4158efc UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] fsk119 opened a new pull request, #22633: [FLINK-32043][sql-gateway] Fix Sql Gateway doesn't validate set state…
fsk119 opened a new pull request, #22633: URL: https://github.com/apache/flink/pull/22633 …ment ## What is the purpose of the change Fix the SQL Gateway not validating whether a statement is legal.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725624#comment-17725624 ] Feifan Wang commented on FLINK-29913: - Thanks [~roman] , I will prepare a pr. > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yanfei Lei >Assignee: Feifan Wang >Priority: Major > Fix For: 1.16.2, 1.17.2 > > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32043) SqlClient session unrecoverable once one wrong setting occurred
[ https://issues.apache.org/jira/browse/FLINK-32043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shengkai Fang reassigned FLINK-32043:
-------------------------------------
Assignee: Shengkai Fang

> SqlClient session unrecoverable once one wrong setting occurred
> ---------------------------------------------------------------
>
> Key: FLINK-32043
> URL: https://issues.apache.org/jira/browse/FLINK-32043
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.17.0
> Reporter: lincoln lee
> Assignee: Shengkai Fang
> Priority: Critical
>
> In sql client, it can not work normally once one wrong setting occurred
> {code:java}
> // wrong setting here
> Flink SQL> SET table.sql-dialect = flink;
> [INFO] Execute statement succeed.
> Flink SQL> select '' AS f1, a from t1;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> SET table.sql-dialect = default;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET table.sql-dialect;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api.SqlDialect.FLINK
> Flink SQL> RESET;
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api.SqlDialect.FLINK
> {code}
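The failure mode in the transcript above (an illegal value is stored first and only fails later, on every subsequent statement) suggests a validate-before-apply pattern. The following is a minimal sketch of that pattern only, not the actual fix in PR #22633: `SessionConfigSketch` is hypothetical, and the nested `SqlDialect` enum is a stand-in for `org.apache.flink.table.api.SqlDialect`.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: reject an illegal setting before it reaches session state. */
class SessionConfigSketch {
    // Stand-in for the real dialect enum; the constants here are an assumption.
    enum SqlDialect { DEFAULT, HIVE }

    private final Map<String, String> settings = new HashMap<>();

    /** Validates the value first, so a bad SET leaves the session untouched. */
    void set(String key, String value) {
        if ("table.sql-dialect".equals(key)) {
            // Enum.valueOf throws IllegalArgumentException for unknown constants,
            // and it does so before settings.put is reached.
            SqlDialect.valueOf(value.toUpperCase(Locale.ROOT));
        }
        settings.put(key, value);
    }

    String get(String key) {
        return settings.get(key);
    }
}
```

In this sketch `SET table.sql-dialect = flink` fails immediately with an error, and the previously stored value stays in effect, so later statements such as `RESET` are not poisoned by the rejected value.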
[GitHub] [flink] Hexiaoqiao commented on a diff in pull request #22590: [FLINK-32071] Implement the snapshot manager for merged checkpoint files in TM
Hexiaoqiao commented on code in PR #22590: URL: https://github.com/apache/flink/pull/22590#discussion_r1203358593 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.EntropyInjector; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.OutputStreamAndPath; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingCheckpointUtils.SnapshotFileSystemInfo; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +/** Base implementation of {@link FileMergingSnapshotManager}. */ +public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager { + +private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class); + +private final String id; + +protected final Executor ioExecutor; + +// file system and directories +protected FileSystem fs; +protected Path checkpointDir; +protected Path sharedStateDir; +protected Path taskOwnedStateDir; + +protected int writeBufferSize; +private boolean fileSystemInitiated = false; + +protected boolean syncAfterClosingLogicalFile; + +protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; + +private final Map managedSharedStateDir = new ConcurrentHashMap<>(); + +protected Path managedExclusiveStateDir; + +public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) { +this.id = id; +this.ioExecutor = ioExecutor; +} + +@Override +public void initFileSystem(SnapshotFileSystemInfo fileSystemInfo) throws IOException { +initFileSystem( 
+fileSystemInfo.fs, +fileSystemInfo.checkpointBaseDirectory, +fileSystemInfo.sharedStateDirectory, +fileSystemInfo.taskOwnedStateDirectory); +} + +@Override +public void addSubtask(SubtaskKey subtaskKey) { +String managedDirName = subtaskKey.getManagedDirName(); +Path managedPath = new Path(sharedStateDir, managedDirName); +if (!managedSharedStateDir.containsKey(subtaskKey)) { +createManagedDirectory(managedPath); +managedSharedStateDir.put(subtaskKey, managedPath); +} +} + +// +// logical & physical file +// + +protected LogicalFile createLogicalFile( +@Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey subtaskKey) { +LogicalFileId fileID = LogicalFileId.generateRandomId(); +return new LogicalFile(fileID, physicalFile, subtaskKey); +} + +@Nonnull +protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope) +throws IOException { +PhysicalFile result; +Exception latestException = null; + +Path dirPath = getManagedDir(subtaskKey, scope); + +if (dirPath == null) { +throw new IOException( +"Could not get " ++ scope ++ " path for subtask " ++ subtaskKey ++ ", the directory may have not been created."); +} + +for (int attempt = 0; attempt < 10; attempt++) { +try { +OutputStreamAndPath streamAndPath
[jira] [Comment Edited] (FLINK-32172) KafkaExample can not run with args
[ https://issues.apache.org/jira/browse/FLINK-32172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725622#comment-17725622 ] Weijie Guo edited comment on FLINK-32172 at 5/24/23 3:32 AM: - Thanks for the report, this should be an accidental mistake(the minimum parameter count should be 4 instead of 5). It will be fixed asap. was (Author: weijie guo): Thanks for the report, this should be an accidental typo (the minimum parameter count should be 4 instead of 5). It will be fixed asap. > KafkaExample can not run with args > -- > > Key: FLINK-32172 > URL: https://issues.apache.org/jira/browse/FLINK-32172 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0 > Environment: * win11 > * Git > * Maven (we recommend version 3.8.6) > * Java 11 >Reporter: xulongfeng >Assignee: Weijie Guo >Priority: Not a Priority > Attachments: args.png, kafkaexample.png > > > i fork and clone flink-connector-kafka repo. after build and package, i run > org/apache/flink/streaming/kafka/test/KafkaExample.java main() but failed, > comment say: > Example usage: --input-topic test-input --output-topic test-output > --bootstrap.servers > * localhost:9092 --group.id myconsumer > > but console print: Missing parameters! from KafkaExampleUtil where need 5 > paramters but we have 4 > > thank you for your attention to this matter -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32172) KafkaExample can not run with args
[ https://issues.apache.org/jira/browse/FLINK-32172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo reassigned FLINK-32172: -- Assignee: Weijie Guo > KafkaExample can not run with args > -- > > Key: FLINK-32172 > URL: https://issues.apache.org/jira/browse/FLINK-32172 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.0 > Environment: * win11 > * Git > * Maven (we recommend version 3.8.6) > * Java 11 >Reporter: xulongfeng >Assignee: Weijie Guo >Priority: Not a Priority > Attachments: args.png, kafkaexample.png > > > i fork and clone flink-connector-kafka repo. after build and package, i run > org/apache/flink/streaming/kafka/test/KafkaExample.java main() but failed, > comment say: > Example usage: --input-topic test-input --output-topic test-output > --bootstrap.servers > * localhost:9092 --group.id myconsumer > > but console print: Missing parameters! from KafkaExampleUtil where need 5 > paramters but we have 4 > > thank you for your attention to this matter -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #22632: [FLINK-32161][test] Migrate and remove some legacy ExternalResource
flinkbot commented on PR #22632: URL: https://github.com/apache/flink/pull/22632#issuecomment-1560404949 ## CI report: * 3446f19686348fdd7af3898f81cc8e501c4d56ac UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] reswqa opened a new pull request, #22632: [FLINK-32161][test] Migrate and remove some legacy ExternalResource
reswqa opened a new pull request, #22632: URL: https://github.com/apache/flink/pull/22632 ## What is the purpose of the change *Migrate and remove some legacy ExternalResource.* ## Brief change log - *Auto injects TestLoggerExtension for flink-tests.* - *Migrate and remove some legacy ExternalResource* ## Verifying this change *This change is already covered by existing tests.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32173) Flink Job Metrics returns stale values in the first request after an update in the values
Prabhu Joseph created FLINK-32173:
-------------------------------------
Summary: Flink Job Metrics returns stale values in the first request after an update in the values
Key: FLINK-32173
URL: https://issues.apache.org/jira/browse/FLINK-32173
Project: Flink
Issue Type: Bug
Components: Runtime / Metrics
Affects Versions: 1.17.0
Reporter: Prabhu Joseph

Flink Job Metrics returns stale values in the first request after an update in the values.

*Repro:*

1. Run a Flink job with the fixed-delay restart strategy and multiple attempts.
{code}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1

flink run -Dexecution.checkpointing.interval="10s" -d -c org.apache.flink.streaming.examples.wordcount.WordCount /usr/lib/flink/examples/streaming/WordCount.jar
{code}
2. Kill one of the TaskManagers, which will initiate a job restart.
3. After the job has restarted, fetch any job metric. The first request returns the stale (older) value 48.
{code}
[hadoop@ip-172-31-44-70 ~]$ curl http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts | jq .
[
  {
    "id": "numRestarts",
    "value": "48"
  }
]
{code}
4. On subsequent requests, it returns the correct value.
{code}
[hadoop@ip-172-31-44-70 ~]$ curl http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts | jq .
[
  {
    "id": "numRestarts",
    "value": "49"
  }
]
{code}
5. Repeat steps 2 to 4. Each time, the first request after the metric has been updated returns the value from before the update; only the next request returns the correct value.
[GitHub] [flink] xishuaidelin commented on a diff in pull request #22539: [FLINK-31956][table] Extend the CompiledPlan to read from/write to Fl…
xishuaidelin commented on code in PR #22539: URL: https://github.com/apache/flink/pull/22539#discussion_r1203332236 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -731,9 +731,16 @@ public TableResultInternal executePlan(InternalPlan plan) { } private CompiledPlan compilePlanAndWrite( -String filePath, boolean ifNotExists, Operation operation) { -File file = Paths.get(filePath).toFile(); -if (file.exists()) { +Path filePath, boolean ifNotExists, Operation operation) { +FileSystem fs; +boolean exists; +try { +fs = filePath.getFileSystem(); +exists = fs.exists(filePath); +} catch (IOException e) { +throw new RuntimeException(e); Review Comment: Okey. I have modified this in the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on pull request #22615: [hotfix][doc] Fix typo in analyze.md - SQL sample syntax error
reswqa commented on PR #22615: URL: https://github.com/apache/flink/pull/22615#issuecomment-1560379814 BTW, you should rename your commit title to `[hotfix][doc] Fix typo in analyze.md`.
[GitHub] [flink] reswqa commented on a diff in pull request #22615: [hotfix][doc] Fix typo in analyze.md - SQL sample syntax error
reswqa commented on code in PR #22615: URL: https://github.com/apache/flink/pull/22615#discussion_r1203329260 ## docs/content/docs/dev/table/sql/analyze.md: ## @@ -81,8 +81,8 @@ tableEnv.executeSql( " `id` BIGINT NOT NULl," + " `product` VARCHAR(32)," + " `amount` INT," + -" `sold_year` BIGINT", + -" `sold_month` BIGINT", + +" `sold_year` BIGINT," + +" `sold_month` BIGINT," + Review Comment: Please also check the Chinese document to see if there are any similar issues. ## docs/content/docs/dev/table/sql/analyze.md: ## @@ -379,4 +379,4 @@ ANALYZE TABLE [catalog_name.][db_name.]table_name PARTITION(partcol1[=val1] [, p *NOTE:* For the fix length types (like `BOOLEAN`, `INTEGER`, `DOUBLE` etc.), we need not collect the `avgLen` and `maxLen` from the original records. -{{< top >}} \ No newline at end of file +{{< top >}} Review Comment: What is the purpose of this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] (FLINK-32115) json_value support cache
[ https://issues.apache.org/jira/browse/FLINK-32115 ]

xiaogang zhou deleted comment on FLINK-32115:
---
was (Author: zhoujira86): [~luoyuxia] Hi yuxia, can you please help review this?

> json_value support cache
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.16.1
> Reporter: xiaogang zhou
> Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>
> Hive supports a JSON object cache for previously deserialized values. Could we
> consider using a cache object in JsonValueCallGen?
>
> This optimization can improve the performance of SQL like:
>
> select
>   json_value(A, 'xxx'),
>   json_value(A, 'yyy'),
>   json_value(A, 'zzz'),
>   ... (and many more)
>
> I added a static LRU cache to SqlJsonUtils and refactored
> jsonValueExpression1 like this:
> {code:java}
> private static JsonValueContext jsonValueExpression1(String input) {
>     JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
>     if (parsedJsonContext != null) {
>         return parsedJsonContext;
>     }
>     try {
>         parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
>     } catch (Exception e) {
>         parsedJsonContext = JsonValueContext.withException(e);
>     }
>     EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
>     return parsedJsonContext;
> } {code}
>
> and benchmarked it like this:
> {code:java}
> public static void main(String[] args) {
>     String input =
>         "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]}";
>     long start = System.currentTimeMillis();
>     for (int i = 0; i < 100; i++) {
>         Object dejsonize = jsonValueExpression1(input);
>     }
>     System.err.println(System.currentTimeMillis() - start);
> } {code}
>
> The time the two benchmarks take:
> ||case||milliseconds taken||
> |cache|33|
> |no cache|1591|

--
This message was sent by Atlassian Jira (v8.20.10#820010)
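The caching idea in this issue can be sketched independently of Flink. The block below is a minimal illustration, assuming a hypothetical `ParsedValueCache` class (it is not `SqlJsonUtils` and not the reporter's `EXTRACT_OBJECT_CACHE`); it uses an access-ordered `LinkedHashMap` from the JDK as a bounded LRU map and counts how often the parser actually runs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of a bounded LRU cache for parsed values. */
final class ParsedValueCache<V> {
    private final Map<String, V> cache;
    private int parseCount = 0;

    ParsedValueCache(final int maxEntries) {
        // accessOrder = true keeps the least recently used entry first,
        // so removeEldestEntry evicts in LRU order.
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value for input, parsing it only on a cache miss. */
    synchronized V getOrParse(String input, Function<String, V> parser) {
        V cached = cache.get(input);
        if (cached != null) {
            return cached;
        }
        parseCount++;
        V parsed = parser.apply(input);
        cache.put(input, parsed);
        return parsed;
    }

    /** Number of times the parser was invoked (cache misses). */
    synchronized int parseCount() {
        return parseCount;
    }
}
```

A shared static cache of this kind trades memory for parse time, and it has to be synchronized (as here) or otherwise thread-safe if several tasks in one JVM call it concurrently.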
[jira] [Updated] (FLINK-31894) ExceptionHistory and REST API failure label integration
[ https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Panagiotis Garefalakis updated FLINK-31894: --- Component/s: Runtime / REST > ExceptionHistory and REST API failure label integration > --- > > Key: FLINK-31894 > URL: https://issues.apache.org/jira/browse/FLINK-31894 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Reporter: Panagiotis Garefalakis >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #22631: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard
flinkbot commented on PR #22631: URL: https://github.com/apache/flink/pull/22631#issuecomment-1560255418 ## CI report: * 4f66c765cda0fbbc84d2e8ed3883ebcde73b9b22 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] dependabot[bot] opened a new pull request, #22631: Bump socket.io-parser and socket.io in /flink-runtime-web/web-dashboard
dependabot[bot] opened a new pull request, #22631:
URL: https://github.com/apache/flink/pull/22631

Bumps [socket.io-parser](https://github.com/socketio/socket.io-parser) and [socket.io](https://github.com/socketio/socket.io). These dependencies needed to be updated together.

Updates `socket.io-parser` from 4.0.5 to 4.2.3

Release notes

Sourced from [socket.io-parser's releases](https://github.com/socketio/socket.io-parser/releases).

4.2.3

:warning: This release contains an important security fix :warning:

A malicious client could send a specially crafted HTTP request, triggering an uncaught exception and killing the Node.js process:

    TypeError: Cannot convert object to primitive value
        at Socket.emit (node:events:507:25)
        at .../node_modules/socket.io/lib/socket.js:531:14

Please upgrade as soon as possible.

Bug Fixes
- check the format of the event name ([3b78117](https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3))

Links
- Diff: https://github.com/socketio/socket.io-parser/compare/4.2.2...4.2.3

4.2.2

Bug Fixes
- calling destroy() should clear all internal state ([22c42e3](https://github.com/socketio/socket.io-parser/commit/22c42e3545e4adbc5931276c378f5d62c8b3854a))
- do not modify the input packet upon encoding ([ae8dd88](https://github.com/socketio/socket.io-parser/commit/ae8dd88995dbd7f89c97e5cc15e5b489fa0efece))

Links
- Diff: https://github.com/socketio/socket.io-parser/compare/4.2.1...4.2.2

4.2.1

Bug Fixes
- check the format of the index of each attachment ([b5d0cb7](https://github.com/socketio/socket.io-parser/commit/b5d0cb7dc56a0601a09b056beaeeb0e43b160050))

Links
- Diff: https://github.com/socketio/socket.io-parser/compare/4.2.0...4.2.1

4.2.0

Features
- allow the usage of custom replacer and reviver ([#112](https://redirect.github.com/socketio/socket.io-parser/issues/112)) ([b08bc1a](https://github.com/socketio/socket.io-parser/commit/b08bc1a93e8e3194b776c8a0bdedee1e29333680))

Links
- Diff: https://github.com/socketio/socket.io-parser/compare/4.1.2...4.2.0

... (truncated)

Changelog

Sourced from [socket.io-parser's changelog](https://github.com/socketio/socket.io-parser/blob/main/CHANGELOG.md).

[4.2.3](https://github.com/socketio/socket.io-parser/compare/4.2.2...4.2.3) (2023-05-22)

Bug Fixes
- check the format of the event name ([3b78117](https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3))

[4.2.2](https://github.com/socketio/socket.io-parser/compare/4.2.1...4.2.2) (2023-01-19)

Bug Fixes
- calling destroy() should clear all internal state ([22c42e3](https://github.com/socketio/socket.io-parser/commit/22c42e3545e4adbc5931276c378f5d62c8b3854a))
- do not modify the input packet upon encoding ([ae8dd88](https://github.com/socketio/socket.io-parser/commit/ae8dd88995dbd7f89c97e5cc15e5b489fa0efece))

[3.3.3](https://github.com/Automattic/socket.io-parser/compare/3.3.2...3.3.3) (2022-11-09)

Bug Fixes
- check the format of the index of each attachment ([fb21e42](https://github.com/Automattic/socket.io-parser/commit/fb21e422fc193b34347395a33e0f625bebc09983))

[3.4.2](https://github.com/socketio/socket.io-parser/compare/3.4.1...3.4.2) (2022-11-09)

Bug Fixes
- check the format of the index of each attachment ([04d23ce](https://github.com/socketio/socket.io-parser/commit/04d23cecafe1b859fb03e0cbf6ba3b74dff56d14))

[4.2.1](https://github.com/socketio/socket.io-parser/compare/4.2.0...4.2.1) (2022-06-27)

Bug Fixes
- check the format of the index of each attachment ([b5d0cb7](https://github.com/socketio/socket.io-parser/commit/b5d0cb7dc56a0601a09b056beaeeb0e43b160050))

Commits
- [b6c824f](https://github.com/socketio/socket.io-parser/commit/b6c824f82421aa44dfd5ef395f5132866543de59) chore(release): 4.2.3
- [dcc70d9](https://github.com/socketio/socket.io-parser/commit/dcc70d9678ac896de08294d6e8d668be6a68680a) refactor: export typescript declarations for the commonjs build
- [3b78117](https://github.com/socketio/socket.io-parser/commit/3b78117bf6ba7e99d7a5cfc1ba54d0477554a7f3) fix: check the format of the event name
- [0841bd5](https://github.com/socketio/socket.io-parser/commit/0841bd562351c3d45a5288e2adf9707cc8a3131d) chore: bump ua-parser-js from 1.0.32 to 1.0.33 ([#121](https://redirect.github.com/socketio/socket.io-parser/issues/121))
- [28dd668](https://github.com/socketio/socket.io-parser/commit/28dd6685021353b26a4b022e25b453c627d0a7e8) chore(release): 4.2.2
[jira] [Created] (FLINK-32172) KafkaExample can not run with args
xulongfeng created FLINK-32172: -- Summary: KafkaExample can not run with args Key: FLINK-32172 URL: https://issues.apache.org/jira/browse/FLINK-32172 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.0 Environment: * win11 * Git * Maven (we recommend version 3.8.6) * Java 11 Reporter: xulongfeng Attachments: args.png, kafkaexample.png I forked and cloned the flink-connector-kafka repo. After building and packaging, I ran main() in org/apache/flink/streaming/kafka/test/KafkaExample.java, but it failed. The comment says: Example usage: --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --group.id myconsumer But the console prints "Missing parameters!" from KafkaExampleUtil, which requires 5 parameters while the example usage supplies only 4. Thank you for your attention to this matter. -- This message was sent by Atlassian Jira (v8.20.10#820010)
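The mismatch reported here (a check for 5 parameters against a usage string that lists 4) is the kind of bug a count-based check invites. One way to avoid it is to validate the required keys by name. The following is a minimal sketch under that assumption; `ExampleArgs` is a hypothetical helper, it does not use Flink's `ParameterTool`, and it is not the code in `KafkaExampleUtil`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: validates "--key value" arguments by name, not by count. */
final class ExampleArgs {
    // The four keys listed in the example usage quoted in the issue.
    static final List<String> REQUIRED =
            Arrays.asList("input-topic", "output-topic", "bootstrap.servers", "group.id");

    /** Parses "--key value" pairs; a trailing key without a value is ignored. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    /** True only if every required key is present. */
    static boolean hasRequired(Map<String, String> params) {
        return params.keySet().containsAll(REQUIRED);
    }
}
```

With this shape, the error message and the check are derived from the same list, so they cannot drift apart the way a hard-coded `5` and a four-item usage string did.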
[jira] [Commented] (FLINK-32171) Add PostStart hook to flink k8s operator helm
[ https://issues.apache.org/jira/browse/FLINK-32171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725588#comment-17725588 ] Xingcan Cui commented on FLINK-32171: - Hi [~gyfora], would like to get your thoughts on this. I can work on it if you think this feature is reasonable. Thanks! > Add PostStart hook to flink k8s operator helm > - > > Key: FLINK-32171 > URL: https://issues.apache.org/jira/browse/FLINK-32171 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Xingcan Cui >Priority: Minor > Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1 > > > I feel it will be convenient to add a PostStart hook optional config to flink > k8s operator helm (e.g. when users need to download some Flink plugins). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32171) Add PostStart hook to flink k8s operator helm
Xingcan Cui created FLINK-32171: --- Summary: Add PostStart hook to flink k8s operator helm Key: FLINK-32171 URL: https://issues.apache.org/jira/browse/FLINK-32171 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: Xingcan Cui Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1 I feel it will be convenient to add a PostStart hook optional config to flink k8s operator helm (e.g. when users need to download some Flink plugins). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] pgaref commented on pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
pgaref commented on PR #22608: URL: https://github.com/apache/flink/pull/22608#issuecomment-1560231799 > Looks good overall, I'm just not a big fan of introducing a new impl. of the execution graph. Can we maybe do something along these lines instead? https://gist.github.com/dmvk/7481082e5584c9a1eff377282d6b93df Thanks @dmvk ! Was also looking for a smarter way to throw on ExecutionGraph init -- Using the same intermediate dataset id on initializeJobVertex is perfect! Updated :)
[GitHub] [flink] pgaref commented on pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling
pgaref commented on PR #22564: URL: https://github.com/apache/flink/pull/22564#issuecomment-1560181396 Thanks for the thorough review @dmvk ! PR is now updated :)
[GitHub] [flink] pgaref commented on a diff in pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling
pgaref commented on code in PR #22564: URL: https://github.com/apache/flink/pull/22564#discussion_r1203063467 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -1992,4 +2035,120 @@ public DummyState getState() { } } } + +private static class ExceptionHistoryTester { Review Comment: Totally, I just missed that! Thanks!
[GitHub] [flink] kurtostfeld closed pull request #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti…
kurtostfeld closed pull request #22586: [FLINK-31880][tests] Fixes unit test so it will pass regardless of ti… URL: https://github.com/apache/flink/pull/22586
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725569#comment-17725569 ] Roman Khachatryan commented on FLINK-29913: --- Got it, thanks for clarifying. However, I think that using "Integer.toString(System.identityHashCode(stateHandle))" can easily lead to collisions, thereby causing checkpoint corruption. But this seems to me an (important) implementation detail, that can be discussed in the PR. WDYT? > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yanfei Lei >Assignee: Feifan Wang >Priority: Major > Fix For: 1.16.2, 1.17.2 > > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman] -- This message was sent by Atlassian Jira (v8.20.10#820010)
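The concern about `System.identityHashCode` is easy to state in code terms: it returns a 32-bit value that is not guaranteed to be unique per object and is not stable across JVM runs, so it cannot identify the same state after recovery. A key derived from a stable string such as the remote file path does not have that problem. The sketch below only illustrates that alternative; `StableRegistryKey` is a hypothetical name, and the choice of MD5 follows the suggestion in the thread rather than any confirmed Flink implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives a registry key from a stable remote path. */
final class StableRegistryKey {
    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of the path. */
    static String fromPath(String remotePath) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(remotePath.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a required algorithm on every Java platform implementation.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

The same input always yields the same key, which is the property the identity hash lacks. Whether a digest or the raw path is the better key is a design question left to the PR discussion.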
[jira] [Closed] (FLINK-31095) FileSink doesn't work with s3a on EKS
[ https://issues.apache.org/jira/browse/FLINK-31095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvia Lin closed FLINK-31095. -- Resolution: Not A Bug > FileSink doesn't work with s3a on EKS > - > > Key: FLINK-31095 > URL: https://issues.apache.org/jira/browse/FLINK-31095 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.16.1 >Reporter: Sylvia Lin >Priority: Major > > FileSink gives below exception on AWS EKS cluster: > {code:java} > Caused by: java.lang.UnsupportedOperationException: This s3 file system > implementation does not support recoverable writers. > at > org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) > ~[?:?] > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) > ~[flink-dist-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475) > ~[flink-connector-files-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466) > ~[flink-connector-files-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175) > ~[flink-connector-files-1.16.1.jar:1.16.1]{code} > [https://github.com/apache/flink/blob/278dc7b793303d228f7816585054629708983af6/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#LL136C16-L136C16] > And this may be related to > https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31095) FileSink doesn't work with s3a on EKS
[ https://issues.apache.org/jira/browse/FLINK-31095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725539#comment-17725539 ] Sylvia Lin commented on FLINK-31095: Yeah, it's working! Thanks! I can close the ticket. > FileSink doesn't work with s3a on EKS > - > > Key: FLINK-31095 > URL: https://issues.apache.org/jira/browse/FLINK-31095 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.16.1 >Reporter: Sylvia Lin >Priority: Major > > FileSink gives below exception on AWS EKS cluster: > {code:java} > Caused by: java.lang.UnsupportedOperationException: This s3 file system > implementation does not support recoverable writers. > at > org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) > ~[?:?] > at > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) > ~[flink-dist-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475) > ~[flink-connector-files-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466) > ~[flink-connector-files-1.16.1.jar:1.16.1] > at > org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175) > ~[flink-connector-files-1.16.1.jar:1.16.1]{code} > [https://github.com/apache/flink/blob/278dc7b793303d228f7816585054629708983af6/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java#LL136C16-L136C16] > And this may be related to > https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #29: [FLINK-32021][Connectors/Kafka] Improvement the Javadoc for SpecifiedOffsetsInitializer and TimestampOffsetsInitial
RamanVerma commented on code in PR #29: URL: https://github.com/apache/flink-connector-kafka/pull/29#discussion_r1202797216 ## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java: ## @@ -38,6 +38,10 @@ * An implementation of {@link OffsetsInitializer} which initializes the offsets of the partition * according to the user specified offsets. * + * Use specified offsets for specified partitions while use commit offsets or earliest for Review Comment: thanks for the explanation @loserwang1024 As you said, you can change that comment to "use offsetResetStrategy which is default earliest”.
[GitHub] [flink] flinkbot commented on pull request #22630: [hotfix] Use https://repo.maven.apache.org in maven-utils.sh
flinkbot commented on PR #22630: URL: https://github.com/apache/flink/pull/22630#issuecomment-1559869076 ## CI report: * abba63af11465e74c8b5ffc08cbaa622e8666064 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] snuyanzin opened a new pull request, #22630: [hotfix] Use https://repo.maven.apache.org in maven-utils.sh
snuyanzin opened a new pull request, #22630: URL: https://github.com/apache/flink/pull/22630 ## What is the purpose of the change The idea is to use `https://repo.maven.apache.org` (same as `maven-wrapper.properties`) to download maven via `maven-utils.sh` since connection to `https://archive.apache.org` seems to be unstable sometimes like e.g. at https://issues.apache.org/jira/browse/FLINK-32106 ## Brief change log `maven-utils.sh` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: ( no ) - The runtime per-record code paths (performance sensitive): ( no ) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no ) - The S3 file system connector: ( no ) ## Documentation - Does this pull request introduce a new feature? ( no) - If yes, how is the feature documented? (not applicable )
[jira] [Created] (FLINK-32170) Continue metric collection on intermittent job restarts
Maximilian Michels created FLINK-32170: -- Summary: Continue metric collection on intermittent job restarts Key: FLINK-32170 URL: https://issues.apache.org/jira/browse/FLINK-32170 Project: Flink Issue Type: Improvement Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels If the underlying infrastructure is not stable, e.g. Kubernetes pod eviction, the jobs will sometimes restart. This will restart the metric collection process for the autoscaler and discard any existing metrics. If the interruption time is short, e.g. less than one minute, we could consider resuming metric collection after the job goes back into RUNNING state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
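The proposal amounts to a rule on the gap between two samples: keep the collected history if the job returns to RUNNING quickly, discard it otherwise. A minimal sketch of that rule follows, assuming a hypothetical `ResumableMetricHistory` class and a configurable `maxGap`; none of these names come from the Flink Kubernetes operator, and the one-minute figure is only the example given in the issue.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical history that survives short interruptions between samples. */
final class ResumableMetricHistory {
    private final Duration maxGap;
    private final List<Double> samples = new ArrayList<>();
    private Instant lastSampleTime; // null until the first sample arrives

    ResumableMetricHistory(Duration maxGap) {
        this.maxGap = maxGap;
    }

    /**
     * Records a sample taken while the job is RUNNING. If more than maxGap has
     * passed since the previous sample, the old history is discarded first.
     */
    void record(Instant now, double value) {
        if (lastSampleTime != null
                && Duration.between(lastSampleTime, now).compareTo(maxGap) > 0) {
            samples.clear();
        }
        samples.add(value);
        lastSampleTime = now;
    }

    int size() {
        return samples.size();
    }
}
```

Passing the timestamp in, rather than reading the system clock inside `record`, keeps the rule easy to test.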
[GitHub] [flink] lzshlzsh commented on a diff in pull request #22612: [FLINK-32139][connector/hbase] Using strongly increasing nanosecond timestamp and DeleteColumn type to fix data accidental deletio
lzshlzsh commented on code in PR #22612:
URL: https://github.com/apache/flink/pull/22612#discussion_r1202716866

## flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseTimestampGeneratorTest.java: ##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link HBaseTimestampGenerator}. */
+public class HBaseTimestampGeneratorTest {
+    @Test
+    public void testStronglyIncreasingTimestampGenerator() {
+        HBaseTimestampGenerator timestampGenerator = HBaseTimestampGenerator.stronglyIncreasing();
+        long lastTimestamp = 0;
+        for (int i = 0; i < 100_000_000; i++) {

Review Comment:
> Please have a look at https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing - Having a test run 100 million times isn't an effective test

@MartijnVisser Thank you for your review and valuable feedback. I think I may need to use JUnit 5 to write the test cases and reduce the 100 million iterations to a single run. Can you give more suggestions on how to write this test?

Actually, I have another question: how can we write test cases for the probability of errors occurring in this data loss situation, given that we cannot assert a definite state?
[GitHub] [flink] lzshlzsh commented on a diff in pull request #22612: [FLINK-32139][connector/hbase] Using strongly increasing nanosecond timestamp and DeleteColumn type to fix data accidental deletio
lzshlzsh commented on code in PR #22612:
URL: https://github.com/apache/flink/pull/22612#discussion_r1202716866

## flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseTimestampGeneratorTest.java: ##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link HBaseTimestampGenerator}. */
+public class HBaseTimestampGeneratorTest {
+    @Test
+    public void testStronglyIncreasingTimestampGenerator() {
+        HBaseTimestampGenerator timestampGenerator = HBaseTimestampGenerator.stronglyIncreasing();
+        long lastTimestamp = 0;
+        for (int i = 0; i < 100_000_000; i++) {

Review Comment:
> Please have a look at https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing - Having a test run 100 million times isn't an effective test

Thank you for your review and valuable feedback. I think I may need to use JUnit 5 to write the test cases and reduce the 100 million iterations to a single run. Can you give more suggestions on how to write this test?

Actually, I have another question: how can we write test cases for the probability of errors occurring in this data loss situation, given that we cannot assert a definite state?
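One common way to test a strictly increasing generator without a huge loop is to inject the clock, so the test can force the awkward cases (a stalled clock, a clock that jumps backwards) deterministically. The sketch below assumes a hypothetical `StrictlyIncreasingTimestamps` class; the real `HBaseTimestampGenerator` from the PR is not shown in this thread beyond `stronglyIncreasing()`, so nothing here should be read as its implementation.

```java
import java.util.function.LongSupplier;

/** Hypothetical generator: never returns a value less than or equal to the previous one. */
final class StrictlyIncreasingTimestamps {
    private final LongSupplier clock;
    private long last = Long.MIN_VALUE;

    /** The clock is injected so tests can control the time source. */
    StrictlyIncreasingTimestamps(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns the clock value if it moved forward, otherwise the previous value plus one. */
    synchronized long next() {
        long now = clock.getAsLong();
        last = (now > last) ? now : last + 1;
        return last;
    }
}
```

A test then needs only a handful of scripted clock readings, for example `100, 100, 50, 200`, to cover the stalled and backwards cases that a 100-million-iteration loop would hit only by chance. In production the supplier could be something like `System::nanoTime`.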
[jira] [Updated] (FLINK-31684) Autoscaler metrics are only visible after metric window is full
[ https://issues.apache.org/jira/browse/FLINK-31684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31684: --- Labels: pull-request-available (was: ) > Autoscaler metrics are only visible after metric window is full > --- > > Key: FLINK-31684 > URL: https://issues.apache.org/jira/browse/FLINK-31684 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > > The metrics get reported only after the metric window is full. This is not > helpful for observability after rescaling. We need to make sure that metrics > are reported even when the metric window is not yet full. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] mxm opened a new pull request, #606: [FLINK-31684] Report autoscaling metrics before metric window is full
mxm opened a new pull request, #606: URL: https://github.com/apache/flink-kubernetes-operator/pull/606 The metrics get reported only after the metric window is full. This is not helpful for observability after rescaling. We need to make sure that metrics are reported even when the metric window is not yet full.
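The behavior asked for here can be illustrated with a sliding window that always exposes an average over whatever samples it currently holds, together with a separate flag saying whether the window is full. This is a sketch under assumed names (`PartialWindowAverage`, `isFull`, `average`); it is not the operator's autoscaler code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalDouble;

/** Hypothetical sliding window that reports an average before it is full. */
final class PartialWindowAverage {
    private final int capacity;
    private final Deque<Double> window = new ArrayDeque<>();

    PartialWindowAverage(int capacity) {
        this.capacity = capacity;
    }

    /** Adds a sample, dropping the oldest one once the window is at capacity. */
    void add(double value) {
        if (window.size() == capacity) {
            window.removeFirst();
        }
        window.addLast(value);
    }

    /** True once the window holds capacity samples. */
    boolean isFull() {
        return window.size() == capacity;
    }

    /** Average over the samples currently held; empty only if no sample was added. */
    OptionalDouble average() {
        return window.stream().mapToDouble(Double::doubleValue).average();
    }
}
```

Keeping `isFull()` separate lets a caller report the metric right away while still deferring decisions that need a full window.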
[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1202637964

## flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java: ##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code TestingLeaderElection} implements simple leader election for test cases where no {@code
+ * LeaderElectionService} is required.
+ */
+public class TestingLeaderElection implements LeaderElection {
+
+    /**
+     * Is {@code null} if the {@code LeaderElection} isn't started.
+     *
+     * @see LeaderElection#startLeaderElection(LeaderContender)
+     */
+    @Nullable private LeaderContender contender = null;
+
+    @Nullable private CompletableFuture confirmationFuture = null;

Review Comment:
Hm, that made me start thinking. I guess you're right: the `confirmationFuture` is bound to the `LeaderContender`. Resetting the contender should, indeed, also cause the `confirmationFuture` to be cancelled.

But I did another round of digging: the `triggerContenderCleanup` was actually only exposed because of [DispatcherCleanupITCase:309](https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java#L309), where it should have been used instead of calling stop (which felt unnatural in this case because we didn't actually want to stop the leader election but just wanted to reset the instance).

The test itself seems to be odd: we actually don't need to reset the leader election because the following code would just start a cleanup process which doesn't rely on leader election anymore. This change happened in FLINK-25432 (in https://github.com/apache/flink/commit/cc5d321d). The test wasn't properly cleaned up/refactored to reflect the new behavior. Therefore, we could just remove the leader election reset. As a consequence, there wouldn't be a need to expose the `triggerContenderCleanup`.

I'm going to provide a hotfix commit to clean the test up and revert the `triggerContenderCleanup` method exposure. The test code shouldn't be able to reset the leader election because it might make it possible to work around badly structured code.
[jira] [Comment Edited] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725470#comment-17725470 ] Feifan Wang edited comment on FLINK-29913 at 5/23/23 3:45 PM:

Thanks for the clarification [~roman]!

{quote}Further, regarding the approach of using a unique registry key, I agree with Congxian Qiu: we can just choose a stable registry key generation method based on the remote file name (such as the md5 digest of the remote file name), which can replace IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). The mapping of local sst file name to StreamStateHandle never changes, so the RocksDB recovery part does not need to be changed.{quote}

I mean we still use the local file name as the key of the sharedState map in _*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when generating the SharedStateRegistryKey. The changes in _*IncrementalRemoteKeyedStateHandle*_ look like this:

{code:java}
...
// still use local file name as key of this map, corresponding to the “never change” I mentioned above
private final Map<StateHandleID, StreamStateHandle> sharedState;
...
public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue()); // changed line
        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);
        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString = null;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    return new SharedStateRegistryKey(md5sum(keyString)); // may be other digest algorithm
}
{code}

And we can only use normal handles (not PlaceholderStreamStateHandle) in IncrementalRemoteKeyedStateHandle, to make sure that IncrementalRemoteKeyedStateHandle#generateRegisterKey() never receives a PlaceholderStreamStateHandle.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725470#comment-17725470 ] Feifan Wang commented on FLINK-29913:

Thanks for the clarification [~roman]!

{quote}Further, regarding the approach of using a unique registry key, I agree with Congxian Qiu: we can just choose a stable registry key generation method based on the remote file name (such as the md5 digest of the remote file name), which can replace IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). The mapping of local sst file name to StreamStateHandle never changes, so the RocksDB recovery part does not need to be changed.{quote}

I mean we still use the local file name as the key of the sharedState map in _*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when generating the SharedStateRegistryKey. The changes in _*IncrementalRemoteKeyedStateHandle*_ look like this:

{code:java}
...
// still use local file name as key of this map, corresponding to the “never change” I mentioned above
private final Map<StateHandleID, StreamStateHandle> sharedState;
...
public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue()); // changed line
        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);
        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString = null;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    return new SharedStateRegistryKey(md5sum(keyString)); // may be other digest algorithm
}
{code}

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.0, 1.16.0, 1.17.0
> Reporter: Yanfei Lei
> Assignee: Feifan Wang
> Priority: Major
> Fix For: 1.16.2, 1.17.2
>
> When maxConcurrentCheckpoint>1, the shared state of the incremental RocksDB state backend would be discarded when a handle with the same name is registered. See
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
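The `md5sum` helper called in the snippet above is not defined in the comment. A minimal sketch of what such a helper could look like, using only the JDK's `MessageDigest`, is shown below. The class name `RegistryKeyDigest` and the example path are hypothetical and not part of Flink; the comment itself notes that another digest algorithm may be used instead.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper, not part of Flink: derives a stable key string from a remote file path. */
public final class RegistryKeyDigest {

    private RegistryKeyDigest() {}

    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of the given string. */
    public static String md5sum(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // Mask to an unsigned value so every byte becomes exactly two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Assumption: the running JDK ships an MD5 provider, as standard JDKs do.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Made-up remote path, for illustration only.
        String remotePath = "hdfs:///checkpoints/job-a/shared/0001.sst";
        System.out.println(md5sum(remotePath));
    }
}
```

A digest of the remote path depends only on the path, so the same file maps to the same key across JVMs and restarts. The `System.identityHashCode` fallback in the proposed `generateRegisterKey` does not have that property, which may be worth keeping in mind for handle types that reach the `else` branch.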
[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata
[ https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725466#comment-17725466 ] Gyula Fora commented on FLINK-32012:

I think you are on the right track. But we should ensure that the lastReconciledSpec records the upgradeMode that was used during the last deployment / rollback, otherwise subsequent upgrades can be problematic. If we used a savepoint, we should record savepoint; if HA metadata was used, it should be last-state.

> Operator failed to rollback due to missing HA metadata
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.4.0
> Reporter: Nicolas Fraison
> Priority: Major
> Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the rollback, but the rollback failed with `Rollback is not possible due to missing HA metadata`.
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> In savepoint upgrade mode, the operator performs a series of actions that also delete this HA data:
> * [flink-kubernetes-operator/AbstractFlinkService.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346]: Suspend job with savepoint and deleteClusterDeployment
> * [flink-kubernetes-operator/StandaloneFlinkService.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158]: Remove JM + TM deployment and delete HA data
> * [flink-kubernetes-operator/AbstractFlinkService.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008]: Wait for cluster shutdown and delete ZooKeeper HA data
> * [flink-kubernetes-operator/FlinkUtils.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155]: Remove all child znodes
> Then, when running the rollback, the operator looks for HA data even though we rely on savepoint upgrade mode:
> * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164]: Perform reconcile of rollback if it should rollback
> * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387]: Rollback failed as HA data is not available
> * [flink-kubernetes-operator/FlinkUtils.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220]: Check if some child znodes are available
> For both steps the pattern looks the same with Kubernetes HA, so this doesn't seem to be linked to a bug with ZooKeeper.
>
> Based on https://issues.apache.org/jira/browse/FLINK-30305, it seems expected that the HA data has been deleted (Flink also does this when relying on savepoint upgrade mode).
> Still, the use case seems to differ from https://issues.apache.org/jira/browse/FLINK-30305, as the operator is aware of the failure and handles a specific rollback event.
> So I'm wondering why we enforce such a check when performing a rollback if we rely on savepoint upgrade mode. Would it be fine to not rely on the HA data and roll back from the last savepoint (the one we used in the deployment step)?
[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata
[ https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725453#comment-17725453 ] Gyula Fora commented on FLINK-32012: Yes you are right. But this is already handled in the getAvailableUpgradeModes logic during a normal upgrade. If your job failed to start after a savepoint upgrade you can send in a new spec and it will be upgraded using the previous savepoint.
[jira] [Comment Edited] (FLINK-32012) Operator failed to rollback due to missing HA metadata
[ https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725450#comment-17725450 ] Nicolas Fraison edited comment on FLINK-32012 at 5/23/23 3:05 PM:

But the whole point of this request was that, when the JobManager failed to start, the HA metadata was not available during the rollback, while the savepoint taken for the upgrade was available. So relying on SAVEPOINT for the rollback would ensure that Flink is aware of the availability of a savepoint.

From my understanding of [tryRestoreExecutionGraphFromSavepoint|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L198], Flink will rely on the savepoint if no HA metadata exists; otherwise it will load the checkpoint.

I forgot to mention it, but indeed when calling [cancelJob|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L353] it falls back to LAST_STATE in order to ensure that no savepoint is created and the HA metadata is not deleted.
[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata
[ https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725450#comment-17725450 ] Nicolas Fraison commented on FLINK-32012:

But the whole point of this request was that, when the JobManager failed to start, the HA metadata was not available during the rollback, while the savepoint taken for the upgrade was available. So relying on SAVEPOINT for the rollback would ensure that Flink is aware of the availability of a savepoint.

From my understanding of [tryRestoreExecutionGraphFromSavepoint|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java#L198], Flink will rely on the savepoint if no HA metadata exists; otherwise it will load the checkpoint.
[jira] [Updated] (FLINK-32162) Misleading log message due to missing null check
[ https://issues.apache.org/jira/browse/FLINK-32162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-32162:

Priority: Minor (was: Major)

> Misleading log message due to missing null check
>
> Key: FLINK-32162
> URL: https://issues.apache.org/jira/browse/FLINK-32162
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.18.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.18.0
>
> Updating the job requirements always logs "Failed to update requirements for job {}." because we don't check whether the error is not null.
[jira] [Closed] (FLINK-32162) Misleading log message due to missing null check
[ https://issues.apache.org/jira/browse/FLINK-32162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-32162.

Resolution: Fixed
master: fadde2a378aac4293676944dd513291919a481e3

> Misleading log message due to missing null check
>
> Key: FLINK-32162
> URL: https://issues.apache.org/jira/browse/FLINK-32162
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.18.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
> Updating the job requirements always logs "Failed to update requirements for job {}." because we don't check whether the error is not null.
[GitHub] [flink] zentol merged pull request #22628: [FLINK-32162] Only log error if an error occurred
zentol merged PR #22628: URL: https://github.com/apache/flink/pull/22628
[GitHub] [flink] zentol commented on a diff in pull request #22628: [FLINK-32162] Only log error if an error occurred
zentol commented on code in PR #22628: URL: https://github.com/apache/flink/pull/22628#discussion_r1202484203

## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ##

@@ -1182,7 +1182,10 @@ public CompletableFuture updateJobResourceRequirements(
                         getMainThreadExecutor())
                 .whenComplete(
                         (ack, error) -> {
-                            log.debug("Failed to update requirements for job {}.", jobId, error);
+                            if (error != null) {
+                                log.debug(

Review Comment:
This will already be logged on error by the rest handler, so I think debug is fine here because it's only relevant for devs, not users.
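The fix in the diff above relies on how `CompletableFuture.whenComplete` behaves: the callback runs on both success and failure, and the `error` argument is null when the future completed normally. A minimal, self-contained sketch of that behavior follows. The class `WhenCompleteNullCheck` and its method are illustrative only and are not Flink code; a list stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative only: shows why the callback has to check the error for null. */
public final class WhenCompleteNullCheck {

    private WhenCompleteNullCheck() {}

    /** Attaches a whenComplete callback and returns the messages it would have logged. */
    public static List<String> logOnFailure(CompletableFuture<String> future, String jobId) {
        List<String> messages = new ArrayList<>();
        future.whenComplete(
                (ack, error) -> {
                    // The callback is invoked in both cases; error is null on success.
                    if (error != null) {
                        messages.add("Failed to update requirements for job " + jobId + ".");
                    }
                });
        return messages;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        // Both futures are already complete, so each callback runs before logOnFailure returns.
        System.out.println(logOnFailure(CompletableFuture.completedFuture("ack"), "job-1"));
        System.out.println(logOnFailure(failed, "job-2"));
    }
}
```

Without the `error != null` guard, the message would be recorded for the successful future as well, which is the misleading log line described in FLINK-32162.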
[GitHub] [flink] wanglijie95 closed pull request #22627: [hotfix][doc] Use the flink-connector-jdbc v3.1 branch docs
wanglijie95 closed pull request #22627: [hotfix][doc] Use the flink-connector-jdbc v3.1 branch docs URL: https://github.com/apache/flink/pull/22627
[jira] [Created] (FLINK-32169) Show allocated slots on TM page
Chesnay Schepler created FLINK-32169: Summary: Show allocated slots on TM page Key: FLINK-32169 URL: https://issues.apache.org/jira/browse/FLINK-32169 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Web Frontend Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 Show the allocated slots on the TM page, so that you can better understand which job is consuming what resources.
[GitHub] [flink] dmvk commented on a diff in pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
dmvk commented on code in PR #22608: URL: https://github.com/apache/flink/pull/22608#discussion_r1202427606

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java: ##

@@ -91,6 +96,28 @@ void setUp() {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
     }

+    @Test
+    void testExceptionHistoryWithGlobalFailureLabels() throws Exception {

Review Comment:
For the sake of completeness, it would be nice to explicitly configure the restart strategy.
[GitHub] [flink] dmvk commented on a diff in pull request #22608: [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
dmvk commented on code in PR #22608: URL: https://github.com/apache/flink/pull/22608#discussion_r1202413078

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java: ##

@@ -91,6 +96,28 @@ void setUp() {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
     }

+    @Test
+    void testExceptionHistoryWithGlobalFailureLabels() throws Exception {

Review Comment:
Maybe this should state something along the lines of "failure during vertex initialization".
[GitHub] [flink-connector-jdbc] boring-cyborg[bot] commented on pull request #50: [hotfix] Backport some docs-related fixes to v3.1
boring-cyborg[bot] commented on PR #50: URL: https://github.com/apache/flink-connector-jdbc/pull/50#issuecomment-1559480963 Awesome work, congrats on your first merged pull request!
[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata
[ https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725423#comment-17725423 ]

Gyula Fora commented on FLINK-32012:

[~nfraison.datadog], I don't think it's possible to use the "SAVEPOINT" upgrade mode when rolling back. That would imply that we can take a savepoint, which means that the job would be running.

Also, regarding your comment about what to set in the status for upgradeMode: we have to set what we actually ended up using, because there is some logic that relies on this. The first time a job is started from an empty state we record stateless; if you started with initialSavepointPath then it would be savepoint. All of these are key pieces of logic that allow us to upgrade safely without accidentally losing state.

> Operator failed to rollback due to missing HA metadata
> ------------------------------------------------------
>
> Key: FLINK-32012
> URL: https://issues.apache.org/jira/browse/FLINK-32012
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.4.0
> Reporter: Nicolas Fraison
> Priority: Major
> Labels: pull-request-available
>
> The operator correctly detected that the job was failing and initiated the rollback, but this rollback failed due to `Rollback is not possible due to missing HA metadata`
> We are relying on savepoint upgrade mode and ZooKeeper HA.
> The operator performs a set of actions that also delete this HA data in savepoint upgrade mode:
> * [flink-kubernetes-operator/AbstractFlinkService.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346] : Suspend job with savepoint and deleteClusterDeployment
> * [flink-kubernetes-operator/StandaloneFlinkService.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158] : Remove JM + TM deployment and delete HA data
> * [flink-kubernetes-operator/AbstractFlinkService.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008] : Wait for cluster shutdown and delete ZooKeeper HA data
> * [flink-kubernetes-operator/FlinkUtils.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155] : Remove all child znodes
> Then, when running the rollback, the operator looks for HA data even if we rely on savepoint upgrade mode:
> * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164] Perform reconcile of rollback if it should rollback
> * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387] Rollback failed as HA data is not available
> * [flink-kubernetes-operator/FlinkUtils.java at main · apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220] Check if some child znodes are available
> For both steps the pattern looks to be the same for Kubernetes HA, so it doesn't look to be linked to a bug with ZooKeeper.
>
> From https://issues.apache.org/jira/browse/FLINK-30305 it looks to be expected that the HA data has been deleted (as it is also performed by Flink when relying on savepoint upgrade mode).
> Still, the use case seems to differ from https://issues.apache.org/jira/browse/FLINK-30305, as the operator is aware of the failure and treats a specific rollback event.
> So I'm wondering why we enforce such a check when performing a rollback if we rely on savepoint upgrade mode. Would it be fine to not rely on the HA data and roll back from the last savepoint (the one we used in the deployment step)?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-web] Thesharing commented on pull request #652: [FLINK-32051] Fix broken documentation links in Flink blogs
Thesharing commented on PR #652: URL: https://github.com/apache/flink-web/pull/652#issuecomment-1559436745

> +1, I met this same issue at flink blog. But why insert some blanks in some one doc?

Thanks for the reply. It's because without these blanks, the links in the corresponding paragraph will be broken. You could find the broken links in this [blog](https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/#reducing-complexity-with-groups).

Screenshot: https://github.com/apache/flink-web/assets/6576831/a86e98b1-7aa0-44e6-a2bf-1b41ab7c57fe
[jira] [Assigned] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan reassigned FLINK-29913: - Assignee: Feifan Wang > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yanfei Lei >Assignee: Feifan Wang >Priority: Minor > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan updated FLINK-29913: -- Fix Version/s: 1.16.2 1.17.2 > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yanfei Lei >Assignee: Feifan Wang >Priority: Major > Fix For: 1.16.2, 1.17.2 > > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan updated FLINK-29913: -- Affects Version/s: 1.17.0 > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0, 1.17.0 >Reporter: Yanfei Lei >Assignee: Feifan Wang >Priority: Major > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roman Khachatryan updated FLINK-29913: -- Priority: Major (was: Minor) > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yanfei Lei >Assignee: Feifan Wang >Priority: Major > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725418#comment-17725418 ]

Roman Khachatryan commented on FLINK-29913:

{quote}On a 64-bit system, about 7.63MB more memory will be used for every one million entries

Is there any other runtime overhead I missed ?
{quote}
I was more concerned about the additional time required to traverse the (mostly single-element) lists. When a checkpoint is subsumed, *all* entries need to be scanned. Adding pointer dereference(s) might break any optimizations that the JVM and CPU would otherwise employ.
{quote}As for the complexity, this approach will indeed increase the operation of the linked list in the registerReference() method and unregisterUnusedState() method. But given that this is easy to implement, and the implementation is cohesive, I think the complexity is acceptable.
{quote}
In my view, simplicity in this part is worth the effort. The problem that SharedStateRegistry solves is already tricky, and we shouldn't complicate it further (higher complexity potentially leads to more bugs and more maintenance effort).
{quote}Further, regarding the approach of using unique registry key, I agree with Congxian Qiu , we can just choose a stable register key generation method based on remote file name (such as use md5 digest of remote file name) , which can replace of IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() . The mapping of local sst file name to StreamStateHandle never changed , so the part of RocksDB recovery does not need to be changed.
{quote}
I don't fully understand what "never changed" means here. [Here|https://github.com/apache/flink/blob/fbf7b91424ec626ae56dd2477347a7759db6d5fe/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L105], the ID is used to create the local path. If we change the ID to the remote path, the local path will change too. Per my understanding, we cannot change the local path without updating the metadata files. Or am I missing something?
{quote}Whichever approach will be chosen, I am happy to implement it. Can you assign this ticket to me Roman Khachatryan ? looking forward to hearing from you.
{quote}
Sure, thanks for volunteering!

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -------------------------------------------------------------------------
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.0, 1.16.0
> Reporter: Yanfei Lei
> Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state backend would be discarded by registering the same name handle. See [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman]
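The "stable registry key" idea quoted above (an md5 digest of the remote file name) can be sketched roughly as follows. This is only an illustration under assumptions: the class and method names are hypothetical, the example path is made up, and nothing here is taken from Flink's `IncrementalRemoteKeyedStateHandle` or `SharedStateRegistry` code. It only shows that the same remote file name always maps to the same key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class StableRegistryKeySketch {

    /**
     * Hypothetical helper: derives a deterministic key from a remote file name
     * by hex-encoding its MD5 digest. Not part of any Flink API.
     */
    static String stableRegistryKey(String remoteFileName) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(remoteFileName.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to an unsigned value so negative bytes format as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Made-up remote path, used only to show that the key is repeatable.
        String remote = "hdfs:///checkpoints/shared/000042.sst";
        System.out.println(stableRegistryKey(remote));
        System.out.println(stableRegistryKey(remote).equals(stableRegistryKey(remote)));
    }
}
```

Such a key depends only on the remote name, which is why the question in the comment above matters: if the local path is derived from the same ID, changing how the ID is generated would change the local path as well.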
[jira] [Created] (FLINK-32168) Log required/available resources in RM
Chesnay Schepler created FLINK-32168:

Summary: Log required/available resources in RM
Key: FLINK-32168
URL: https://issues.apache.org/jira/browse/FLINK-32168
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.18.0

When matching requirements against available resources, the RM currently doesn't log anything apart from whether it could fulfill the requirements or not. We can make the system easier to audit by logging the current requirements, the available resources, and how many resources are left after the matching.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
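As a rough illustration of the kind of logging the ticket asks for, the sketch below logs the requirement, the available amount, and the remainder after matching. Everything in it is an assumption made for illustration: resources are reduced to a plain slot count, and the class, method, and logger names are hypothetical rather than taken from Flink's ResourceManager.

```java
import java.util.logging.Logger;

public class ResourceMatchLoggingSketch {

    private static final Logger LOG = Logger.getLogger("ResourceMatchLoggingSketch");

    /**
     * Hypothetical matcher: returns the number of slots left after matching,
     * or -1 if the requirement cannot be fulfilled.
     */
    static int matchAndLog(int requiredSlots, int availableSlots) {
        LOG.info(String.format(
                "Matching requirements: required=%d, available=%d",
                requiredSlots, availableSlots));
        if (requiredSlots > availableSlots) {
            LOG.info(String.format(
                    "Could not fulfill requirements: missing=%d",
                    requiredSlots - availableSlots));
            return -1;
        }
        int remaining = availableSlots - requiredSlots;
        LOG.info(String.format("Requirements fulfilled: remaining=%d", remaining));
        return remaining;
    }

    public static void main(String[] args) {
        matchAndLog(3, 5); // fulfilled, 2 slots remain
        matchAndLog(5, 3); // not fulfilled, 2 slots missing
    }
}
```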
[jira] [Created] (FLINK-32167) Log dynamic slot creation on task manager
Chesnay Schepler created FLINK-32167: Summary: Log dynamic slot creation on task manager Key: FLINK-32167 URL: https://issues.apache.org/jira/browse/FLINK-32167 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 When a slot is dynamically allocated on the TM we should log that this happens, what resources it consumes and what the remaining resources are. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32166) Show unassigned/total TM resources in web ui
Chesnay Schepler created FLINK-32166:

Summary: Show unassigned/total TM resources in web ui
Key: FLINK-32166
URL: https://issues.apache.org/jira/browse/FLINK-32166
Project: Flink
Issue Type: Sub-task
Components: Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
Fix For: 1.18.0

It is important to know how many resources of a TM are currently _assigned_ to jobs. This is different from what resources are currently _used_, since you can have assigned 1gb of memory to a job that is only using 10mb at this time.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32165) Improve observability ofd fine-grained resource management
Chesnay Schepler created FLINK-32165: Summary: Improve observability ofd fine-grained resource management Key: FLINK-32165 URL: https://issues.apache.org/jira/browse/FLINK-32165 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Web Frontend Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 Right now fine-grained resource management is way too much of a black-box, with the only source of information being the taskmanager rest endpoints. While this is fine-ish for services built around it the developer experience is suffering greatly and it becomes impossible to reason about the system afterwards (because we don't even log anything). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32165) Improve observability of fine-grained resource management
[ https://issues.apache.org/jira/browse/FLINK-32165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-32165: - Summary: Improve observability of fine-grained resource management (was: Improve observability ofd fine-grained resource management) > Improve observability of fine-grained resource management > - > > Key: FLINK-32165 > URL: https://issues.apache.org/jira/browse/FLINK-32165 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination, Runtime / Web Frontend >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.18.0 > > > Right now fine-grained resource management is way too much of a black-box, > with the only source of information being the taskmanager rest endpoints. > While this is fine-ish for services built around it the developer experience > is suffering greatly and it becomes impossible to reason about the system > afterwards (because we don't even log anything). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32012) Operator failed to rollback due to missing HA metadata
[ https://issues.apache.org/jira/browse/FLINK-32012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725413#comment-17725413 ]

Nicolas Fraison commented on FLINK-32012:

[~gyfora], I started an implementation which does not modify the upgradeMode to last-state, as the [restoreJob|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L143] for LAST_STATE upgrade mode will enforce the requirement for HA metadata, which is not what we want when relying on SAVEPOINT.

Also, when restoring the last stable spec there is a case where the UpgradeMode is set to STATELESS in this spec even if the chosen mode is SAVEPOINT ([updateStatusBeforeFirstDeployment|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L193]). To avoid restoring this bad state, which leads to the rollback not taking the savepoint into account, I enforce the upgrade mode of the restored spec to be the one currently set on the job. But I'm wondering why we decided not to persist the upgrade mode that was really used in the last stable spec for the first deployment?

Here is the diff of my [WIP|https://github.com/apache/flink-kubernetes-operator/compare/main...ashangit:flink-kubernetes-operator:nfraison/FLINK-32012?expand=1] if the approach is not clear (not to review...)
> Operator failed to rollback due to missing HA metadata > -- > > Key: FLINK-32012 > URL: https://issues.apache.org/jira/browse/FLINK-32012 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.4.0 >Reporter: Nicolas Fraison >Priority: Major > Labels: pull-request-available > > The operator has well detected that the job was failing and initiate the > rollback but this rollback has failed due to `Rollback is not possible due to > missing HA metadata` > We are relying on saevpoint upgrade mode and zookeeper HA. > The operator is performing a set of action to also delete this HA data in > savepoint upgrade mode: > * [flink-kubernetes-operator/AbstractFlinkService.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L346] > : Suspend job with savepoint and deleteClusterDeployment > * [flink-kubernetes-operator/StandaloneFlinkService.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java#L158] > : Remove JM + TM deployment and delete HA data > * [flink-kubernetes-operator/AbstractFlinkService.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L1008] > : Wait cluster shutdown and delete zookeeper HA data > * [flink-kubernetes-operator/FlinkUtils.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L155] > : Remove all child znode > Then when running rollback 
the operator is looking for HA data even if we > rely on sevepoint upgrade mode: > * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L164] > Perform reconcile of rollback if it should rollback > * [flink-kubernetes-operator/AbstractFlinkResourceReconciler.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L387] > Rollback failed as HA data is not available > * [flink-kubernetes-operator/FlinkUtils.java at main · > apache/flink-kubernetes-operator|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java#L220] > Check if some child znodes are available > For both step the pattern looks to be the same for kubernetes HA so it > doesn't looks to be linked
[jira] [Comment Edited] (FLINK-32163) Support the same application run multiple jobs in HA mode
[ https://issues.apache.org/jira/browse/FLINK-32163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725412#comment-17725412 ] Martijn Visser edited comment on FLINK-32163 at 5/23/23 1:33 PM: - [~melin] The ticket is unclear for me. Looking at https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus I think this first needs to be resolved on the Dev mailing list, before opening a Jira ticket. was (Author: martijnvisser): [~melin] The ticket is unclear for me. Looking at https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus I think this first needs to be resolved on a mailing list, before opening a Jira ticket. > Support the same application run multiple jobs in HA mode > - > > Key: FLINK-32163 > URL: https://issues.apache.org/jira/browse/FLINK-32163 > Project: Flink > Issue Type: New Feature >Reporter: melin >Priority: Major > > Support the same application run multiple jobs in HA mode -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32163) Support the same application run multiple jobs in HA mode
[ https://issues.apache.org/jira/browse/FLINK-32163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725412#comment-17725412 ] Martijn Visser commented on FLINK-32163: [~melin] The ticket is unclear for me. Looking at https://flink.apache.org/how-to-contribute/contribute-code/#1-create-jira-ticket-and-reach-consensus I think this first needs to be resolved on a mailing list, before opening a Jira ticket. > Support the same application run multiple jobs in HA mode > - > > Key: FLINK-32163 > URL: https://issues.apache.org/jira/browse/FLINK-32163 > Project: Flink > Issue Type: New Feature >Reporter: melin >Priority: Major > > Support the same application run multiple jobs in HA mode -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zentol commented on pull request #22618: [FLINK-32153][build] Limit powermock to core/runtime
zentol commented on PR #22618: URL: https://github.com/apache/flink/pull/22618#issuecomment-1559362927 yeah I've noticed it already; my guess is the mockito-core exclusion isn't working as it should. Will fix that later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32164) LifecycleState count metrics are not reported correctly by namespace
[ https://issues.apache.org/jira/browse/FLINK-32164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gyula Fora updated FLINK-32164:

Issue Type: Bug (was: Improvement)

> LifecycleState count metrics are not reported correctly by namespace
> --------------------------------------------------------------------
>
> Key: FLINK-32164
> URL: https://issues.apache.org/jira/browse/FLINK-32164
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.4.0, kubernetes-operator-1.5.0
> Reporter: Gyula Fora
> Priority: Major
>
> The per-namespace lifecycle state count metrics incorrectly show a global count:
> https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java#L145

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32164) LifecycleState count metrics are not reported correctly by namespace
Gyula Fora created FLINK-32164:

Summary: LifecycleState count metrics are not reported correctly by namespace
Key: FLINK-32164
URL: https://issues.apache.org/jira/browse/FLINK-32164
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.4.0
Reporter: Gyula Fora

The per-namespace lifecycle state count metrics incorrectly show a global count:
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java#L145

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] dmvk commented on a diff in pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling
dmvk commented on code in PR #22564: URL: https://github.com/apache/flink/pull/22564#discussion_r1202308510 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -1992,4 +2035,120 @@ public DummyState getState() { } } } + +private static class ExceptionHistoryTester { Review Comment: Would it be possible to use this as a replacement for `org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest#runExceptionHistoryTests(java.util.function.BiConsumer>, java.util.function.Consumer, java.util.function.Consumer)` as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dmvk commented on a diff in pull request #22564: [FLINK-31891][runtime] Introduce AdaptiveScheduler per-task failure enrichment/labeling
dmvk commented on code in PR #22564: URL: https://github.com/apache/flink/pull/22564#discussion_r1202299569 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java: ## @@ -132,6 +132,22 @@ private ExceptionHistoryEntry( this.failureLabels = Collections.unmodifiableMap(labelMap)); } +/** + * Creates a new ExceptionHistoryEntry copy of the original but with the provided labels + * (currently used for Failures). + * + * @param failureLabels to be passed to the new ExceptionHistoryEntry + * @return new ExceptionHistoryEntry with the associated labels + */ +public ExceptionHistoryEntry withLabels(CompletableFuture> failureLabels) { Review Comment: is this unused? ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -1992,4 +2035,120 @@ public DummyState getState() { } } } + +private static class ExceptionHistoryTester { Review Comment: Would it be possible to use this for as a replacement for `org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest#runExceptionHistoryTests(java.util.function.BiConsumer>, java.util.function.Consumer, java.util.function.Consumer)` as well? 
## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -1295,23 +1329,25 @@ void testExceptionHistoryWithTaskFailure() throws Exception { @Test void testExceptionHistoryWithTaskFailureWithRestart() throws Exception { final Exception expectedException = new Exception("Expected Local Exception"); -Consumer setupScheduler = +final Consumer setupScheduler = builder -> builder.setRestartBackoffTimeStrategy( new FixedDelayRestartBackoffTimeStrategy .FixedDelayRestartBackoffTimeStrategyFactory(1, 100) .create()); -BiConsumer> testLogic = +final BiConsumer> testLogic = (scheduler, attemptIds) -> { final ExecutionAttemptID attemptId = attemptIds.get(1); scheduler.updateTaskExecutionState( new TaskExecutionStateTransition( new TaskExecutionState( attemptId, ExecutionState.FAILED, expectedException))); }; - final Iterable actualExceptionHistory = -runExceptionHistoryTests(testLogic, setupScheduler); Review Comment: this method is no longer used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32163) Support the same application run multiple jobs in HA mode
melin created FLINK-32163: - Summary: Support the same application run multiple jobs in HA mode Key: FLINK-32163 URL: https://issues.apache.org/jira/browse/FLINK-32163 Project: Flink Issue Type: New Feature Reporter: melin Support the same application run multiple jobs in HA mode -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-jdbc] eskabetxe commented on pull request #22: [FLINK-30790] Add DatabaseExtension with TableManaged for testing
eskabetxe commented on PR #22: URL: https://github.com/apache/flink-connector-jdbc/pull/22#issuecomment-1559224399 @MartijnVisser any news for merging this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725387#comment-17725387 ]

Feifan Wang commented on FLINK-29913:

One overhead I can see is that it will use more memory for storing the next pointer. On a 64-bit system, about 7.63MB more memory will be used for every one million entries, which I think is acceptable. Is there any other runtime overhead I missed?

As for the complexity, this approach will indeed add linked-list operations to the _registerReference()_ and _unregisterUnusedState()_ methods. But given that this is easy to implement, and the implementation is cohesive, I think the complexity is acceptable.

Just to clarify, I think using a unique ID is also a valid approach, but I want to learn how you made the selection.

Further, regarding the approach of using a unique registry key, I agree with [~klion26]: we can just choose a stable registry key generation method based on the remote file name (such as using the md5 digest of the remote file name), which can replace IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). The mapping of local sst file name to StreamStateHandle never changed, so the RocksDB recovery part does not need to be changed.

Whichever approach is chosen, I am happy to implement it. Can you assign this ticket to me, [~roman]? Looking forward to hearing from you.

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -------------------------------------------------------------------------
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.0, 1.16.0
> Reporter: Yanfei Lei
> Priority: Minor
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state backend would be discarded by registering the same name handle. See [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman]
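The 7.63MB figure in the comment above is consistent with one extra 8-byte reference per entry. The short calculation below reproduces it, assuming uncompressed 64-bit references and 1 MB = 1024 × 1024 bytes. Both assumptions are mine, not stated in the comment, and the class and method names are hypothetical.

```java
import java.util.Locale;

public class NextPointerOverheadSketch {

    /** Extra memory, in MiB, for one additional reference per entry. */
    static double extraMebibytes(long entries, int bytesPerReference) {
        return entries * (double) bytesPerReference / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // 1,000,000 entries * 8 bytes = 8,000,000 bytes, roughly 7.63 MiB
        System.out.println(String.format(
                Locale.ROOT, "%.2f MiB", extraMebibytes(1_000_000L, 8)));
    }
}
```

With compressed object pointers, where a reference typically takes 4 bytes, the same calculation gives about half of that, so 7.63MB reads as an upper estimate for the pointer itself.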
[GitHub] [flink] flinkbot commented on pull request #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function
flinkbot commented on PR #22629: URL: https://github.com/apache/flink/pull/22629#issuecomment-1559168847 ## CI report: * c9a4a54f5ac5b6dccdd762ac13955f8bea8dd577 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32056) Update the used Pulsar connector in flink-python to 4.0.0
[ https://issues.apache.org/jira/browse/FLINK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-32056: --- Priority: Critical (was: Major) > Update the used Pulsar connector in flink-python to 4.0.0 > - > > Key: FLINK-32056 > URL: https://issues.apache.org/jira/browse/FLINK-32056 > Project: Flink > Issue Type: Bug > Components: API / Python, Connectors / Pulsar >Affects Versions: 1.18.0, 1.17.1 >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Critical > Labels: pull-request-available > Fix For: 1.18.0, 1.17.2 > > > flink-python still references and tests flink-connector-pulsar:3.0.0, while > it should be using flink-connector-pulsar:4.0.0. That's because the newer > version is the only version compatible with Flink 1.17 and it doesn't rely on > flink-shaded.
[GitHub] [flink] liuyongvs commented on pull request #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function
liuyongvs commented on PR #22629: URL: https://github.com/apache/flink/pull/22629#issuecomment-1559161153 hi @dawidwys do you have time to help review it?
[GitHub] [flink] liuyongvs opened a new pull request, #22629: [FLINK-31664][table] Add ARRAY_INTERSECT function
liuyongvs opened a new pull request, #22629: URL: https://github.com/apache/flink/pull/22629 - What is the purpose of the change This is an implementation of ARRAY_INTERSECT - Brief change log ARRAY_INTERSECT for Table API and SQL ``` Returns an array of the elements in the intersection of array1 and array2, without duplicates. Syntax: array_intersect(array1, array2) Arguments: array1, array2: The ARRAYs to intersect. Returns: An ARRAY. If either array is null, the function returns null. Examples: > SELECT array_intersect(array(1, 2, 3), array(1, 3, 5)); [1,3] ``` See also Spark: https://spark.apache.org/docs/latest/api/sql/index.html#array_intersect and Presto: https://prestodb.io/docs/current/functions/array.html - Verifying this change This change added tests in CollectionFunctionsITCase. - Does this pull request potentially affect one of the following parts: Dependencies (does it add or upgrade a dependency): ( no) The public API, i.e., is any changed class annotated with @Public(Evolving): (yes ) The serializers: (no) The runtime per-record code paths (performance sensitive): ( no) Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no) The S3 file system connector: ( no) - Documentation Does this pull request introduce a new feature? (yes) If yes, how is the feature documented? (docs)
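The semantics described in the pull request (intersection without duplicates, null result if either input is null) can be sketched in plain Java. This is not the PR's implementation: the class and method names are hypothetical, Java lists stand in for SQL arrays, and keeping the order of first appearance in array1 is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ArrayIntersectSketch {

    /** Hypothetical helper mirroring the documented ARRAY_INTERSECT behaviour. */
    static <T> List<T> arrayIntersect(List<T> array1, List<T> array2) {
        // If either array is null, the result is null.
        if (array1 == null || array2 == null) {
            return null;
        }
        Set<T> lookup = new HashSet<>(array2);
        // LinkedHashSet removes duplicates while keeping array1's order.
        Set<T> result = new LinkedHashSet<>();
        for (T element : array1) {
            if (lookup.contains(element)) {
                result.add(element);
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        // Same inputs as the example in the PR description.
        System.out.println(arrayIntersect(Arrays.asList(1, 2, 3), Arrays.asList(1, 3, 5))); // prints [1, 3]
    }
}
```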
[jira] [Closed] (FLINK-32056) Update the used Pulsar connector in flink-python to 4.0.0
[ https://issues.apache.org/jira/browse/FLINK-32056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-32056. --- Fix Version/s: 1.18.0 1.17.2 Resolution: Fixed Fixed in: - master via fbf7b91424ec626ae56dd2477347a7759db6d5fe - release-1.17 via d3a3755a7eef5708871580671169fd6bd2babf28 > Update the used Pulsar connector in flink-python to 4.0.0 > - > > Key: FLINK-32056 > URL: https://issues.apache.org/jira/browse/FLINK-32056 > Project: Flink > Issue Type: Bug > Components: API / Python, Connectors / Pulsar >Affects Versions: 1.18.0, 1.17.1 >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, 1.17.2 > > > flink-python still references and tests flink-connector-pulsar:3.0.0, while > it should be using flink-connector-pulsar:4.0.0. That's because the newer > version is the only version compatible with Flink 1.17 and it doesn't rely on > flink-shaded.
[GitHub] [flink] flinkbot commented on pull request #22628: [FLINK-32162] Only log error if an error occurred
flinkbot commented on PR #22628: URL: https://github.com/apache/flink/pull/22628#issuecomment-1559117082 ## CI report: * 437fc7e4ecb949686e6ebd3ce50de7f57594d493 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] dmvk commented on a diff in pull request #22628: [FLINK-32162] Only log error if an error occurred
dmvk commented on code in PR #22628: URL: https://github.com/apache/flink/pull/22628#discussion_r1202135895 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -1182,7 +1182,10 @@ public CompletableFuture updateJobResourceRequirements( getMainThreadExecutor()) .whenComplete( (ack, error) -> { -log.debug("Failed to update requirements for job {}.", jobId, error); +if (error != null) { +log.debug( Review Comment: should the log level be different as well?
[jira] [Closed] (FLINK-31639) Introduce tiered store memory manager
[ https://issues.apache.org/jira/browse/FLINK-31639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song closed FLINK-31639. Fix Version/s: 1.18.0 Resolution: Done master (1.18): c6d7747eaef166fb7577de55cb2943fa5408d54e > Introduce tiered store memory manager > - > > Key: FLINK-31639 > URL: https://issues.apache.org/jira/browse/FLINK-31639 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.18.0 >Reporter: Yuxin Tan >Assignee: Yuxin Tan >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > >
[GitHub] [flink] xintongsong closed pull request #22352: [FLINK-31639][network] Introduce tiered store memory manager
xintongsong closed pull request #22352: [FLINK-31639][network] Introduce tiered store memory manager URL: https://github.com/apache/flink/pull/22352
[jira] [Updated] (FLINK-32162) Misleading log message due to missing null check
[ https://issues.apache.org/jira/browse/FLINK-32162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32162: --- Labels: pull-request-available (was: ) > Misleading log message due to missing null check > > > Key: FLINK-32162 > URL: https://issues.apache.org/jira/browse/FLINK-32162 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > Updating the job requirements always logs "Failed to update requirements for > job {}." because we don't check whether the error is not null.
[jira] [Created] (FLINK-32162) Misleading log message due to missing null check
Chesnay Schepler created FLINK-32162: Summary: Misleading log message due to missing null check Key: FLINK-32162 URL: https://issues.apache.org/jira/browse/FLINK-32162 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 Updating the job requirements always logs "Failed to update requirements for job {}." because we don't check whether the error is not null.
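The problem described in this ticket comes from how `CompletableFuture#whenComplete` works: the callback runs on success as well as on failure, and `error` is null in the success case. A minimal sketch of the null check follows. It is not the Dispatcher code; the class and method names are hypothetical, and a list of strings stands in for the logger so the behaviour can be observed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WhenCompleteNullCheckSketch {

    /**
     * Hypothetical helper: records the failure message only if the future
     * completed exceptionally. The returned list stands in for a logger.
     */
    static <T> List<String> logOnFailureOnly(CompletableFuture<T> future, String jobId) {
        List<String> logged = new ArrayList<>();
        future.whenComplete(
                (ack, error) -> {
                    // whenComplete is invoked in both cases; error is null on success.
                    if (error != null) {
                        logged.add("Failed to update requirements for job " + jobId + ".");
                    }
                });
        return logged;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));

        // Both futures are already complete, so the callbacks run immediately.
        System.out.println(logOnFailureOnly(CompletableFuture.completedFuture("ack"), "job-1").size()); // prints 0
        System.out.println(logOnFailureOnly(failed, "job-2").size()); // prints 1
    }
}
```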
[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface
XComp commented on code in PR #22384: URL: https://github.com/apache/flink/pull/22384#discussion_r1202068871 ## flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.UUID; + +/** + * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link + * LeaderElection} handling. + */ +public abstract class AbstractLeaderElectionService implements LeaderElectionService { +@Override +public LeaderElection createLeaderElection() { +return new LeaderElectionImpl(this); +} + +/** + * Registers the given {@link LeaderContender} with the underlying {@code + * LeaderElectionService}. Leadership changes are starting to be reported to the {@code + * LeaderContender}. + */ +protected abstract void register(LeaderContender contender) throws Exception; + +/** Confirms the leadership with the given session ID and address. 
*/ +protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress); + +/** + * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given + * session ID. + * + * @return {@code true} if the service has leadership with the passed session ID acquired; + * {@code false} otherwise. + */ +protected abstract boolean hasLeadership(UUID leaderSessionId); + +/** {@code LeaderElectionImpl} is the default implementation of {@link LeaderElection}. */ +private static class LeaderElectionImpl implements LeaderElection { Review Comment: > and you ended up doing exactly what I asked for?... I'm confused by the question mark here. :innocent: I found your proposal reasonable in terms of testability and went ahead with the refactoring. My [previous comment](https://github.com/apache/flink/pull/22384#discussion_r1200710974) just explained my initial reasoning. :-D ## flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.UUID; + +/** + * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link + * LeaderElection} handling. + */ +public abstract class AbstractLeaderElectionService implements LeaderElectionService { +@Override +public LeaderElection createLeaderElection() { +return new LeaderElectionImpl(this); +} + +/** + * Registers the given {@link LeaderContender} with the underlying {@code + * LeaderElectionService}. Leadership changes are starting to be reported to the {@code + * LeaderContender}. + */ +protected abstract void register(LeaderContender contender) throws Exception; + +/** Confirms the leadership with the given session ID and address. */ +protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress); + +/** + * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given +
[jira] [Created] (FLINK-32161) Migrate and remove some legacy ExternalResource
Weijie Guo created FLINK-32161: -- Summary: Migrate and remove some legacy ExternalResource Key: FLINK-32161 URL: https://issues.apache.org/jira/browse/FLINK-32161 Project: Flink Issue Type: Technical Debt Reporter: Weijie Guo Assignee: Weijie Guo
[jira] [Updated] (FLINK-32161) Migrate and remove some legacy ExternalResource
[ https://issues.apache.org/jira/browse/FLINK-32161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weijie Guo updated FLINK-32161: --- Affects Version/s: 1.18.0 > Migrate and remove some legacy ExternalResource > --- > > Key: FLINK-32161 > URL: https://issues.apache.org/jira/browse/FLINK-32161 > Project: Flink > Issue Type: Technical Debt >Affects Versions: 1.18.0 >Reporter: Weijie Guo >Assignee: Weijie Guo >Priority: Major >
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725336#comment-17725336 ] Roman Khachatryan commented on FLINK-29913: --- Thanks for the proposal, [~Feifan Wang]. I think it's easier to implement than always-unique state IDs. On the other hand, it complicates an already complex part of the system and has some runtime overhead, so I'd rather go with unique state IDs. WDYT? As for the priority, I'd change it to Major if there are no objections. > Shared state would be discarded by mistake when maxConcurrentCheckpoint>1 > - > > Key: FLINK-29913 > URL: https://issues.apache.org/jira/browse/FLINK-29913 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.15.0, 1.16.0 >Reporter: Yanfei Lei >Priority: Minor > > When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state > backend would be discarded by registering the same name handle. See > [https://github.com/apache/flink/pull/21050#discussion_r1011061072] > cc [~roman]
[jira] [Commented] (FLINK-32160) CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException
[ https://issues.apache.org/jira/browse/FLINK-32160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725319#comment-17725319 ] Martijn Visser commented on FLINK-32160: [~lzljs3620320] Do you have any insights on this? > CompactOperator cannot continue from checkpoint because of > java.util.NoSuchElementException > --- > > Key: FLINK-32160 > URL: https://issues.apache.org/jira/browse/FLINK-32160 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.16.0, 1.17.0 > Environment: Flink 1.16/1.17 on k8s (flink-kubernetes-operator > v.1.4.0), s3 >Reporter: Michał Fijołek >Priority: Major > > Hello :) We have a Flink job (v1.17) on k8s (using the official > flink-k8s-operator) that reads data from Kafka and writes it to S3 using > Flink SQL with compaction. The job sometimes fails and continues from a checkpoint > just fine, but once every couple of days we experience a crash loop. The job cannot > continue from the latest checkpoint and fails with the following exception: > {noformat} > java.util.NoSuchElementException at > java.base/java.util.ArrayList$Itr.next(Unknown Source) > at > org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114) > at > org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Unknown Source){noformat} > Here’s the relevant code: > [https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114] > It looks like `CompactOperator` is calling `next()` on the iterator without > checking `hasNext()` first. Why is that? Is it a bug? Why does > `context.getOperatorStateStore().getListState(metaDescriptor)` return an empty > iterator? Is the latest checkpoint broken in such a case? > We have an identical job, but without compaction, and it has been running smoothly for a > couple of weeks now. > The whole job is just a `select` from Kafka and an `insert` to S3. 
> {noformat} > CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` ( `foo_bar1` STRING, > `foo_bar2` STRING, > `foo_bar3` STRING, > `foo_bar4` STRING > ) > PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING) > STORED AS parquet > LOCATION 's3a://my/bucket/' > TBLPROPERTIES ( > 'auto-compaction' = 'true', > 'compaction.file-size' = '128MB', > 'sink.parallelism' = '8', > 'format' = 'parquet', > 'parquet.compression' = 'SNAPPY', > 'sink.rolling-policy.rollover-interval' = '1 h', > 'sink.partition-commit.policy.kind' = 'metastore' > ){noformat} > Checkpoint configuration: > {noformat} > Checkpointing Mode Exactly Once > Checkpoint Storage FileSystemCheckpointStorage > State Backend HashMapStateBackend > Interval 20m 0s > Timeout 10m 0s > Minimum Pause Between Checkpoints 0ms > Maximum Concurrent Checkpoints 1 > Unaligned Checkpoints Disabled > Persist Checkpoints Externally Enabled (retain on cancellation) > Tolerable Failed Checkpoints 0 > Checkpoints With Finished Tasks Enabled > State Changelog Disabled{noformat} > Is there something wrong with the given config, or is this some unhandled edge > case? > Currently our workaround is to restart the job without using the checkpoint; it > uses the state from Kafka, which in this case is fine.
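The exception in the report is what `Iterator#next()` throws on an empty iterator. A minimal sketch of the guarded pattern the reporter asks about follows. It is not the `CompactOperator` code and not a proposed fix; the class and helper names are hypothetical, and whether an empty restored state is legitimate in that operator is exactly the open question above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

class RestoredStateSketch {

    /**
     * Hypothetical helper: returns the first restored element, or an empty
     * Optional if nothing was restored, instead of failing in next().
     */
    static <T> Optional<T> firstOrEmpty(Iterable<T> restored) {
        Iterator<T> iterator = restored.iterator();
        return iterator.hasNext() ? Optional.ofNullable(iterator.next()) : Optional.empty();
    }

    public static void main(String[] args) {
        List<Long> emptyState = new ArrayList<>();
        // Calling emptyState.iterator().next() here would throw NoSuchElementException.
        System.out.println(firstOrEmpty(emptyState).isPresent()); // prints false
        System.out.println(firstOrEmpty(Collections.singletonList(42L)).get()); // prints 42
    }
}
```

With the guard in place the caller has to decide explicitly what an empty state means, for example falling back to an initial value or failing with a descriptive message.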
[jira] [Commented] (FLINK-31828) List field in a POJO data stream results in table program compilation failure
[ https://issues.apache.org/jira/browse/FLINK-31828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725311#comment-17725311 ] Aitozi commented on FLINK-31828: Could someone familiar with SQL help verify this fix? > List field in a POJO data stream results in table program compilation failure > - > > Key: FLINK-31828 > URL: https://issues.apache.org/jira/browse/FLINK-31828 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.16.1 > Environment: Java 11 > Flink 1.16.1 >Reporter: Vladimir Matveev >Priority: Major > Labels: pull-request-available > Attachments: MainPojo.java, generated-code.txt, stacktrace.txt > > > Suppose I have a POJO class like this: > {code:java} > public class Example { > private String key; > private List> values; > // getters, setters, equals+hashCode omitted > } > {code} > When a DataStream with this class is converted to a table, and some > operations are performed on it, it results in an exception which explicitly > says that I should file a ticket: > {noformat} > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > {noformat} > Please find the example Java code and the full stack trace attached. > From the exception and generated code it seems that Flink is upset with the > list field being treated as an array - but I cannot have an array type there > in the real code. > Also note that if I _don't_ specify the schema explicitly, it then maps the > {{values}} field to a `RAW('java.util.List', '...')` type, which also does > not work correctly and fails the job in case of even simplest operations like > printing.
[jira] [Updated] (FLINK-32043) SqlClient session unrecoverable once one wrong setting occurred
[ https://issues.apache.org/jira/browse/FLINK-32043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-32043: Priority: Critical (was: Major) > SqlClient session unrecoverable once one wrong setting occurred > --- > > Key: FLINK-32043 > URL: https://issues.apache.org/jira/browse/FLINK-32043 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.17.0 >Reporter: lincoln lee >Priority: Critical > > In the SQL client, the session no longer works once a single wrong setting has been applied > {code:java} > // wrong setting here > Flink SQL> SET table.sql-dialect = flink; > [INFO] Execute statement succeed. > Flink SQL> select '' AS f1, a from t1; > [ERROR] Could not execute SQL statement. Reason: > java.lang.IllegalArgumentException: No enum constant > org.apache.flink.table.api.SqlDialect.FLINK > Flink SQL> SET table.sql-dialect = default; > [ERROR] Could not execute SQL statement. Reason: > java.lang.IllegalArgumentException: No enum constant > org.apache.flink.table.api.SqlDialect.FLINK > Flink SQL> RESET table.sql-dialect; > [ERROR] Could not execute SQL statement. Reason: > java.lang.IllegalArgumentException: No enum constant > org.apache.flink.table.api.SqlDialect.FLINK > Flink SQL> RESET; > [ERROR] Could not execute SQL statement. Reason: > java.lang.IllegalArgumentException: No enum constant > org.apache.flink.table.api.SqlDialect.FLINK > {code}
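The transcript above shows an invalid value being accepted by `SET` and only failing later, when it is parsed as an enum; by then it is already stored and every following statement fails. One way to picture the difference is a store that validates eagerly and rejects the value before saving it. The sketch below is purely illustrative and is not how the SQL client is implemented or fixed: the class, the `Dialect` enum, and the `set`/`get` methods are hypothetical stand-ins, and the only real behaviour relied on is that `Enum.valueOf` throws `IllegalArgumentException` for an unknown constant.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class EagerOptionValidationSketch {

    /** Hypothetical stand-in for an enum-typed option; not Flink's SqlDialect class. */
    enum Dialect { DEFAULT, HIVE }

    private final Map<String, String> settings = new HashMap<>();

    /** Validates enum-typed options before storing them, so a bad value never sticks. */
    void set(String key, String value) {
        if ("table.sql-dialect".equals(key)) {
            // Throws IllegalArgumentException for an unknown constant such as FLINK.
            Dialect.valueOf(value.toUpperCase(Locale.ROOT));
        }
        settings.put(key, value);
    }

    String get(String key) {
        return settings.get(key);
    }

    public static void main(String[] args) {
        EagerOptionValidationSketch session = new EagerOptionValidationSketch();
        try {
            session.set("table.sql-dialect", "flink");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected"); // prints rejected
        }
        // The session is still usable because the bad value was never stored.
        session.set("table.sql-dialect", "default");
        System.out.println(session.get("table.sql-dialect")); // prints default
    }
}
```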