[jira] [Commented] (FLINK-32807) when i use emitUpdateWithRetract of udtagg,bug error
[ https://issues.apache.org/jira/browse/FLINK-32807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784307#comment-17784307 ] yong yang commented on FLINK-32807: --- yes it is duplicated > when i use emitUpdateWithRetract of udtagg,bug error > > > Key: FLINK-32807 > URL: https://issues.apache.org/jira/browse/FLINK-32807 > Project: Flink > Issue Type: Bug > Components: API / Scala, Table SQL / Planner >Affects Versions: 1.17.1 > Environment: http://maven.apache.org/POM/4.0.0"; > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/xsd/maven-4.0.0.xsd";> > 4.0.0 > org.example > FlinkLocalDemo > 1.0-SNAPSHOT > jar > FlinkLocalDemo > http://maven.apache.org > > UTF-8 > 1.17.1 > 2.12 > 2.12.8 > > > > > com.chuusai > shapeless_${scala.binary.version} > 2.3.10 > > > > joda-time > joda-time > 2.12.5 > > > org.apache.flink > flink-avro > ${flink.version} > > > org.apache.flink > flink-runtime-web > ${flink.version} > > > > com.alibaba.fastjson2 > fastjson2 > 2.0.33 > > > > com.alibaba > fastjson > 1.2.83 > > > > junit > junit > 3.8.1 > test > > > > org.apache.flink > flink-table-common > ${flink.version} > > > > org.apache.flink > flink-connector-kafka > ${flink.version} > provided > > > org.apache.flink > flink-json > ${flink.version} > > > org.apache.flink > flink-scala_${scala.binary.version} > ${flink.version} > provided > > > org.apache.flink > flink-streaming-scala_${scala.binary.version} > ${flink.version} > > > org.apache.flink > flink-csv > ${flink.version} > > > > org.apache.flink > flink-table-api-java-bridge > ${flink.version} > > > > > org.apache.flink > flink-table-api-scala-bridge_${scala.binary.version} > ${flink.version} > > > > org.apache.flink > flink-table-planner-loader > ${flink.version} > > > > > org.apache.flink > flink-table-runtime > ${flink.version} > provided > > > > org.apache.flink > flink-connector-files > 
${flink.version} > > > > > > > > > > org.apache.flink > flink-clients > ${flink.version} > > > org.apache.flink > flink-connector-jdbc > 3.1.0-1.17 > provided > > > mysql > mysql-connector-java > 8.0.11 > > > > > > > org.apache.maven.plugins > maven-shade-plugin > 2.4.3 > > > package > > shade > > > > > *:* > > META-INF/*.SF > META-INF/*.DSA > META-INF/*.RSA > > > > > > > > > org.scala-tools > maven-scala-plugin > 2.15.2 > > > > compile > testCompile > > > > > > net.alchim31.maven > scala-maven-plugin > 3.2.2 > > > scala-compile-first > process-resources > > add-source > compile > > > > > ${scala.version} > > > > org.apache.maven.plugins > maven-assembly-plugin > 2.5.5 > > > > > > > > > jar-with-dependencies > > > > > org.apache.maven.plugins > maven-compiler-plugin > 3.1 > > 11 > 11 > > > > > >Reporter: yong yang >Priority: Major > Attachments: Top2WithRetract.scala, UdtaggDemo3.scala > > > 参考: > [https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/functions/udfs/#retraction-example] > 我的代码: > [^Top2WithRetract.scala] > > bug show error: > {code:java} > //代码占位符 > /Users/thomas990p/Library/Java/JavaVirtualMachines/corretto-11.0.20/Contents/Home/bin/java > -javaagent:/Users/thomas990p/Library/Application > Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ > IDEA.app/Contents/lib/idea_rt.jar=56941:/Users/thomas990p/Library/Application > Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/231.9161.38/IntelliJ > IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath > 
/Users/thomas990p/IdeaProjects/FlinkLocalDemo/target/classes:/Users/thomas990p/.m2/repository/com/chuusai/shapeless_2.12/2.3.10/shapeless_2.12-2.3.10.jar:/Users/thomas990p/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar:/Users/thomas990p/.m2/repository/joda-time/joda-time/2.12.5/joda-time-2.12.5.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-avro/1.17.1/flink-avro-1.17.1.jar:/Users/thomas990p/.m2/repository/org/apache/avro/avro/1.11.1/avro-1.11.1.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.12.7/jackson-core-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.12.7/jackson-databind-2.12.7.jar:/Users/thomas990p/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.12.7/jackson-annotations-2.12.7.jar:/Users/thomas990p/.m2/repository/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar:/Users/thomas990p/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/thomas990p/.m2/repository/org/apache/flink/flink-runtime-web/1.17.1/flink-runtime-web-1.17.1.jar:/Users/thomas990p/.m2/repository/org/
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: What about tasks? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ## @@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup { private final Set executionVertexIds; -private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; +private @Nonnull SlotSharingGroup slotSharingGroup; +/** @deprecated Only for test classes. */ +@Deprecated Review Comment: `@VisibleForTesting` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java: ## @@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration( configuration.getBoolean( ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED); -boolean evenlySpreadOutSlots = - configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY); +TaskManagerLoadBalanceMode taskManagerLoadBalanceMode = + TaskManagerLoadBalanceMode.loadFromConfiguration(configuration); final SlotMatchingStrategy slotMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: ditto. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java: ## @@ -42,69 +43,23 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. 
Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy implements SlotSharingStrategy, SchedulingTopologyListener { Review Comment: No need to implement them again. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java: ## @@ -42,69 +43,23 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy implements SlotSharingStrategy, SchedulingTopologyListener { -private final Map executionSlotSharingGroupMap; - -private final Set logicalSlotSharingGroups; - -private final Set coLocationGroups; +public static final Logger LOG = + LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class); Review Comment: Where do we use it? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ## @@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable { // +public SlotSharingGroup() {} + +public SlotSharingGroup(ResourceProfile resourceProfile) { Review Comment: Seems we don't need to do that. Just instantiate a SlotSharingGroup and set the resource profile. 
## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ## @@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable { // +public SlotSharingGroup() {} + +public SlotSharingGroup(ResourceProfile resourceProfile) { Review Comment: Only visible for testing? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ## @@ -31,22 +34,36 @@ class ExecutionSlotSh
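The review thread above revolves around replacing the boolean `evenlySpreadOutSlots` with a `TaskManagerLoadBalanceMode` read from the configuration. A minimal sketch of that refactoring — the config key, enum values, and class shape are assumptions for illustration, not Flink's actual code:

```java
import java.util.Locale;
import java.util.Map;

public class LoadBalanceModeSketch {

    public enum TaskManagerLoadBalanceMode {
        NONE,   // no spreading (old boolean = false)
        SLOTS,  // spread slots evenly across TaskManagers (old boolean = true)
        TASKS;  // spread tasks evenly (the mode the reviewer asks about)

        /** Parse the mode from a flat config map, falling back to NONE (key is hypothetical). */
        public static TaskManagerLoadBalanceMode loadFromConfiguration(Map<String, String> conf) {
            String raw = conf.getOrDefault("taskmanager.load-balance.mode", "NONE");
            return valueOf(raw.toUpperCase(Locale.ROOT));
        }
    }

    /** The predicate the enum comparison replaces: only SLOTS maps to evenly-spread-out. */
    public static boolean evenlySpreadOutSlots(TaskManagerLoadBalanceMode mode) {
        return mode == TaskManagerLoadBalanceMode.SLOTS;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("taskmanager.load-balance.mode", "slots");
        TaskManagerLoadBalanceMode mode = TaskManagerLoadBalanceMode.loadFromConfiguration(conf);
        System.out.println(mode + " -> evenlySpreadOutSlots=" + evenlySpreadOutSlots(mode));
    }
}
```

The reviewer's "What about tasks?" points at exactly the weakness of the `mode == SLOTS` comparison: a tri-state enum should be dispatched exhaustively, not collapsed back into the old boolean.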
Re: [PR] [FLINK-33023][table-planner][JUnit5 Migration] Module: flink-table-planner (TableTestBase) [flink]
xuyangzhong commented on PR #23349: URL: https://github.com/apache/flink/pull/23349#issuecomment-1803283116 LGTM. cc @leonardBang for the final check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-6755][CLI] Support manual checkpoints triggering [flink]
Zakelly commented on code in PR #23679: URL: https://github.com/apache/flink/pull/23679#discussion_r1387577165 ## flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java: ## @@ -842,6 +844,85 @@ private void disposeSavepoint( logAndSysout("Savepoint '" + savepointPath + "' disposed."); } +/** + * Executes the CHECKPOINT action. + * + * @param args Command line arguments for the checkpoint action. + */ +protected void checkpoint(String[] args) throws Exception { +LOG.info("Running 'checkpoint' command."); + +final Options commandOptions = CliFrontendParser.getCheckpointCommandOptions(); + +final CommandLine commandLine = getCommandLine(commandOptions, args, false); + +final CheckpointOptions checkpointOptions = new CheckpointOptions(commandLine); + +// evaluate help flag +if (checkpointOptions.isPrintHelp()) { +CliFrontendParser.printHelpForCheckpoint(customCommandLines); +return; +} + +final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(commandLine); + +String[] cleanedArgs = checkpointOptions.getArgs(); + +final JobID jobId; + +if (cleanedArgs.length >= 1) { +String jobIdString = cleanedArgs[0]; + +jobId = parseJobId(jobIdString); +} else { +throw new CliArgsException( +"Missing JobID. " + "Specify a Job ID to manipulate a checkpoint."); +} +runClusterAction( +activeCommandLine, +commandLine, +(clusterClient, effectiveConfiguration) -> +triggerCheckpoint( +clusterClient, +jobId, +checkpointOptions.getCheckpointType(), +getClientTimeout(effectiveConfiguration))); +} + +/** Sends a CheckpointTriggerMessage to the job manager. 
*/ +private void triggerCheckpoint( +ClusterClient clusterClient, +JobID jobId, +CheckpointType checkpointType, +Duration clientTimeout) +throws FlinkException { +logAndSysout("Triggering checkpoint for job " + jobId + '.'); + +CompletableFuture checkpointFuture = +clusterClient.triggerCheckpoint(jobId, checkpointType); + +logAndSysout("Waiting for response..."); + +try { +final long checkpointId = +checkpointFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS); + +logAndSysout( +"Checkpoint(" ++ checkpointType ++ ") " ++ checkpointId ++ " for job " ++ jobId ++ " completed."); +logAndSysout("You can resume your program from this checkpoint with the run command."); +} catch (Exception e) { +Throwable cause = ExceptionUtils.stripExecutionException(e); +throw new FlinkException( +"Triggering a checkpoint for the job " + jobId + " failed.", cause); Review Comment: 👍 I also changed the same message in `savepoint` command.
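The `checkpoint` action in the diff follows a common async-trigger pattern: fire the trigger, wait on the returned future with the client timeout, and unwrap the `ExecutionException` cause before re-throwing. A minimal self-contained sketch of that pattern — the stubbed trigger, class name, and exception message are illustrative assumptions, not Flink's actual `ClusterClient` API:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class TriggerSketch {

    /** Stand-in for an async checkpoint trigger that completes with a checkpoint id. */
    public static CompletableFuture<Long> triggerCheckpoint(String jobId) {
        return CompletableFuture.supplyAsync(() -> 26L); // pretend checkpoint 26 completed
    }

    /** Wait for the trigger with a client timeout, unwrapping the ExecutionException. */
    public static long waitForCheckpoint(String jobId, Duration clientTimeout) throws Exception {
        CompletableFuture<Long> future = triggerCheckpoint(jobId);
        try {
            return future.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Mirrors the stripExecutionException step: report the root cause, not the wrapper.
            throw new RuntimeException(
                    "Triggering a checkpoint for the job " + jobId + " failed.", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        long checkpointId =
                waitForCheckpoint("99c59fead08c613763944f533bf90c0f", Duration.ofSeconds(10));
        System.out.println("Checkpoint " + checkpointId + " completed.");
    }
}
```

The timeout bound matters here: without it, a lost RPC would block the CLI indefinitely instead of surfacing a `TimeoutException`.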
[jira] [Updated] (FLINK-33490) Validate the name conflicts when creating view
[ https://issues.apache.org/jira/browse/FLINK-33490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang updated FLINK-33490: -- Description: We should forbid ``` CREATE VIEW id_view AS SELECT id, uid AS id FROM id_table ``` As the SQL standard states, if <view column list> is specified, then: i) If any two columns in the table specified by the <query expression> have equivalent <column name>s, or if any column of that table has an implementation-dependent name, then a <view column list> shall be specified. ii) Equivalent <column name>s shall not be specified more than once in the <view column list>. Many databases also throw an exception when view column names conflict, e.g. MySQL and PostgreSQL. was: When should forbid ``` CREATE VIEW id_view AS SELECT id, uid AS id FROM id_table ``` As the SQL standard states, if <view column list> is specified, then: i) If any two columns in the table specified by the <query expression> have equivalent <column name>s, or if any column of that table has an implementation-dependent name, then a <view column list> shall be specified. ii) Equivalent <column name>s shall not be specified more than once in the <view column list>. Many databases also throw an exception when view column names conflict, e.g. MySQL and PostgreSQL. > Validate the name conflicts when creating view > -- > > Key: FLINK-33490 > URL: https://issues.apache.org/jira/browse/FLINK-33490 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Shengkai Fang >Priority: Major > > We should forbid > ``` > CREATE VIEW id_view AS > SELECT id, uid AS id FROM id_table > ``` > As the SQL standard states, > if <view column list> is specified, then: > i) If any two columns in the table specified by the <query expression> have > equivalent <column name>s, or if any column of that table has an > implementation-dependent name, then a <view column list> shall be specified. > ii) Equivalent <column name>s shall not be specified more than once in the > <view column list>. > Many databases also throw an exception when view column names conflict, e.g. > MySQL and PostgreSQL. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33490) Validate the name conflicts when creating view
Shengkai Fang created FLINK-33490: - Summary: Validate the name conflicts when creating view Key: FLINK-33490 URL: https://issues.apache.org/jira/browse/FLINK-33490 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: Shengkai Fang We should forbid ``` CREATE VIEW id_view AS SELECT id, uid AS id FROM id_table ``` As the SQL standard states, if <view column list> is specified, then: i) If any two columns in the table specified by the <query expression> have equivalent <column name>s, or if any column of that table has an implementation-dependent name, then a <view column list> shall be specified. ii) Equivalent <column name>s shall not be specified more than once in the <view column list>. Many databases also throw an exception when view column names conflict, e.g. MySQL and PostgreSQL.
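The validation this issue asks for boils down to detecting duplicate names in the view's projected column list — `SELECT id, uid AS id` projects `[id, id]`. An illustrative, self-contained sketch of such a check (not the planner's actual implementation; the error message wording is an assumption):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ViewColumnCheck {

    /** Returns the column names that appear more than once, in first-seen order. */
    public static List<String> duplicateNames(List<String> columnNames) {
        Set<String> seen = new HashSet<>();
        List<String> duplicates = new ArrayList<>();
        for (String name : columnNames) {
            if (!seen.add(name) && !duplicates.contains(name)) {
                duplicates.add(name);
            }
        }
        return duplicates;
    }

    /** Rejects the view if any projected column name collides. */
    public static void validateView(String viewName, List<String> columnNames) {
        List<String> duplicates = duplicateNames(columnNames);
        if (!duplicates.isEmpty()) {
            throw new IllegalArgumentException(
                    "SQL validation failed. Duplicate column name '" + duplicates.get(0)
                            + "' in view " + viewName + ".");
        }
    }

    public static void main(String[] args) {
        // `CREATE VIEW id_view AS SELECT id, uid AS id FROM id_table` projects [id, id]:
        validateView("ok_view", List.of("id", "uid"));
        try {
            validateView("id_view", List.of("id", "id"));
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

Per SQL-standard rule ii) quoted in the issue, the same rejection applies when an explicit view column list repeats a name, so the check can be reused for both the projection and the declared column list.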
Re: [PR] [FLINK-6755][CLI] Support manual checkpoints triggering [flink]
Zakelly commented on code in PR #23679: URL: https://github.com/apache/flink/pull/23679#discussion_r1387573143 ## flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java: ## @@ -611,6 +636,22 @@ public static void printHelpForSavepoint(Collection customCom System.out.println(); } +public static void printHelpForCheckpoint(Collection customCommandLines) { +HelpFormatter formatter = new HelpFormatter(); +formatter.setLeftPadding(5); +formatter.setWidth(80); + +System.out.println( +"\nAction \"checkpoint\" triggers checkpoints for a running job or disposes existing ones."); Review Comment: Yeah, sorry for the mistake 😆
Re: [PR] [FLINK-6755][CLI] Support manual checkpoints triggering [flink]
Zakelly commented on code in PR #23679: URL: https://github.com/apache/flink/pull/23679#discussion_r1387572052 ## docs/content.zh/docs/deployment/cli.md: ## @@ -153,6 +153,35 @@ $ ./bin/flink savepoint \ Triggering the savepoint disposal through the `savepoint` action does not only remove the data from the storage but makes Flink clean up the savepoint-related metadata as well. +### Creating a Checkpoint +[Checkpoints]({{< ref "docs/ops/state/checkpoints" >}}) can also be manually created to save the +current state. To get the difference between checkpoint and savepoint, please refer to +[Checkpoints vs. Savepoints]({{< ref "docs/ops/state/checkpoints_vs_savepoints" >}}). All that's +needed to trigger a checkpoint manually is the JobID: +```bash +$ ./bin/flink checkpoint \ + $JOB_ID +``` +``` +Triggering checkpoint for job 99c59fead08c613763944f533bf90c0f. +Waiting for response... +Checkpoint(CONFIGURED) 26 for job 99c59fead08c613763944f533bf90c0f completed. +You can resume your program from this checkpoint with the run command. +``` +If you want to trigger a full checkpoint while the job periodically triggering incremental checkpoints, +please use the `--full` option. Review Comment: After some digging, I found that when a job is configured to run with RocksDB and incremental checkpointing disabled, the `--incremental` flag cannot take effect. That is because the `RocksNativeFullSnapshotStrategy` does not take `CheckpointOptions` into account when taking async snapshots. So, since only the `--full` flag works when incremental checkpointing is enabled, I only describe this usage here. But I'm ok with your phrasing. Also, we should reconsider the incremental file-sharing chain in this scenario. How does a manually triggered incremental checkpoint reuse the files from previous full checkpoints, which exist in `chk-x` instead of the `shared` directory? I think we need a more flexible checkpoint directory layout. WDYT?
[jira] [Commented] (FLINK-32807) when i use emitUpdateWithRetract of udtagg,bug error
[ https://issues.apache.org/jira/browse/FLINK-32807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784297#comment-17784297 ] xuyang commented on FLINK-32807: Hi, [~luca.yang] . I think this issue is duplicated with https://issues.apache.org/jira/browse/FLINK-31788 ?
Re: [PR] [FLINK-33358][sql] Fix Flink SQL Client fail to start in Flink on YARN [flink]
PrabhuJoseph commented on PR #23629: URL: https://github.com/apache/flink/pull/23629#issuecomment-1803208444 @fsk119 Please review this patch when you have time. Thanks.
[jira] [Resolved] (FLINK-33262) Extend source provider interfaces with the new parallelism provider interface
[ https://issues.apache.org/jira/browse/FLINK-33262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benchao Li resolved FLINK-33262. Fix Version/s: 1.19.0 Resolution: Implemented merged via 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202 (1.19.0) > Extend source provider interfaces with the new parallelism provider interface > - > > Key: FLINK-33262 > URL: https://issues.apache.org/jira/browse/FLINK-33262 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > >
Re: [PR] [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface [flink]
libenchao closed pull request #23663: [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface URL: https://github.com/apache/flink/pull/23663
Re: [PR] [FLINK-30613] Improve resolving schema compatibility -- Milestone one [flink]
masteryhx commented on PR #21635: URL: https://github.com/apache/flink/pull/21635#issuecomment-1803157270 Rebased. @Zakelly Could you also help to take a look? Thanks a lot!
[jira] [Commented] (FLINK-20672) notifyCheckpointAborted RPC failure can fail JM
[ https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784274#comment-17784274 ] Zakelly Lan commented on FLINK-20672: - [~yunta] A fatal exit for an uncaught exception is a relatively "safe" option, since the executor service may not know what to do when encountering errors. If we could strictly limit its use and stipulate that it should not affect the running job, we could use another handler without failing the process. After reading some code, IIUC, the {{[DefaultJobMasterServiceFactory|https://github.com/apache/flink/blob/eb4ae5d4e7d517300e98e632de95249dbdd22192/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java#L101C1-L101C1]}} is using the io-executor to {{[createJobMasterService|https://github.com/apache/flink/blob/eb4ae5d4e7d517300e98e632de95249dbdd22192/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java#L101]}}, which is essential for job running. And it leaves all exceptions uncaught, which should also be changed if we decide to change the behavior of the io executor. Actually I have no preference for changing this behavior or not, since maybe some "io" operations are fatal and most are not. This is a matter of regulations and contracts. I suggest we do our best to catch exceptions within a runnable task if we are sure it should not have any side effects on the job. WDYT?
> notifyCheckpointAborted RPC failure can fail JM > --- > > Key: FLINK-20672 > URL: https://issues.apache.org/jira/browse/FLINK-20672 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.3, 1.12.0 >Reporter: Roman Khachatryan >Assignee: Zakelly Lan >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > Introduced in FLINK-8871, aborted RPC notifications are done asynchonously: > > {code} > private void sendAbortedMessages(long checkpointId, long timeStamp) { > // send notification of aborted checkpoints asynchronously. > executor.execute(() -> { > // send the "abort checkpoint" messages to necessary > vertices. > // .. > }); > } > {code} > However, the executor that eventually executes this request is created as > follows > {code} > final ScheduledExecutorService futureExecutor = > Executors.newScheduledThreadPool( > Hardware.getNumberCPUCores(), > new ExecutorThreadFactory("jobmanager-future")); > {code} > ExecutorThreadFactory uses UncaughtExceptionHandler that exits JVM on error. > cc: [~yunta] -- This message was sent by Atlassian Jira (v8.20.10#820010)
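The trade-off discussed above — an executor whose `UncaughtExceptionHandler` exits the JVM versus catching the exception inside the submitted runnable — can be sketched as follows. Names are illustrative, not Flink's `ExecutorThreadFactory`; the point is that a best-effort notification wrapped in a try/catch can never reach the fatal handler:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AbortNotificationSketch {

    /** Submits the "abort checkpoint" RPC, swallowing (recording) any failure. */
    public static void sendAbortedMessages(ExecutorService executor, Runnable rpc,
                                           AtomicReference<Throwable> swallowed,
                                           CountDownLatch done) {
        executor.execute(() -> {
            try {
                rpc.run(); // the notification; failure here must not kill the process
            } catch (Throwable t) {
                // Best-effort semantics: log and continue instead of letting an
                // exit-on-error UncaughtExceptionHandler take down the JobManager.
                swallowed.set(t);
            } finally {
                done.countDown();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> swallowed = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        sendAbortedMessages(executor,
                () -> { throw new RuntimeException("RPC failure"); }, swallowed, done);
        done.await(5, TimeUnit.SECONDS);
        executor.shutdown();
        System.out.println("caught instead of exiting: " + swallowed.get().getMessage());
    }
}
```

This is the per-task option suggested in the comment; the alternative — installing a milder `UncaughtExceptionHandler` on the pool's thread factory — changes the contract for every task sharing the executor, which is exactly the concern raised about the io-executor.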
Re: [PR] [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result [flink]
flinkbot commented on PR #23688: URL: https://github.com/apache/flink/pull/23688#issuecomment-1803149601 ## CI report: * 36da81aa605a7d8f29f80ceac070b7f673ccead6 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result
[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33489: --- Labels: pull-request-available (was: ) > LISTAGG with generating partial-final agg will cause wrong result > - > > Key: FLINK-33489 > URL: https://issues.apache.org/jira/browse/FLINK-33489 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, > 1.16.0, 1.17.0, 1.18.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > Adding the following test cases in SplitAggregateITCase will reproduce this > bug: > > {code:java} > // code placeholder > @Test > def testListAggWithDistinctMultiArgs(): Unit = { > val t1 = tEnv.sqlQuery(s""" > |SELECT > | a, > | LISTAGG(DISTINCT c, '#') > |FROM T > |GROUP BY a > """.stripMargin) > val sink = new TestingRetractSink > t1.toRetractStream[Row].addSink(sink) > env.execute() > val expected = Map[String, List[String]]( > "1" -> List("Hello 0", "Hello 1"), > "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"), > "3" -> List("Hello 0", "Hello 1"), > "4" -> List("Hello 1", "Hello 2", "Hello 3") > ) > val actualData = sink.getRetractResults.sorted > println(actualData) > } {code} > The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello > 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter > `#` will be ignored. 
> Let's look at its plan: > {code:java} > // code placeholder > LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1]) > +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, > LISTAGG_RETRACT($f3_0) AS $f1]) > +- Exchange(distribution=[hash[a]]) > +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], > select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0]) > +- Exchange(distribution=[hash[a, $f3, $f4]]) > +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), > 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4]) > +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) > +- DataStreamScan(table=[[default_catalog, > default_database, T]], fields=[a, b, c]) {code} > The final `GroupAggregate` is missing the delimiter argument, so the default > delimiter `,` is used. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
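The failure mode in the plan above can be reproduced outside Flink with a toy two-phase aggregation. This is a simulation of the planner behavior, not Flink's actual aggregate functions: the PARTIAL phase still sees the user's '#' delimiter, but the FINAL merge phase has lost the delimiter argument and falls back to the default ','.

```java
import java.util.List;

// Toy simulation of the partial-final LISTAGG split described above.
// The partial phase knows the user delimiter '#', but the final phase
// (like the FINAL GroupAggregate in the plan) only receives the partial
// strings and merges them with the default delimiter ','.
public class ListAggSplitBug {

    static String partialListAgg(List<String> bucket, String delimiter) {
        return String.join(delimiter, bucket); // delimiter arg present here
    }

    static String finalListAgg(List<String> partials) {
        return String.join(",", partials);     // delimiter arg was dropped
    }

    public static void main(String[] args) {
        // Two hash buckets for the same group key, as produced by the
        // MOD(HASH_CODE(c), 1024) expressions in the Calc node.
        List<String> bucket1 = List.of("Hello 0", "Hello 1");
        List<String> bucket2 = List.of("Hello 2");

        String partial1 = partialListAgg(bucket1, "#");
        String partial2 = partialListAgg(bucket2, "#");
        String result = finalListAgg(List.of(partial1, partial2));

        // Expected "Hello 0#Hello 1#Hello 2", but the merge used ','.
        System.out.println(result); // Hello 0#Hello 1,Hello 2
    }
}
```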
[PR] [FLINK-33489][table-planner] forbid generating partial-final agg with LISTAGG to avoid wrong result [flink]
xuyangzhong opened a new pull request, #23688: URL: https://github.com/apache/flink/pull/23688 ## What is the purpose of the change Currently, splitting a LISTAGG aggregate into a partial-final aggregate pair produces wrong results. This PR is a quick fix that avoids the split for LISTAGG. ## Brief change log - *Avoid splitting LISTAGG in SplitAggregateRule* - *Add UT and IT* ## Verifying this change Some tests are added. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? no
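The change log above ("avoid splitting LISTAGG in SplitAggregateRule") amounts to a guard in the split rule. A minimal sketch of that idea, under assumed names (class and method names are illustrative; Flink's real SplitAggregateRule API differs):

```java
import java.util.Set;

// Hypothetical sketch of the quick fix: the split-aggregate rewrite
// refuses to split aggregate calls whose semantics it cannot preserve.
// LISTAGG is excluded because its delimiter argument is lost in the
// FINAL phase, as shown in FLINK-33489.
public class SplitAggregateGuard {

    private static final Set<String> UNSPLITTABLE = Set.of("LISTAGG");

    static boolean supportsPartialFinal(String aggFunctionName) {
        return !UNSPLITTABLE.contains(aggFunctionName.toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(supportsPartialFinal("COUNT"));   // true
        System.out.println(supportsPartialFinal("listagg")); // false
    }
}
```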
[jira] [Assigned] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result
[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang reassigned FLINK-33489: - Assignee: xuyang > LISTAGG with generating partial-final agg will cause wrong result
Re: [PR] [FLINK-33023][table-planner][JUnit5 Migration] Module: flink-table-planner (TableTestBase) [flink]
Jiabao-Sun commented on PR #23349: URL: https://github.com/apache/flink/pull/23349#issuecomment-1803143808 > Thanks for your huge contribution replacing JUnit 4 with JUnit 5 in the table-planner module. The whole PR looks good to me. I mainly have two questions. > > 1. Is it a good time to remove the JUnit 4 dependency from the `table-planner` module, to stop other contributors from continuing to use JUnit 4? > 2. I noticed you spent some time removing `public` from each test method. But this is only a weak constraint on contributors following the implicit rule. How can we make this rule more visible (maybe a custom checkstyle rule or something else)? Thanks @xuyangzhong for the thorough review. 1. Currently we can't remove the JUnit 4 dependency from table-planner, because some tests still use JUnit 4 and have not yet been migrated. Once all tests are migrated to JUnit 5, we can do this. 2. I think we can add this check to the architecture (ArchUnit) tests, but it may also need to wait until the JUnit 5 migration is complete. Please help check it again when you have time. Thanks again.
[jira] [Updated] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result
[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-33489: --- Summary: LISTAGG with generating partial-final agg will cause wrong result (was: LISTAGG with generating partial-final agg will case wrong result)
[jira] [Commented] (FLINK-33489) LISTAGG with generating partial-final agg will case wrong result
[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784273#comment-17784273 ] xuyang commented on FLINK-33489: As a quick fix, we can forbid generating a partial-final agg for this function. In the long term, we could use DISTINCT as the partial agg and the real agg function as the final agg, but first we must check the other agg functions. I'll try to fix it. > LISTAGG with generating partial-final agg will case wrong result
[jira] [Created] (FLINK-33489) LISTAGG with generating partial-final agg will case wrong result
xuyang created FLINK-33489: -- Summary: LISTAGG with generating partial-final agg will case wrong result Key: FLINK-33489 URL: https://issues.apache.org/jira/browse/FLINK-33489 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.18.0, 1.17.0, 1.16.0, 1.15.0, 1.14.0, 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.9.0 Reporter: xuyang
Re: [PR] [FLINK-19931] Do not emit intermediate results for reduce operation BATCH execution mode [flink]
WencongLiu commented on code in PR #13890: URL: https://github.com/apache/flink/pull/13890#discussion_r1387456047 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BatchGroupedReduceOperator.java: ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * A {@link StreamOperator} for executing a {@link ReduceFunction} on a + * {@link org.apache.flink.streaming.api.datastream.KeyedStream} in a + * {@link RuntimeExecutionMode#BATCH} mode. 
+ */ +@Internal +public class BatchGroupedReduceOperator + extends AbstractUdfStreamOperator> + implements OneInputStreamOperator, Triggerable { + + private static final long serialVersionUID = 1L; + + private static final String STATE_NAME = "_op_state"; + + private transient ValueState values; + + private final TypeSerializer serializer; + + private InternalTimerService timerService; + + public BatchGroupedReduceOperator(ReduceFunction reducer, TypeSerializer serializer) { + super(reducer); + this.serializer = serializer; + } + + @Override + public void open() throws Exception { + super.open(); + ValueStateDescriptor stateId = new ValueStateDescriptor<>(STATE_NAME, serializer); + values = getPartitionedState(stateId); Review Comment: Thanks for the great work on this. @dawidwys I have a question about the implementation of `BatchGroupedReduceOperator`. Why do we store the last reduced record in the state obtained from `getPartitionedState(stateId)`? Could we just use a field in the class? In batch processing, `BatchExecutionKeyValueState` provides the ability to get a value by namespace, but that functionality is not needed by reduce. WDYT? 🤔️
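The question above concerns where the last reduced value lives. A toy model of the operator's observable behavior (not Flink's actual runtime API, which uses keyed `ValueState` and a timer at the maximum watermark): records are reduced per key without emitting intermediate results, and each key's final value is emitted only at end of input. A single instance field instead of per-key storage would only be safe if all records of a key were guaranteed to arrive contiguously.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy model of BatchGroupedReduceOperator's behavior: keep one reduced
// value per key, emit each key's final value only at end of input.
public class BatchGroupedReduce<K, V> {

    private final BinaryOperator<V> reducer;
    private final Map<K, V> lastValuePerKey = new LinkedHashMap<>();

    public BatchGroupedReduce(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    public void processElement(K key, V value) {
        // No intermediate emit; the running value is merged into state.
        lastValuePerKey.merge(key, value, reducer);
    }

    // Called at end of input (in Flink, triggered by a timer registered
    // at the maximum watermark for each key).
    public Map<K, V> endInput() {
        return lastValuePerKey;
    }

    public static void main(String[] args) {
        BatchGroupedReduce<String, Integer> op = new BatchGroupedReduce<>(Integer::sum);
        op.processElement("a", 1);
        op.processElement("b", 10);
        op.processElement("a", 2); // "a" interleaves with "b"
        System.out.println(op.endInput()); // {a=3, b=10}
    }
}
```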
Re: [PR] [FLINK-26694][table] Support lookup join via a multi-level inheritance of TableFunction [flink]
YesOrNo828 commented on PR #23684: URL: https://github.com/apache/flink/pull/23684#issuecomment-1803119191 @flinkbot run azure
Re: [PR] [FLINK-20539][table-planner] fix type mismatch when using ROW in computed column [flink]
xuyangzhong commented on PR #23519: URL: https://github.com/apache/flink/pull/23519#issuecomment-1803115628 @snuyanzin Hi, could you take a final look at this PR when you have time?
Re: [PR] [FLINK-33488] Implement restore tests for Deduplicate node [flink]
jnh5y commented on code in PR #23686: URL: https://github.com/apache/flink/pull/23686#discussion_r1387440826 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationTestPrograms.java: ## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecDeduplicate}. 
*/ +public class DeduplicationTestPrograms { + +static final Row[] DATA1 = { +Row.of(1L, "terry", "pen", 1000L), +Row.of(2L, "alice", "pen", 2000L), +Row.of(3L, "bob", "pen", 3000L), +Row.of(4L, "bob", "apple", 4000L), +Row.of(5L, "fish", "apple", 5000L) +}; + +static final Row[] DATA2 = { +Row.of(6L, "jerry", "pen", 6000L), +Row.of(7L, "larry", "apple", 7000L), +Row.of(8L, "bill", "banana", 8000L), +Row.of(9L, "carol", "apple", 9000L) +}; + +static final TableTestProgram DEDUPLICATE = +TableTestProgram.of("deduplicate-asc", "validates deduplication in ascending") +.setupTableSource( +SourceTestStep.newBuilder("MyTable") +.addSchema( +"order_id bigint", +"`user` varchar", +"product varchar", +"order_time bigint ", +"event_time as TO_TIMESTAMP(FROM_UNIXTIME(order_time)) ", +"watermark for event_time as event_time - INTERVAL '5' second ") +.producedBeforeRestore(DATA1) +.producedAfterRestore(DATA2) +.build()) +.setupTableSink( +SinkTestStep.newBuilder("MySink") +.addSchema( +"order_id bigint", +"`user` varchar", +"product varchar", +"order_time bigint", +"primary key(product) not enforced") +.consumedBeforeRestore( +Row.of(1, "terry", "pen", 1000), +Row.of(4, "bob", "apple", 4000)) +.consumedAfterRestore(Row.of(8L, "bill", "banana", 8000L)) +.build()) +.runSql( +"insert into MySink " ++ "select order_id, user, product, order_time \n" ++ "FROM (" ++ " SELECT *," ++ "ROW_NUMBER() OVER (PARTITION BY product ORDER BY event_time ASC) AS row_num\n" ++ " FROM MyTable)" ++ "WHERE row_num = 1") +.build(); + +static final TableTestProgram DEDUPLICATE_DESC = Review Comment: I'm not completely sold on this test. I added it to see an example with `Deduplicate(keep=[LastRow]...`. If we want to remove it, I'm fine with that. If the example needs an addition to make it make more sense, I'm open to suggestions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
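The `row_num = 1` pattern in the tests above keeps the first row per key under ascending order, or the last row under descending order. A toy model of the two modes, independent of Flink (in Flink, keep-last additionally emits a retraction for the replaced row):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the two deduplication modes tested above:
// keep-first (ORDER BY event_time ASC ... WHERE row_num = 1) ignores
// later rows for a key; keep-last replaces them with the newest row.
public class Deduplicator<K, V> {

    private final boolean keepFirst;
    private final Map<K, V> state = new LinkedHashMap<>();

    public Deduplicator(boolean keepFirst) {
        this.keepFirst = keepFirst;
    }

    /** Returns the row to emit for this key, or null if nothing changes. */
    public V process(K key, V row) {
        if (keepFirst) {
            return state.putIfAbsent(key, row) == null ? row : null;
        }
        state.put(key, row); // keep-last: newest row wins
        return row;
    }

    public static void main(String[] args) {
        Deduplicator<String, String> first = new Deduplicator<>(true);
        System.out.println(first.process("pen", "terry"));  // terry
        System.out.println(first.process("pen", "alice"));  // null (ignored)

        Deduplicator<String, String> last = new Deduplicator<>(false);
        System.out.println(last.process("pen", "terry"));   // terry
        System.out.println(last.process("pen", "alice"));   // alice
    }
}
```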
Re: [PR] [FLINK-33398][runtime] Support switching from batch to stream mode for one input stream operator [flink]
yunfengzhou-hub commented on code in PR #23521: URL: https://github.com/apache/flink/pull/23521#discussion_r1387397501 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordAttributesValve.java: ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * RecordAttributesValve combines RecordAttributes from different input channels. If any of the input + * channels is non-backlog, the combined RecordAttributes is non-backlog. + * + * Currently, we only support switching the backlog status from null to backlog and backlog to + * non-backlog. Switching from non-backlog to backlog is not supported at the moment, and it will be + * ignored. Review Comment: It might be better to throw an exception instead of ignoring illegal switches.
## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java: ## @@ -629,6 +641,10 @@ private void sendCachedSplitsToNewlyRegisteredReader(int subtaskIndex, int attem } } +public Boolean isBacklog() { Review Comment: It might be better to improve the naming of this method and add JavaDoc explaining when a `null` would be returned. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.PriorityQueueSetFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.WrappingRuntimeException; + +import java.util.HashMap; +import java.util.Map; + +/** + * InternalBacklogAwareTimerServiceManagerImpl keeps track of all the {@link + * InternalBacklogAwareTimerServiceImpl}. + */ +@Internal +public class InternalBacklogAwareTimerServiceManagerImpl +extends InternalTimeServiceManagerImpl +implements InternalTimeServiceManager, KeyedStateBackend.KeySelectionListener { + +private final Map> timerServices = +new HashMap<>(); + +private boolean backlog = false; + +InternalBacklogAwareTimerServiceManagerImpl( +KeyGroupRange localKeyGroupRange, +KeyContext keyContext, +PriorityQueueSetFactory priorityQueueSetFactory, +ProcessingTimeService processingTimeService,
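The combining rule quoted from the valve's Javadoc above (any non-backlog input channel makes the combined status non-backlog) can be sketched as follows. This is a simplified model, not Flink's RecordAttributesValve class, and it omits the ignored non-backlog-to-backlog transition discussed in the review:

```java
import java.util.Arrays;

// Simplified model of the combining rule in RecordAttributesValve:
// the combined status is "backlog" only while every input channel
// still reports backlog; one non-backlog channel flips the output.
public class BacklogValve {

    private final boolean[] channelIsBacklog;

    public BacklogValve(int numChannels) {
        this.channelIsBacklog = new boolean[numChannels];
        Arrays.fill(channelIsBacklog, true); // assume backlog initially
    }

    /** Returns the combined backlog status after updating one channel. */
    public boolean onRecordAttributes(int channel, boolean isBacklog) {
        channelIsBacklog[channel] = isBacklog;
        for (boolean b : channelIsBacklog) {
            if (!b) {
                return false; // any non-backlog input => combined non-backlog
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BacklogValve valve = new BacklogValve(2);
        System.out.println(valve.onRecordAttributes(0, true));  // true
        System.out.println(valve.onRecordAttributes(1, false)); // false
    }
}
```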
[jira] [Resolved] (FLINK-26585) State Processor API: Loading a state set buffers the whole state set in memory before starting to process
[ https://issues.apache.org/jira/browse/FLINK-26585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-26585. -- Fix Version/s: 1.19.0 Resolution: Fixed merged eb4ae5d4 into master > State Processor API: Loading a state set buffers the whole state set in > memory before starting to process > - > > Key: FLINK-26585 > URL: https://issues.apache.org/jira/browse/FLINK-26585 > Project: Flink > Issue Type: Improvement > Components: API / State Processor >Affects Versions: 1.13.0, 1.14.0, 1.15.0 >Reporter: Matthias Schwalbe >Assignee: Matthias Schwalbe >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: MultiStateKeyIteratorNoStreams.java > > > * When loading a state, MultiStateKeyIterator loads and buffers the whole > state in memory before it even processes a single data point > ** This is no problem for small state (hence the unit tests work > fine) > ** The MultiStateKeyIterator ctor sets up a Java Stream that iterates all state > descriptors and flattens all data points contained within > ** The java.util.stream.Stream#flatMap function causes the buffering of the > whole data set when enumerated later on > ** See call stack [1] > *** In our case this is 150e6 data points (> 1GiB just for the pointers to > the data, let alone the data itself ~30GiB) > ** I'm not aware of any instrumentation of Stream that would avoid the > problem, hence > ** I coded an alternative implementation of MultiStateKeyIterator that > avoids using Java Stream, > ** I can contribute our implementation (MultiStateKeyIteratorNoStreams) > [1] > Streams call stack: > hasNext:77, RocksStateKeysIterator > (org.apache.flink.contrib.streaming.state.iterator) > next:82, RocksStateKeysIterator > (org.apache.flink.contrib.streaming.state.iterator) > forEachRemaining:116, Iterator (java.util) > forEachRemaining:1801, Spliterators$IteratorSpliterator (java.util) > forEach:580, ReferencePipeline$Head (java.util.stream) > 
accept:270, ReferencePipeline$7$1 (java.util.stream) > # Stream flatMap(final Function Stream> var1) > accept:373, ReferencePipeline$11$1 (java.util.stream) > # Stream peek(final Consumer var1) > accept:193, ReferencePipeline$3$1 (java.util.stream) > # Stream map(final Function > var1) > tryAdvance:1359, ArrayList$ArrayListSpliterator (java.util) > lambda$initPartialTraversalState$0:294, > StreamSpliterators$WrappingSpliterator (java.util.stream) > getAsBoolean:-1, 1528195520 > (java.util.stream.StreamSpliterators$WrappingSpliterator$$Lambda$57) > fillBuffer:206, StreamSpliterators$AbstractWrappingSpliterator > (java.util.stream) > doAdvance:161, StreamSpliterators$AbstractWrappingSpliterator > (java.util.stream) > tryAdvance:300, StreamSpliterators$WrappingSpliterator (java.util.stream) > hasNext:681, Spliterators$1Adapter (java.util) > hasNext:83, MultiStateKeyIterator (org.apache.flink.state.api.input) > hasNext:162, KeyedStateReaderOperator$NamespaceDecorator > (org.apache.flink.state.api.input.operator) > reachedEnd:215, KeyedStateInputFormat (org.apache.flink.state.api.input) > invoke:191, DataSourceTask (org.apache.flink.runtime.operators) > doRun:776, Task (org.apache.flink.runtime.taskmanager) > run:563, Task (org.apache.flink.runtime.taskmanager) > run:748, Thread (java.lang)
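The call stack above shows `WrappingSpliterator.fillBuffer` draining the inner stream. The core problem can be demonstrated with the plain JDK, no Flink involved: pulling a single element from a flatMap-ed stream through its iterator forces the inner substream into a buffer.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Demonstrates the buffering described above: asking the iterator of a
// flatMap-ed stream for ONE element causes far more of the inner stream
// to be produced, because the wrapping spliterator fills its buffer by
// pushing the whole substream.
public class FlatMapBuffering {

    static long producedAfterOneNext(int innerSize) {
        AtomicLong produced = new AtomicLong();
        Iterator<Integer> it = Stream.of("stateDescriptor")
                .flatMap(d -> IntStream.range(0, innerSize)
                        .boxed()
                        .peek(i -> produced.incrementAndGet()))
                .iterator();
        it.next(); // one element requested...
        return produced.get(); // ...but far more were produced
    }

    public static void main(String[] args) {
        // With 150e6 state entries, this buffering is what exhausts memory.
        System.out.println(producedAfterOneNext(1_000_000));
    }
}
```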
Re: [PR] [FLINK-26585][state-processor-api] replace implementation of MultiStateKeyIterator with Stream-free implementation [flink]
masteryhx closed pull request #23239: [FLINK-26585][state-processor-api] replace implementation of MultiStateKeyIterator with Stream-free implementation URL: https://github.com/apache/flink/pull/23239
Re: [PR] fixup! [FLINK-33060][state] Fix the javadoc of ListState interfaces about not allowing null value [flink]
masteryhx closed pull request #23683: fixup! [FLINK-33060][state] Fix the javadoc of ListState interfaces about not allowing null value URL: https://github.com/apache/flink/pull/23683
[jira] [Comment Edited] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka
[ https://issues.apache.org/jira/browse/FLINK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783891#comment-17783891 ] xiang1 yu edited comment on FLINK-25538 at 11/9/23 2:09 AM: Hi [~mapohl], I would be interested in working on this one. Could you please assign it to me? I just submitted a PR for this issue: [https://github.com/apache/flink-connector-kafka/pull/66] was (Author: JIRAUSER302279): Hi [~mapohl], I would be interested in working on this one. Could you please assign it to me? > [JUnit5 Migration] Module: flink-connector-kafka > > Key: FLINK-25538 > URL: https://issues.apache.org/jira/browse/FLINK-25538 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka > Reporter: Qingsheng Ren > Assignee: Ashmeet Kandhari > Priority: Minor > Labels: pull-request-available, stale-assigned, starter
[jira] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka
[ https://issues.apache.org/jira/browse/FLINK-25538 ] xiang1 yu deleted comment on FLINK-25538: --- was (Author: JIRAUSER302279): I just submitted a PR for this issue. If you have time, could you please help review it? https://github.com/apache/flink-connector-kafka/pull/66
Re: [PR] [FLINK-27286] Add infra to support training high dimension models [flink-ml]
zhipeng93 closed pull request #251: [FLINK-27286] Add infra to support training high dimension models URL: https://github.com/apache/flink-ml/pull/251
[jira] [Commented] (FLINK-25538) [JUnit5 Migration] Module: flink-connector-kafka
[ https://issues.apache.org/jira/browse/FLINK-25538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784245#comment-17784245 ] xiang1 yu commented on FLINK-25538: --- I just submitted a PR for this issue. If you have time, could you please help review it? https://github.com/apache/flink-connector-kafka/pull/66
[PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink-connector-kafka]
victor09091 opened a new pull request, #66: URL: https://github.com/apache/flink-connector-kafka/pull/66
## What is the purpose of the change
* [JUnit5 Migration] Module: flink-connector-kafka.
## Brief change log
- *Updated JUnit 4 test packages to JUnit 5 test packages*
## Verifying this change
- *This change is a trivial rework / code cleanup without any test coverage.*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
Re: [PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink-connector-kafka]
boring-cyborg[bot] commented on PR #66: URL: https://github.com/apache/flink-connector-kafka/pull/66#issuecomment-1803027215 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
Re: [PR] [FLINK-33402] Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss [flink]
varun1729DD commented on PR #23687: URL: https://github.com/apache/flink/pull/23687#issuecomment-1802979312 @flinkbot run azure
[jira] [Commented] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784240#comment-17784240 ] Varun Narayanan Chakravarthy commented on FLINK-33402: -- Created a PR: https://github.com/apache/flink/pull/23687
> Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
> Key: FLINK-33402
> URL: https://issues.apache.org/jira/browse/FLINK-33402
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HybridSource
> Affects Versions: 1.16.1
> Environment: Apache Flink 1.16.1; Mac OSX, Linux, etc.
> Reporter: Varun Narayanan Chakravarthy
> Priority: Critical
> Labels: pull-request-available
> Attachments: hybridSourceEnumeratorAndReaderFixes.patch
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> Hello Team,
> I noticed that there is data loss when using Hybrid Source. We are reading from a series of ~100 concrete File Sources, all chained together using the Hybrid Source.
> The issue stems from a race condition in the Flink Hybrid Source code: the Hybrid Source switches to the next source before the current source is complete, and the Hybrid Source readers behave similarly. I have also shared the patch file that fixes the issue.
> From the logs:
> *Task Manager logs:*
> 2023-10-10 17:46:23.577 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 position=null]
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:23.715 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.012 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [000154]
> 2023-10-10 17:46:24.012 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [000154]
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Reader received NoMoreSplits event.
> 2023-10-10 17:46:24.014 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - No more splits for subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO org.apache.hadoop.fs.s3a.S3AInputStream - Switching to Random IO seek policy
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.hybrid.HybridSourceReader - Switch source event: subtask=0 sourceIndex=12 source=org.apache.flink.connector.kafka.source.KafkaSource@7849da7e
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.apache.flink.connector.base.source.reader.SourceReaderBase - Closing Source Reader.
> 2023-10-10 17:46:24.116 [Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Shutting down split fetcher 0
> 2023-10-10 17:46:24.198 [Source Data Fetcher for Source: parquet-source (1/2)#0|#0] INFO o.a.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
> 2023-10-10 17:46:24.198 [Source: parquet-source (1/2)#0|#0] DEBUG o.a.flink.connector.base.source.hybrid.HybridSourceReader - Reader closed: subtask=0 sourceIndex=11 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@59620ef8
> We identified that data from `s3://REDACTED/part-1-13189.snappy` is missing. This is assigned to the reader with ID 000229. We can see from the logs that this split is added after the no-more-splits event and is NOT read.
> *Job Manager logs:*
> 2023-10-10 17:46:23.576 [SourceCoordinator-Source: parquet-source] INFO o.a.f.c.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host '10': Optional[FileSourceSplit: s3://REDACTED/part-1-13189.snappy [0, 94451) hosts=[localhost] ID=000229 position=null]
> 2023-10-10 17:46:2
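The interleaving in the logs can be reduced to a deterministic toy model. The sketch below is hypothetical (`ToyReader` and its methods are invented names, not the actual `HybridSourceReader` code): a reader that acts on the no-more-splits/switch signal without first draining splits assigned moments earlier silently loses them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class HybridSwitchRaceDemo {
    // Toy stand-in for a source reader; names are invented for illustration.
    static class ToyReader {
        final Queue<String> pending = new ArrayDeque<>();
        final List<String> read = new ArrayList<>();
        boolean closed;

        void addSplit(String split) { pending.add(split); }

        void pollOnce() {
            if (!closed && !pending.isEmpty()) { read.add(pending.poll()); }
        }

        // Buggy ordering modeled after the logs: the reader closes and switches
        // to the next source as soon as NoMoreSplits arrives, even though a
        // split assigned just before is still sitting in its queue.
        void onNoMoreSplitsBuggy() { closed = true; }
    }

    public static final ToyReader READER = new ToyReader();

    public static void main(String[] args) {
        READER.addSplit("000154");
        READER.pollOnce();            // 000154 is read normally
        READER.addSplit("000229");    // assigned at 17:46:23.577 in the logs
        READER.onNoMoreSplitsBuggy(); // NoMoreSplits at 17:46:24.014: reader closes
        READER.pollOnce();            // too late: reader is closed, 000229 never read
        System.out.println("read=" + READER.read + " lost=" + READER.pending);
        // prints read=[000154] lost=[000229]
    }
}
```

The fix described in the issue is the inverse ordering: only switch to the next source once the current reader has fully drained the splits assigned to it.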
Re: [PR] [FLINK-33402] Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss [flink]
flinkbot commented on PR #23687: URL: https://github.com/apache/flink/pull/23687#issuecomment-1802964699 ## CI report: * 49b72ea82160e6c20daf10c4391ace307349317c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33402: --- Labels: pull-request-available (was: )
[PR] [FLINK-33402] Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss [flink]
varun1729DD opened a new pull request, #23687: URL: https://github.com/apache/flink/pull/23687
## What is the purpose of the change
*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
## Brief change log
*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*
## Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
*(Please pick either of the following options)*
This change is a trivial rework / code cleanup without any test coverage.
*(or)*
This change is already covered by existing tests, such as *(please describe tests)*.
*(or)*
This change added tests and can be verified as follows: *(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): No
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: No
- The serializers: No
- The runtime per-record code paths (performance sensitive): No
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
- The S3 file system connector: No
## Documentation
- Does this pull request introduce a new feature? No
- If yes, how is the feature documented? Not applicable
[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation
[ https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33132: --- Labels: in-progress pull-request-available (was: in-progress) > Flink Connector Redshift Sink Implementation > > Key: FLINK-33132 > URL: https://issues.apache.org/jira/browse/FLINK-33132 > Project: Flink > Issue Type: Sub-task > Components: Connectors / AWS > Reporter: Samrat Deb > Assignee: Samrat Deb > Priority: Major > Labels: in-progress, pull-request-available
[PR] [WIP][FLINK-33132] Flink Connector Redshift Sink Implementation [flink-connector-aws]
Samrat002 opened a new pull request, #114: URL: https://github.com/apache/flink-connector-aws/pull/114
## Purpose of the change
Flink Connector Redshift Sink Implementation
## Verifying this change
- To Do
## Significant changes
*(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*
- [x] Dependencies have been added or upgraded
- [x] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [x] New feature has been introduced
- If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation
[ https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-33132: --- Labels: (was: pull-request-available)
[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation
[ https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-33132: --- Labels: in-progress (was: )
[jira] [Updated] (FLINK-33132) Flink Connector Redshift Sink Implementation
[ https://issues.apache.org/jira/browse/FLINK-33132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-33132: --- Component/s: Connectors / AWS
Re: [PR] [FLINK-33488] Implement restore tests for Deduplicate node [flink]
flinkbot commented on PR #23686: URL: https://github.com/apache/flink/pull/23686#issuecomment-1802808057 ## CI report: * 9a3424757010142c5a5cea666861b31277a06f41 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33488) Implement restore tests for Deduplicate node
[ https://issues.apache.org/jira/browse/FLINK-33488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33488: --- Labels: pull-request-available (was: ) > Implement restore tests for Deduplicate node > > Key: FLINK-33488 > URL: https://issues.apache.org/jira/browse/FLINK-33488 > Project: Flink > Issue Type: Sub-task > Reporter: Jim Hughes > Priority: Major > Labels: pull-request-available
[PR] [FLINK-33488] Implement restore tests for Deduplicate node [flink]
jnh5y opened a new pull request, #23686: URL: https://github.com/apache/flink/pull/23686
## What is the purpose of the change
Implement restore tests for the Deduplicate node
## Verifying this change
This change added tests and can be verified as follows:
* Added restore tests for the Deduplicate node which verify the generated compiled plan against the saved compiled plan
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[jira] [Updated] (FLINK-33402) Hybrid Source Concurrency Race Condition Fixes and Related Bugs Results in Data Loss
[ https://issues.apache.org/jira/browse/FLINK-33402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Varun Narayanan Chakravarthy updated FLINK-33402: --- Priority: Critical (was: Blocker)
Re: [PR] [Flink 31966] Flink Kubernetes operator lacks TLS support [flink-kubernetes-operator]
tagarr commented on PR #689: URL: https://github.com/apache/flink-kubernetes-operator/pull/689#issuecomment-1802587767 Hi @gaborgsomogyi I didn't appreciate that there could be 100's or more of FlinkDeployments running on a cluster. If that's the case then my solution wouldn't be the best. What if I provide an optional secret mount for the truststore and optional secretKeyRef env var for the store password. Then modify the config for creating the rest client for the operator to point to this store ? Additionally, I only created the OperatorKubernetesClusterDescriptor class so that the call to deployClusterInternal didn't throw an exception as the config for the actual cluster was being used. If instead of doing this I caught the exception and checked that it was a ClusterRetrieveException I would be able to reduce the changes considerably. Do you think this would be acceptable ? For reference the exception thrown by the operator is: ``` [m[33m2023-11-08 15:50:27,797[m [36mo.a.f.k.o.l.AuditUtils[m [32m[INFO ][flink/basic-secure] >>> Event | Warning | CLUSTERDEPLOYMENTEXCEPTION | org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient. 
[m[33m2023-11-08 15:50:27,800[m [36mo.a.f.k.o.r.ReconciliationUtils[m [33m[WARN ][flink/basic-secure] Attempt count: 0, last attempt: false [m[33m2023-11-08 15:50:27,886[m [36mo.a.f.k.o.l.AuditUtils[m [32m[INFO ][flink/basic-secure] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient.","additionalMetadata":{},"throwableList":[{"type":"java.lang.RuntimeException","message":"org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient.","additionalMetadata":{}},{"type":"org.apache.flink.client.deployment.ClusterRetrieveException","message":"Could not create the RestClusterClient.","additionalMetadata":{}}]} [m[33m2023-11-08 15:50:27,890[m [36mi.j.o.p.e.ReconciliationDispatcher[m [1;31m[ERROR][flink/basic-secure] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-secure', namespace='flink'}, version: 7366433} failed. org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient. 
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148) at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56) at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138) at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80) at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89) at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62) at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient. 
at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$1(KubernetesClusterDescriptor.java:121) at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:217) at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67) at org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(NativeFlinkService.java:104) at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:189) at org.a
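The "catch the exception and check that it was a ClusterRetrieveException" approach described above amounts to walking the cause chain of the thrown exception. A minimal, self-contained sketch of that check follows; the class and method names are illustrative (the real code would match against Flink's `ClusterRetrieveException`, which is stubbed here with a stdlib exception so the sketch compiles on its own):

```java
import java.util.Optional;

// Sketch of the proposed cause-chain check; names are illustrative,
// not the operator's actual classes.
public class CauseChainCheck {

    // Walks the cause chain and returns the first throwable whose class name matches.
    static Optional<Throwable> findCause(Throwable t, String className) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur.getClass().getName().equals(className)) {
                return Optional.of(cur);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Simulate the wrapping seen in the log above:
        // ReconciliationException -> RuntimeException -> retrieval failure.
        Throwable wrapped =
                new RuntimeException(
                        new RuntimeException(
                                new IllegalStateException(
                                        "Could not create the RestClusterClient.")));
        // The real check would look for ClusterRetrieveException here instead.
        System.out.println(findCause(wrapped, "java.lang.IllegalStateException").isPresent());
    }
}
```

Catching and inspecting the cause chain this way would indeed avoid introducing a dedicated descriptor subclass, at the cost of depending on the exception type the deploy path happens to throw.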
[jira] [Created] (FLINK-33488) Implement restore tests for Deduplicate node
Jim Hughes created FLINK-33488: -- Summary: Implement restore tests for Deduplicate node Key: FLINK-33488 URL: https://issues.apache.org/jira/browse/FLINK-33488 Project: Flink Issue Type: Sub-task Reporter: Jim Hughes -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29288) Can't start a job with a jar in the system classpath
[ https://issues.apache.org/jira/browse/FLINK-29288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784174#comment-17784174 ] Trystan edited comment on FLINK-29288 at 11/8/23 7:37 PM: -- Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0, flink version 1.16.1 If I do not include *jarURI* the job immediately goes into {*}Job Status: FINISHED / Lifecycle State: UPGRADING{*}. If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage errors around the kafka source classes (specifically OffsetResetStrategy). Edit: for the case where i'm not including {*}jarURI{*}, I did find an exception in the operator logs. It seems like it may be related to the pipeline options being null? Which I thought should be optional based on the linked PR. {code:java} {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.NullPointerException","stackTrace":"org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.NullPointerException\n\tat org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)\n\tat org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)\n\tat io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)\n\tat io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)\n\tat org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)\n\tat io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)\n\tat 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)\n\tat io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: java.lang.NullPointerException\n\tat org.apache.flink.kubernetes.utils.KubernetesUtils.checkJarFileForApplicationMode(KubernetesUtils.java:407)\n\tat org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:207)\n\tat org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)\n\tat org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(Native","additionalMetadata":{},"throwableList":[{"type":"java.lang.NullPointerException","additionalMetadata":{}}]} {code} was (Author: trystan): Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0 If I do not include *jarURI* the job immediately goes into {*}Job Status: FINISHED / Lifecycle State: UPGRADING{*}. If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage errors around the kafka source classes (specifically OffsetResetStrategy). Edit: for the case where i'm not including {*}jarURI{*}, I did find an exception in the operator logs. It seems like it may be related to the pipeline options being null? Which I thought should be optional based on the linked PR. 
{code:java} {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.NullPointerException","stackTrace":"org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.NullPointerException\n\tat org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)\n\tat org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)\n\tat io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)\n\tat io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)\n\tat org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)\n\tat io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.h
[jira] [Comment Edited] (FLINK-29288) Can't start a job with a jar in the system classpath
[ https://issues.apache.org/jira/browse/FLINK-29288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784174#comment-17784174 ] Trystan edited comment on FLINK-29288 at 11/8/23 7:36 PM: -- Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0 If I do not include *jarURI* the job immediately goes into {*}Job Status: FINISHED / Lifecycle State: UPGRADING{*}. If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage errors around the kafka source classes (specifically OffsetResetStrategy). Edit: for the case where i'm not including {*}jarURI{*}, I did find an exception in the operator logs. It seems like it may be related to the pipeline options being null? Which I thought should be optional based on the linked PR. {code:java} {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.lang.NullPointerException","stackTrace":"org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.NullPointerException\n\tat org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)\n\tat org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)\n\tat io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)\n\tat io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)\n\tat org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)\n\tat io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)\n\tat 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)\n\tat io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)\n\tat io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: java.lang.NullPointerException\n\tat org.apache.flink.kubernetes.utils.KubernetesUtils.checkJarFileForApplicationMode(KubernetesUtils.java:407)\n\tat org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:207)\n\tat org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)\n\tat org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(Native","additionalMetadata":{},"throwableList":[{"type":"java.lang.NullPointerException","additionalMetadata":{}}]} {code} was (Author: trystan): Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0 If I do not include *jarURI* the job immediately goes into {*}Job Status: FINISHED / Lifecycle State: UPGRADING{*}. If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage errors around the kafka source classes (specifically OffsetResetStrategy). 
> Can't start a job with a jar in the system classpath > > > Key: FLINK-29288 > URL: https://issues.apache.org/jira/browse/FLINK-29288 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Yaroslav Tkachenko >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > I'm using the latest (unreleased) version of the Kubernetes operator. > It looks like currently, it's impossible to use it with a job jar file in the > system classpath (/opt/flink/lib). *jarURI* is required and it's always > passed as a *pipeline.jars* parameter to the Flink process. In practice, it > means that the same class is loaded twice: once by the system classloader and > another time by the user classloader. This leads to exceptions like this: > {quote}java.lang.LinkageError: loader constraint violation: when resolving > method 'XXX' the class loader org.apache.flink.util.ChildFirstClassLoader > @47a5b70d of the current class, YYY, and the class loader 'app' for the > method's defining class, ZZZ, have different Class objects for the type AAA > used in the signature > {quote} > In my opinion, jarURI m
[jira] [Created] (FLINK-33487) Add the new Snowflake connector to supported list
Mohsen Rezaei created FLINK-33487: - Summary: Add the new Snowflake connector to supported list Key: FLINK-33487 URL: https://issues.apache.org/jira/browse/FLINK-33487 Project: Flink Issue Type: New Feature Components: Documentation Affects Versions: 1.17.1, 1.18.0 Reporter: Mohsen Rezaei Fix For: 1.17.2, 1.18.1 Code was contributed in FLINK-32737. Add this new connector to the list of supported ones in the documentation with a corresponding sub-page with the details of the sink: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33487) Add the new Snowflake connector to supported list
[ https://issues.apache.org/jira/browse/FLINK-33487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohsen Rezaei updated FLINK-33487: -- Description: Code was contributed in FLINK-32737. Add this new connector to the list of supported ones in the documentation with a corresponding sub-page for the details of the connector: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/ was: Code was contributed in FLINK-32737. Add this new connector to the list of supported ones in the documentation with a corresponding sub-page with the details of the sink: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/ > Add the new Snowflake connector to supported list > - > > Key: FLINK-33487 > URL: https://issues.apache.org/jira/browse/FLINK-33487 > Project: Flink > Issue Type: New Feature > Components: Documentation >Affects Versions: 1.18.0, 1.17.1 >Reporter: Mohsen Rezaei >Priority: Major > Fix For: 1.17.2, 1.18.1 > > > Code was contributed in FLINK-32737. > Add this new connector to the list of supported ones in the documentation > with a corresponding sub-page for the details of the connector: > https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29288) Can't start a job with a jar in the system classpath
[ https://issues.apache.org/jira/browse/FLINK-29288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784174#comment-17784174 ] Trystan commented on FLINK-29288: - Yes, two different kinds of errors. flink-kubernetes-operator v1.6.0 If I do not include *jarURI* the job immediately goes into {*}Job Status: FINISHED / Lifecycle State: UPGRADING{*}. If I include *jarURI* and point it at /opt/flink/lib/myjob.jar, I get linkage errors around the kafka source classes (specifically OffsetResetStrategy). > Can't start a job with a jar in the system classpath > > > Key: FLINK-29288 > URL: https://issues.apache.org/jira/browse/FLINK-29288 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.1.0 >Reporter: Yaroslav Tkachenko >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.2.0 > > > I'm using the latest (unreleased) version of the Kubernetes operator. > It looks like currently, it's impossible to use it with a job jar file in the > system classpath (/opt/flink/lib). *jarURI* is required and it's always > passed as a *pipeline.jars* parameter to the Flink process. In practice, it > means that the same class is loaded twice: once by the system classloader and > another time by the user classloader. This leads to exceptions like this: > {quote}java.lang.LinkageError: loader constraint violation: when resolving > method 'XXX' the class loader org.apache.flink.util.ChildFirstClassLoader > @47a5b70d of the current class, YYY, and the class loader 'app' for the > method's defining class, ZZZ, have different Class objects for the type AAA > used in the signature > {quote} > In my opinion, jarURI must be made optional even for the application mode. In > this case, it's assumed that it's already available in the system classpath. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]
jnh5y commented on code in PR #23680: URL: https://github.com/apache/flink/pull/23680#discussion_r1386993678 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. 
*/ +public class JoinTestPrograms { + +static final TableTestProgram NON_WINDOW_INNER_JOIN; +static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL; +static final TableTestProgram JOIN; +static final TableTestProgram INNER_JOIN; +static final TableTestProgram JOIN_WITH_FILTER; +static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY; +static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN; +static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK; +static final TableTestProgram INNER_JOIN_WITH_PK; + +static final SourceTestStep SOURCE_A = +SourceTestStep.newBuilder("A") +.addSchema("a1 int", "a2 bigint", "a3 varchar") +.producedBeforeRestore( +Row.of(1, 1L, "Hi"), +Row.of(2, 2L, "Hello"), +Row.of(3, 2L, "Hello world")) +.producedAfterRestore(Row.of(4, 3L, "Hello there")) +.build(); + +static final SourceTestStep SOURCE_B = +SourceTestStep.newBuilder("B") +.addSchema("b1 int", "b2 bigint", "b3 int", "b4 varchar", "b5 bigint") +.producedBeforeRestore( +Row.of(1, 1L, 0, "Hallo", 1L), +Row.of(2, 2L, 1, "Hallo Welt", 2L), +Row.of(2, 3L, 2, "Hallo Welt wie", 1L), +Row.of(3, 1L, 2, "Hallo Welt wie gehts", 1L)) +.producedAfterRestore(Row.of(2, 4L, 3, "Hallo Welt wie gehts", 4L)) +.build(); +static final SourceTestStep SOURCE_T1 = +SourceTestStep.newBuilder("T1") +.addSchema("a int", "b bigint", "c varchar") +.producedBeforeRestore( +Row.of(1, 1L, "Hi1"), +Row.of(1, 2L, "Hi2"), +Row.of(1, 2L, "Hi2"), +Row.of(1, 5L, "Hi3"), +Row.of(2, 7L, "Hi5"), +Row.of(1, 9L, "Hi6"), +Row.of(1, 8L, "Hi8"), +Row.of(3, 8L, "Hi9")) +.producedAfterRestore(Row.of(1, 1L, "PostRestore")) +.build(); +static final SourceTestStep SOURCE_T2 = +SourceTestStep.newBuilder("T2") +.addSchema("a int", "b bigint", "c varchar") +.producedBeforeRestore( +Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe"), Row.of(3, 2L, "HeHe")) +.producedAfterRestore(Row.of(2, 1L, "PostRestoreRight")) +.build(); + +static { +NON_WINDOW_INNER_JOIN = +TableTestProgram.of("non-window-inner-join", "test non-window inner 
join") +.setupTableSource(SOURCE_T1) +.setupTableSource(SOURCE_T2) +.setupTableSink( +SinkTestStep.newBuilder("MySink") +.addSchema("a int", "c1 varchar", "c2 varchar") +.consumedBeforeRestore( +Row.of(1, "HiHi", "Hi2"), +Row.of(1, "
Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]
jnh5y commented on code in PR #23680: URL: https://github.com/apache/flink/pull/23680#discussion_r1386985040 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. 
*/ +public class JoinTestPrograms { + +static final TableTestProgram NON_WINDOW_INNER_JOIN; +static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL; +static final TableTestProgram JOIN; +static final TableTestProgram INNER_JOIN; +static final TableTestProgram JOIN_WITH_FILTER; +static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY; +static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN; +static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK; +static final TableTestProgram INNER_JOIN_WITH_PK; + +static final SourceTestStep SOURCE_A = +SourceTestStep.newBuilder("A") +.addSchema("a1 int", "a2 bigint", "a3 varchar") +.producedBeforeRestore( +Row.of(1, 1L, "Hi"), +Row.of(2, 2L, "Hello"), +Row.of(3, 2L, "Hello world")) +.producedAfterRestore(Row.of(4, 3L, "Hello there")) +.build(); + +static final SourceTestStep SOURCE_B = +SourceTestStep.newBuilder("B") +.addSchema("b1 int", "b2 bigint", "b3 int", "b4 varchar", "b5 bigint") +.producedBeforeRestore( +Row.of(1, 1L, 0, "Hallo", 1L), +Row.of(2, 2L, 1, "Hallo Welt", 2L), +Row.of(2, 3L, 2, "Hallo Welt wie", 1L), +Row.of(3, 1L, 2, "Hallo Welt wie gehts", 1L)) +.producedAfterRestore(Row.of(2, 4L, 3, "Hallo Welt wie gehts", 4L)) +.build(); +static final SourceTestStep SOURCE_T1 = +SourceTestStep.newBuilder("T1") +.addSchema("a int", "b bigint", "c varchar") +.producedBeforeRestore( +Row.of(1, 1L, "Hi1"), +Row.of(1, 2L, "Hi2"), +Row.of(1, 2L, "Hi2"), +Row.of(1, 5L, "Hi3"), +Row.of(2, 7L, "Hi5"), +Row.of(1, 9L, "Hi6"), +Row.of(1, 8L, "Hi8"), +Row.of(3, 8L, "Hi9")) +.producedAfterRestore(Row.of(1, 1L, "PostRestore")) +.build(); +static final SourceTestStep SOURCE_T2 = +SourceTestStep.newBuilder("T2") +.addSchema("a int", "b bigint", "c varchar") +.producedBeforeRestore( +Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe"), Row.of(3, 2L, "HeHe")) +.producedAfterRestore(Row.of(2, 1L, "PostRestoreRight")) +.build(); + +static { +NON_WINDOW_INNER_JOIN = +TableTestProgram.of("non-window-inner-join", "test non-window inner 
join") +.setupTableSource(SOURCE_T1) +.setupTableSource(SOURCE_T2) +.setupTableSink( +SinkTestStep.newBuilder("MySink") +.addSchema("a int", "c1 varchar", "c2 varchar") +.consumedBeforeRestore( +Row.of(1, "HiHi", "Hi2"), +Row.of(1, "
Re: [PR] [FLINK-33469] Implement restore tests for Limit node [flink]
jnh5y commented on PR #23675: URL: https://github.com/apache/flink/pull/23675#issuecomment-1802331893 Responded to comments and moved the new tests to the package `org.apache.flink.table.planner.plan.nodes.exec.stream`. (Relative to the discussion over on https://github.com/apache/flink/pull/23660.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33455] Implement restore tests for SortLimit node [flink]
jnh5y commented on PR #23660: URL: https://github.com/apache/flink/pull/23660#issuecomment-1802323981 > > Can we reuse org.apache.flink.table.planner.plan.nodes.exec.stream since we are removing tests from that package? > > Yes, let's do that Should the Programs and Tests go into separate packages? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager
Jason Kania created FLINK-33486: --- Summary: Pulsar Client Send Timeout Terminates TaskManager Key: FLINK-33486 URL: https://issues.apache.org/jira/browse/FLINK-33486 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.17.1 Reporter: Jason Kania Currently, when the Pulsar Producer encounters a timeout when attempting to send data, it generates an unhandled TimeoutException. This is not a reasonable way to handle the timeout. The situation should be handled in a graceful way either through additional parameters that put control of the action under the discretion of the user or through some callback mechanism that the user can work with to write code. Unfortunately, right now, this causes a termination of the task manager which then leads to other issues. Increasing the timeout period to avoid the issue is not really an option to ensure proper handling in the event that the situation does occur. The exception is as follows: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: persistent://public/default/myproducer-partition-0 at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17] at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172) ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.17.1.jar:1.17.1] at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send message to the topic persistent://public/default/myproducer-partition-0 within given timeout at org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790) ~[pulsar-client-all-2.11.2.jar:2.11.2] at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503) ~[pulsar-client-all-2.11.2.jar:2.11.2] at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-all-2.11.2.jar:2.11.2] ... 1 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
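The callback mechanism the ticket asks for would route send timeouts to user code instead of failing the writer. A minimal sketch of that shape follows, with the Pulsar send represented by a plain `CompletableFuture` (this is not the connector's actual API; `sendWithTimeoutHandler` is a hypothetical helper):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Sketch of the callback-style handling the ticket asks for. The Pulsar send is
// stubbed as a CompletableFuture; this is not the connector's real interface.
public class GracefulSend {

    // Routes send timeouts to a user-supplied handler instead of failing the task.
    static CompletableFuture<Void> sendWithTimeoutHandler(
            CompletableFuture<Void> sendFuture, Consumer<Throwable> onTimeout) {
        return sendFuture.handle(
                (ok, err) -> {
                    Throwable cause =
                            (err instanceof CompletionException && err.getCause() != null)
                                    ? err.getCause()
                                    : err;
                    if (cause instanceof TimeoutException) {
                        onTimeout.accept(cause); // user decides: log, drop, dead-letter...
                        return null;             // swallow instead of terminating the task
                    }
                    if (err != null) {
                        throw new CompletionException(err); // other failures still propagate
                    }
                    return ok;
                });
    }
}
```

The point of the sketch is only the control flow: a timeout becomes a decision for user code, while every other failure keeps its current fail-fast behavior.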
[jira] [Closed] (FLINK-33295) Separate SinkV2 and SinkV1Adapter tests
[ https://issues.apache.org/jira/browse/FLINK-33295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi closed FLINK-33295. -- Resolution: Implemented [{{92951a0}}|https://github.com/apache/flink/commit/92951a05127f1e0e2ab0ea04ae022659fc5276ab] in master > Separate SinkV2 and SinkV1Adapter tests > --- > > Key: FLINK-33295 > URL: https://issues.apache.org/jira/browse/FLINK-33295 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / Common >Reporter: Peter Vary >Assignee: Peter Vary >Priority: Major > Labels: pull-request-available > > Current SinkV2 tests are based on the sink generated by the > _org.apache.flink.streaming.runtime.operators.sink.TestSink_ test class. This > test class does not generate the SinkV2 directly, but generates a SinkV1 and > wraps it with a > _org.apache.flink.streaming.api.transformations.SinkV1Adapter._ This > tests the SinkV2 only as it is aligned with SinkV1 and the > SinkV1Adapter. > We should have tests where we create a SinkV2 directly and the functionality > is tested without the adapter. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33295) Separate SinkV2 and SinkV1Adapter tests
[ https://issues.apache.org/jira/browse/FLINK-33295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi updated FLINK-33295: --- Fix Version/s: 1.19.0 > Separate SinkV2 and SinkV1Adapter tests > --- > > Key: FLINK-33295 > URL: https://issues.apache.org/jira/browse/FLINK-33295 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream, Connectors / Common >Reporter: Peter Vary >Assignee: Peter Vary >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > Current SinkV2 tests are based on the sink generated by the > _org.apache.flink.streaming.runtime.operators.sink.TestSink_ test class. This > test class does not generate the SinkV2 directly, but generates a SinkV1 and > wraps it with a > _org.apache.flink.streaming.api.transformations.SinkV1Adapter._ This > tests the SinkV2 only as it is aligned with SinkV1 and the > SinkV1Adapter. > We should have tests where we create a SinkV2 directly and the functionality > is tested without the adapter. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33295] Separate SinkV2 and SinkV1Adapter tests [flink]
mbalassi merged PR #23541: URL: https://github.com/apache/flink/pull/23541 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]
echauchot commented on PR #23443: URL: https://github.com/apache/flink/pull/23443#issuecomment-1802266251 @ferenc-csaky thanks for reviewing this PR ! I have addressed your comments, do I have your LGTM when the tests pass ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]
echauchot commented on PR #23443: URL: https://github.com/apache/flink/pull/23443#issuecomment-1802261114 > The added logic makes sense IMO. > > Added 2 comments, but in general I would consider separating the whole `INFLATER_INPUT_STREAM_FACTORIES` into a new class as there are a couple of functions that use it and it seems quite detachable from `FileInputFormat`. > > After a quick peek I think something like the following could work: > > ```java > public class InflaterInputStreamFactories { > > public static void register(String fileExt, InflaterInputStreamFactory factory) { ... } > > public static InflaterInputStreamFactory get(Path path) { ... } > > private static InflaterInputStreamFactory get(String fileExt) { ... } > > @VisibleForTesting > public static Set<String> getSupportedCompressionFormats() { ... } > } > ``` > > Also, `ConcurrentHashMap` can be utilized instead of the `synchronized` block, but other than that the current logic could be moved as is now. > > This probably goes beyond the current PR, but I think it is worth noting. WDYT? I agree with the ConcurrentHashMap suggestion. Regarding creating a class just to wrap a map that is used only in FileInputFormat, it seems overkill to me. And anyway, it is indeed outside the scope of this filesize-fix PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
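The registry refactoring sketched in the review could look roughly like the following. This is a hypothetical, self-contained version, not Flink's actual code: the nested `InflaterInputStreamFactory` interface is a stand-in for the real one, and the path-based lookup is omitted. A `ConcurrentHashMap` makes `register`/`get` thread-safe without any `synchronized` block.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the registry class proposed in the review.
public class InflaterInputStreamFactories {

    // Placeholder for the real InflaterInputStreamFactory interface.
    public interface InflaterInputStreamFactory {}

    // ConcurrentHashMap gives thread-safe put/get, replacing synchronized blocks.
    private static final Map<String, InflaterInputStreamFactory> FACTORIES =
            new ConcurrentHashMap<>();

    public static void register(String fileExt, InflaterInputStreamFactory factory) {
        FACTORIES.put(fileExt.toLowerCase(), factory);
    }

    public static InflaterInputStreamFactory get(String fileExt) {
        return FACTORIES.get(fileExt.toLowerCase());
    }

    // The set of registered extensions, exposed read-only for tests.
    public static Set<String> getSupportedCompressionFormats() {
        return Collections.unmodifiableSet(FACTORIES.keySet());
    }
}
```

Whether such a class is worth extracting is exactly the trade-off debated above; the map itself stays an implementation detail either way.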
Re: [PR] [FLINK-26203] Add docs for the table connector [flink-connector-pulsar]
tisonkun merged PR #63: URL: https://github.com/apache/flink-connector-pulsar/pull/63 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]
echauchot commented on code in PR #23443: URL: https://github.com/apache/flink/pull/23443#discussion_r1386887535 ## flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: ## @@ -136,6 +138,26 @@ public static String createTempFileDirExtension( return f.toURI().toString(); } +public static String createTempTextFileDirForAllCompressionFormats(File tempDir) Review Comment: Agree, better to be more generic here, thx ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]
echauchot commented on code in PR #23443: URL: https://github.com/apache/flink/pull/23443#discussion_r1386874406 ## flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java: ## @@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory getInflaterInputStreamFactory( } } +public static Set getSupportedCompressionFormats() { Review Comment: :+1: thanks for pointing out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]
echauchot commented on code in PR #23443: URL: https://github.com/apache/flink/pull/23443#discussion_r1386875062 ## flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java: ## @@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory getInflaterInputStreamFactory( } } +public static Set getSupportedCompressionFormats() { Review Comment: :+1: thx for pointing out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33059] Support transparent compression for file-connector for all file input formats [flink]
ferenc-csaky commented on code in PR #23443: URL: https://github.com/apache/flink/pull/23443#discussion_r1386659760 ## flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java: ## @@ -157,6 +157,10 @@ protected static InflaterInputStreamFactory getInflaterInputStreamFactory( } } +public static Set<String> getSupportedCompressionFormats() { Review Comment: I'd mark this `@VisibleForTesting`, because only tests use this function. ## flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: ## @@ -136,6 +138,26 @@ public static String createTempFileDirExtension( return f.toURI().toString(); } +public static String createTempTextFileDirForAllCompressionFormats(File tempDir) Review Comment: I think instead of bringing the specific `FileInputFormat` into this general utility, it would be cleaner to pass `Set<String> extensions` as a parameter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
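As a sketch of the reviewer's suggestion, the test utility might take the extension set as a parameter so it stays independent of `FileInputFormat`. The class and method names below are hypothetical, not the actual Flink test utilities:

```java
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Set;

// Hypothetical sketch: the temp-file helper receives the extension set as a
// parameter instead of reaching into FileInputFormat.
public class TempFileSketch {

    // Writes one text file per extension into tempDir and returns the dir URI,
    // mirroring the shape of TestFileUtils.createTempFileDirExtension.
    public static String createTempTextFileDir(File tempDir, String content, Set<String> extensions)
            throws IOException {
        for (String ext : extensions) {
            File f = new File(tempDir, "data." + ext);
            try (FileWriter w = new FileWriter(f)) {
                w.write(content);
            }
        }
        return tempDir.toURI().toString();
    }
}
```

A caller in a compression test would then pass whatever extension set it cares about, keeping the utility generic.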
Re: [PR] [FLINK-33455] Implement restore tests for SortLimit node [flink]
dawidwys commented on PR #23660: URL: https://github.com/apache/flink/pull/23660#issuecomment-1801918003 > Can we reuse org.apache.flink.table.planner.plan.nodes.exec.stream since we are removing tests from that package? Yes, let's do that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33470] Implement restore tests for Join node [flink]
dawidwys commented on code in PR #23680: URL: https://github.com/apache/flink/pull/23680#discussion_r1386592611 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest.java: ## @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.Before; -import org.junit.Test; - -/** Test json serialization/deserialization for join. */ -public class JoinJsonPlanTest extends TableTestBase { - -private StreamTableTestUtil util; -private TableEnvironment tEnv; - -@Before -public void setup() { Review Comment: Can we remove the json plan files as well? ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/JoinTestPrograms.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. 
*/ +public class JoinTestPrograms { + +static final TableTestProgram NON_WINDOW_INNER_JOIN; +static final TableTestProgram NON_WINDOW_INNER_JOIN_WITH_NULL; +static final TableTestProgram JOIN; +static final TableTestProgram INNER_JOIN; +static final TableTestProgram JOIN_WITH_FILTER; +static final TableTestProgram INNER_JOIN_WITH_DUPLICATE_KEY; +static final TableTestProgram INNER_JOIN_WITH_NON_EQUI_JOIN; +static final TableTestProgram INNER_JOIN_WITH_EQUAL_PK; +static final TableTestProgram INNER_JOIN_WITH_PK; + +static final SourceTestStep SOURCE_A = +SourceTestStep.newBuilder("A") +.addSchema("a1 int", "a2 bigint", "a3 varchar") +.producedBeforeRestore( +Row.of(1, 1L, "Hi"), +Row.of(2, 2L, "Hello"), +Row.of(3, 2L, "Hello world")) +.producedAfterRestore(Row.of(4, 3L, "Hello there")) +.build(); + +static final SourceTestStep SOURCE_B = +SourceTestStep.newBuilder("B") +.addSchema("b1 int", "b2 bigint", "b3 int", "b4 varchar", "b5 bigint") +.producedBeforeRestore( +Row.of(1, 1L, 0, "Hallo", 1L), +Row.of(2, 2L, 1, "Hallo Welt", 2L), +Row.of(2, 3L, 2, "Hallo Welt wie", 1L), +Row.of(3, 1L, 2, "Hallo Welt wie gehts", 1L)) +.producedAfterRestore(Row.of(2, 4L, 3, "Hallo Welt wie gehts", 4L)) +.build(); +static final SourceTestStep SOURCE_T1 = +SourceTestStep.newBuilder("T1") +.addSchema("a int", "b b
Re: [PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]
flinkbot commented on PR #23685: URL: https://github.com/apache/flink/pull/23685#issuecomment-1801904915 ## CI report: * 2bcb47ade877cb2fba533001c3ce7c9d3788c9dc UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ] Xin Chen edited comment on FLINK-33483 at 11/8/23 1:33 PM: --- But in another scenario in production practice, UNDEFINED also appears. The Jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log: {code:java} 15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection. 15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null 15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING. 15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable. 15:00:55.694 closing socket connection and attempting reconnect 15:00:57.657 State change: RECONNECTED 15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted. 15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted. 15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by JobManager. 15:00:57.742 Shutting down cluster because job not finished 15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null. 15:00:57.767 Unregister application from the YARN Resource Manager with final status UNDEFINED. {code} From the logs, it can be seen that there was a disconnection of zk for a few seconds. During the disconnection period, rm(resourcemanager) was affected and the Flink task was suspended, attempting to reconnect zk.
The most important thing is that after ZK reconnects, for some unknown reason, jm directly determines that the task is in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, jm did not restart, which is different from FLINK-12302. So how exactly is “UNKNOWN” identified here, is it also determined after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached [^reproduce.log]. I think the reason for the difficulty in reproducing is: When I disconnect all zks, jm will quickly go down and restart, and the log shows an error. {code:java} org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~ at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~ at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~ at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~ {code} Here, I speculate that the disconnection of zk may have affected rm's leadership, leading to issues when jm unregisters to rm with ApplicationStatus: UNDEFINED. However, in actual production scenarios, jm did not fail and continued with the execution. was (Author: JIRAUSER298666): But in another scenario in production practice, UNDEFINED als
[jira] [Updated] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Chen updated FLINK-33483: - Attachment: reproduce.log > Why is “UNDEFINED” defined in the Flink task status? > > > Key: FLINK-33483 > URL: https://issues.apache.org/jira/browse/FLINK-33483 > Project: Flink > Issue Type: Improvement > Components: Runtime / RPC, Runtime / Task >Affects Versions: 1.12.2 >Reporter: Xin Chen >Priority: Major > Attachments: container_e15_1693914709123_8498_01_01_8042, > reproduce.log > > > In the Flink on Yarn mode, if an unknown status appears in the Flink log, > jm(jobmanager) will report the task status as undefined. The Yarn page will > display the state as FINISHED, but the final status is *UNDEFINED*. In terms > of business, it is unknown whether the task has failed or succeeded, and > whether to retry. It has a certain impact. Why should we design UNDEFINED? > Usually, this situation occurs due to zk(zookeeper) disconnection or jm > abnormality, etc. Since the abnormality is present, why not use FAILED? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33485][table] Optimize exists subqueries by looking at metadata rowcount [flink]
snuyanzin opened a new pull request, #23685: URL: https://github.com/apache/flink/pull/23685 ## What is the purpose of the change The idea is to look at the metadata rowcount for `EXISTS` subqueries and based on this decide whether they can be optimized to `TRUE`/`FALSE` ## Brief change log FlinkSubQueryRemoveRule.java; adapted tests, since the optimization changed execution plans ## Verifying this change This change is already covered by existing tests, such as CalcPruneAggregateCallRuleTest ProjectPruneAggregateCallRuleTest FlinkAggregateRemoveRuleTest FlinkLimit0RemoveRuleTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount
[ https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33485: --- Labels: pull-request-available (was: ) > Optimize the EXISTS sub-query by Metadata RowCount > -- > > Key: FLINK-33485 > URL: https://issues.apache.org/jira/browse/FLINK-33485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > If the sub-query is guaranteed to produce at least one row, just return TRUE. > If the sub-query is guaranteed to produce no row, just return FALSE. > Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, > it should be adapted accordingly. > Examples: > {code:sql} > SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2) > {code} > Aggregation functions always return 1 row even on an empty table, > so we could just replace this query with > {code:sql} > SELECT * FROM T2 > {code} > Another example: > {code:sql} > SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0) > {code} > {{LIMIT 0}} means no rows, so it could be optimized to > {code:sql} > SELECT * FROM MyTable > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount
[ https://issues.apache.org/jira/browse/FLINK-33485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin reassigned FLINK-33485: --- Assignee: Sergey Nuyanzin > Optimize the EXISTS sub-query by Metadata RowCount > -- > > Key: FLINK-33485 > URL: https://issues.apache.org/jira/browse/FLINK-33485 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > > If the sub-query is guaranteed to produce at least one row, just return TRUE. > If the sub-query is guaranteed to produce no row, just return FALSE. > Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, > it should be adapted accordingly. > Examples: > {code:sql} > SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2) > {code} > Aggregation functions always return 1 row even on an empty table, > so we could just replace this query with > {code:sql} > SELECT * FROM T2 > {code} > Another example: > {code:sql} > SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0) > {code} > {{LIMIT 0}} means no rows, so it could be optimized to > {code:sql} > SELECT * FROM MyTable > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount
Sergey Nuyanzin created FLINK-33485: --- Summary: Optimize the EXISTS sub-query by Metadata RowCount Key: FLINK-33485 URL: https://issues.apache.org/jira/browse/FLINK-33485 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.18.0 Reporter: Sergey Nuyanzin If the sub-query is guaranteed to produce at least one row, just return TRUE. If the sub-query is guaranteed to produce no row, just return FALSE. Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, it should be adapted accordingly. Examples: {code:sql} SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2) {code} Aggregation functions always return 1 row even on an empty table, so we could just replace this query with {code:sql} SELECT * FROM T2 {code} Another example: {code:sql} SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0) {code} {{LIMIT 0}} means no rows, so it could be optimized to {code:sql} SELECT * FROM MyTable {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
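Detached from the actual Calcite/Flink planner APIs, the decision the issue describes can be sketched as plain logic over row-count bounds. Here `minRowCount`/`maxRowCount` stand in for the values a metadata query would derive; the class and method names are hypothetical:

```java
import java.util.Optional;

// Hypothetical sketch: given row-count bounds for an EXISTS sub-query,
// decide whether the predicate can be folded to a constant.
public class ExistsRewriteSketch {

    public enum Rewrite { ALWAYS_TRUE, ALWAYS_FALSE, KEEP }

    // minRowCount/maxRowCount stand in for planner metadata (e.g. a global
    // aggregate guarantees min >= 1; LIMIT 0 guarantees max <= 0).
    public static Rewrite rewriteExists(Optional<Double> minRowCount, Optional<Double> maxRowCount) {
        if (minRowCount.isPresent() && minRowCount.get() >= 1.0) {
            return Rewrite.ALWAYS_TRUE;   // sub-query always produces at least one row
        }
        if (maxRowCount.isPresent() && maxRowCount.get() <= 0.0) {
            return Rewrite.ALWAYS_FALSE;  // sub-query can never produce a row
        }
        return Rewrite.KEEP;              // bounds inconclusive, keep the sub-query
    }
}
```

For the first example in the issue, the global aggregate guarantees one row, so the EXISTS folds to TRUE; for the `LIMIT 0` example, the max row count is zero, so the NOT EXISTS predicate also folds away.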
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784027#comment-17784027 ] Xin Chen edited comment on FLINK-33483 at 11/8/23 12:50 PM: The biggest confusion is why Flink needs to design UNDEFINED. From the perspective of the scenario, UNDEFINED is always due to exceptions (zk or jm exceptions). Why can't we define FAILED? Defining FAILED allows us to determine and retry tasks, but UNDEFINED here has no meaning at all. Is there a better solution to this problem in subsequent versions of Flink, or how to better reproduce this scenario? My solution is the same as FLINK-12302, hoping to give a FAILED finalStatus to report to resourcemanager in this case, providing the user with the clearest reminder. Even in this case, the task may have actually run successfully in TM(taskmanager), but after all, an exception (zk disconnection) has occurred. Anyway, executing a task and ultimately giving the user an UNDEFINED state can be very confusing. was (Author: JIRAUSER298666): The biggest confusion is why Flink needs to design UNDEFINED. From the perspective of the scenario, UNDEFINED is always due to exceptions (zk or jm exceptions). Why can't we define FAILED? Defining FAILED allows us to determine and retry tasks, but UNDEFINED here has no meaning at all. Is there a better solution to this problem in subsequent versions of Flink, or how to better reproduce this scenario? My solution is the same as FLINK-12302, hoping to give a FAILED finalStatus to report to resourcemanager in this case, providing the user with the clearest reminder. Even in this case, the task may have actually run successfully in TM(taskmanager), but after all, an exception (zk disconnection) has occurred. Anyway, executing a task and ultimately giving the user an UNDEFINED state can be confusing. > Why is “UNDEFINED” defined in the Flink task status? 
> > > Key: FLINK-33483 > URL: https://issues.apache.org/jira/browse/FLINK-33483 > Project: Flink > Issue Type: Improvement > Components: Runtime / RPC, Runtime / Task >Affects Versions: 1.12.2 >Reporter: Xin Chen >Priority: Major > Attachments: container_e15_1693914709123_8498_01_01_8042 > > > In the Flink on Yarn mode, if an unknown status appears in the Flink log, > jm(jobmanager) will report the task status as undefined. The Yarn page will > display the state as FINISHED, but the final status is *UNDEFINED*. In terms > of business, it is unknown whether the task has failed or succeeded, and > whether to retry. It has a certain impact. Why should we design UNDEFINED? > Usually, this situation occurs due to zk(zookeeper) disconnection or jm > abnormality, etc. Since the abnormality is present, why not use FAILED? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-20672) notifyCheckpointAborted RPC failure can fail JM
[ https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784035#comment-17784035 ] Yun Tang commented on FLINK-20672: -- [~Zakelly] Thanks for the information. If so, I have another question: do we really need the {{io-executor}} to work with {{FatalExitExceptionHandler}}? From my point of view, if we do not delete the Savepoint correctly (as this is also executed on the {{io-executor}}), shall we need to fail the whole JobManager? If the correct behavior of the exception handler of {{io-executor}} is not fatal exiting, I think we shall correct that behavior first. [~Zakelly], [~roman], [~srichter] WDYT? > notifyCheckpointAborted RPC failure can fail JM > --- > > Key: FLINK-20672 > URL: https://issues.apache.org/jira/browse/FLINK-20672 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.3, 1.12.0 >Reporter: Roman Khachatryan >Assignee: Zakelly Lan >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > pull-request-available > > Introduced in FLINK-8871, aborted RPC notifications are done asynchonously: > > {code} > private void sendAbortedMessages(long checkpointId, long timeStamp) { > // send notification of aborted checkpoints asynchronously. > executor.execute(() -> { > // send the "abort checkpoint" messages to necessary > vertices. > // .. > }); > } > {code} > However, the executor that eventually executes this request is created as > follows > {code} > final ScheduledExecutorService futureExecutor = > Executors.newScheduledThreadPool( > Hardware.getNumberCPUCores(), > new ExecutorThreadFactory("jobmanager-future")); > {code} > ExecutorThreadFactory uses UncaughtExceptionHandler that exits JVM on error. > cc: [~yunta] -- This message was sent by Atlassian Jira (v8.20.10#820010)
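The failure mode under discussion can be illustrated with a minimal, hypothetical sketch: a task submitted via `execute` (as in `sendAbortedMessages`) throws, the exception reaches the worker thread's uncaught-exception handler, and whatever that handler does decides the process's fate. Flink's real `ExecutorThreadFactory` installs a handler that exits the JVM; to keep the sketch runnable, the handler below merely records the throwable:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the failure mode: with execute() (unlike
// submit()), an exception thrown by the task kills the worker thread and is
// routed to its uncaught-exception handler.
public class UncaughtHandlerSketch {

    public static Throwable runAndCapture(Runnable task) throws InterruptedException {
        AtomicReference<Throwable> caught = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            // Stand-in for FatalExitExceptionHandler: record instead of System.exit.
            t.setUncaughtExceptionHandler((thread, e) -> {
                caught.set(e);
                done.countDown();
            });
            return t;
        });
        executor.execute(task);  // fire-and-forget, like sendAbortedMessages
        done.await(5, TimeUnit.SECONDS);
        executor.shutdownNow();
        return caught.get();
    }
}
```

This is why a failing RPC notification executed on a pool whose handler calls `System.exit` can take the whole JobManager down, and why the thread's exception-handling policy, not the notification code, is the crux of the discussion.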
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ] Xin Chen edited comment on FLINK-33483 at 11/8/23 12:48 PM: But in another scenario in production practice, UNDEFINED also appears. The Jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log: {code:java} 15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection. 15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null 15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING. 15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable. 15:00:55.694 closing socket connection and attempting reconnect 15:00:57.657 State change: RECONNECTED 15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted. 15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted. 15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by JobManager. 15:00:57.742 Shutting down cluster because job not finished 15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null. 15:00:57.767 Unregister application from the YARN Resource Manager with final status UNDEFINED. {code} From the logs, it can be seen that there was a disconnection of zk for a few seconds. During the disconnection period, rm(resourcemanager) was affected and the Flink task was suspended, attempting to reconnect zk. 
The most important >thing is that after ZK reconnects, for some unknown reason, jm directly >determines that the task is in an UNKNOWN state: *Shutting >YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics >null*. During this process, jm did not restart, which is different from >FLINK-12302. So how exactly is “UNKNOWN” identified here, is it also determined after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason for the difficulty in reproducing is : When I disconnect all zks, jm will quickly down and restart, and the log will shows an error. {code:java} org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the fencing token is null. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~ at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~ at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) ~ at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~ {code} Here, I speculate that the disconnection of zk may have affected rm's leadership, leading to issues when jm unregisters to rm with ApplicationStatus: UNDEFINED. However, in actual production scenarios, jm did not fail and continued with the execution. was (Author: JIRAUSER298666): But in another scenario in production practice, UNDEFINED also appears. The J
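For readers following the status names in these logs: the sketch below is not Flink's actual implementation but a minimal, self-contained illustration (enum and method names are hypothetical) of the mapping the logs suggest, where an internal application status of UNKNOWN has no YARN equivalent and therefore surfaces as YARN's final status UNDEFINED when the application unregisters.

```java
// Hypothetical sketch of the status translation seen in the log:
// "application status UNKNOWN" -> "final status UNDEFINED".
// These enums are illustrative, not Flink's real classes.
public class StatusMappingSketch {

    // Cluster-internal view of how the job ended.
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // YARN's final application status has no "unknown" value:
    // anything the cluster cannot classify falls through to UNDEFINED.
    enum YarnFinalStatus { SUCCEEDED, FAILED, KILLED, UNDEFINED }

    static YarnFinalStatus toYarnFinalStatus(ApplicationStatus status) {
        switch (status) {
            case SUCCEEDED: return YarnFinalStatus.SUCCEEDED;
            case FAILED:    return YarnFinalStatus.FAILED;
            case CANCELED:  return YarnFinalStatus.KILLED;
            default:        return YarnFinalStatus.UNDEFINED; // UNKNOWN lands here
        }
    }

    public static void main(String[] args) {
        // The scenario from the log: the job was not finished when the
        // entrypoint shut down, so the status is UNKNOWN and YARN is told UNDEFINED.
        System.out.println(toYarnFinalStatus(ApplicationStatus.UNKNOWN)); // prints UNDEFINED
    }
}
```

Under this (assumed) mapping, the question in the thread reduces to why the entrypoint chose UNKNOWN rather than FAILED in the first place.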
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ] Xin Chen edited comment on FLINK-33483 at 11/8/23 12:45 PM: But in another scenario in production practice, UNDEFINED also appears. The Jm log can be found in the file [^container_e15_1693914709123_8498_01_01_8042] , but I have not fully reproduced this scene. Based on the key information in the log: {code:java} 15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection. 15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null 15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING. 15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable. 15:00:55.694 closing socket connection and attempting reconnect 15:00:57.657 State change: RECONNECTED 15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted. 15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted. 15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by JobManager. 15:00:57.742 Shutting down cluster because job not finished 15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null. 15:00:57.767 Unregister application from the YARN Resource Manager with final status UNDEFINED. {code} >From the logs, it can be seen that there was a disconnection of zk for a few >seconds. During the disconnection period, rm(resourcemanager) was affected and >the Flink task was suspended, attempting to reconnect zk. 
The most important >thing is that after ZK reconnects, for some unknown reason, jm directly >determines that the task is in an UNKNOWN state: *Shutting >YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics >null*. During this process, jm did not restart, which is different from >FLINK-12302. So how exactly is “UNKNOWN” identified here, also determined after reading from the RunningJobRegistry in zk? I have also tried many times and did not reproduce this scene. The reproduction log is attached. I think the reason for the difficulty in reproducing is : When I disconnect all zks, jm will quickly down and restart, and the log will shows an error. {code:java} org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(deregisterApplication(ApplicationStatus, String))) sent to akka.tcp://flink@192.168.22.121:42347/user/rpc/resourcemanager_0 because the fencing token is null. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~ at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~ at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~ at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~ at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~ at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~ at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~ at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~ at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~ at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~ at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) ~ at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~ at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~ at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~ at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~ at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~ {code} Here, I speculate that the disconnection of zk may have affected rm's leadership, leading to issues when jm unregisters to rm with ApplicationStatus: UNDEFINED. However, in actual production scenarios, jm did not fail and this has not been reproduced. was (Author: JIRAUSER298666): But in another scenario in production practice, UN also appears. The Jm log can be
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784001#comment-17784001 ] Xin Chen edited comment on FLINK-33483 at 11/8/23 12:44 PM:

Links to FLINK-12302. Including that issue, two scenarios have been discovered in Flink 1.12.2 where *UNDEFINED* is reported to the YARN ResourceManager. I have reproduced that scenario: when the task completes in tm and reaches a global terminal state (FINISHED or FAILED), the jm log shows "Job 65ccc2410d4554553225889dbea552d7 reached global terminal state {}". Killing the jm process (am) or disconnecting the zk connection at that point causes a new jm to be started. The new jm assigns “UNKNOWN” based on the task's status "DONE" recorded in the RunningJobRegistry in zk, and ultimately reports “UNDEFINED”.

> Why is “UNDEFINED” defined in the Flink task status?
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / RPC, Runtime / Task
> Affects Versions: 1.12.2
> Reporter: Xin Chen
> Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
> In Flink on YARN mode, if an unknown status appears in the Flink log, jm (JobManager) reports the task status as undefined. The YARN page will display the state as FINISHED, but the final status is *UNDEFINED*. From a business perspective, it is unknown whether the task failed or succeeded, and whether to retry, which has a real impact. Why should we design UNDEFINED? Usually this situation occurs due to zk (ZooKeeper) disconnection or a jm abnormality. Since an abnormality is present, why not use FAILED?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784027#comment-17784027 ] Xin Chen edited comment on FLINK-33483 at 11/8/23 12:41 PM:

*The biggest confusion is why Flink needs to define UNDEFINED.* In every scenario observed, UNDEFINED is caused by an exception (a zk or jm exception). Why can't we report FAILED? Reporting FAILED would let us detect failed tasks and retry them, whereas UNDEFINED carries no meaning at all. Is there a better solution to this problem in later versions of Flink, or a better way to reproduce this scenario? My suggestion is the same as FLINK-12302: report a FAILED finalStatus to the ResourceManager in this case, giving the user the clearest possible signal. Even though the task may actually have run successfully in TM (TaskManager), an exception (zk disconnection) has after all occurred. Executing a task and ultimately presenting the user with an UNDEFINED state is confusing.
Re: [PR] [FLINK-33469] Implement restore tests for Limit node [flink]
dawidwys commented on code in PR #23675:
URL: https://github.com/apache/flink/pull/23675#discussion_r1386539479

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/LimitTestPrograms.java:

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecLimit}. */
+public class LimitTestPrograms {
+
+    static final Row[] DATA =
+            new Row[] {
+                Row.of(2, "a", 6),
+                Row.of(4, "b", 8),
+                Row.of(6, "c", 10),
+                Row.of(1, "a", 5),
+                Row.of(3, "b", 7),
+                Row.of(5, "c", 9)
+            };
+
+    static final TableTestProgram LIMIT =
+            TableTestProgram.of("limit", "validates limit node")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(DATA)
+                                    .producedAfterRestore(DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b VARCHAR", "c BIGINT")
+                                    .addOption("sink-insert-only", "false")

Review Comment:
   This is set by the `RestoreTestBase`

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LimitJsonPlanTest.java:

@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.Before;
-import org.junit.Test;
-
-/** Test json serialization for sort limit. */
-public class LimitJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @Before
-    public void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + " a bigint,\n"
-                        + " b int not null,\n"
-                        + " c varchar,\n"
-                        + " d timestamp(3)\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-    }
-
-    @Test
-    public void testLimit() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + " a bigint,\n"
-                        + " b bigint\n"
-
Re: [PR] [FLINK-33455] Implement restore tests for SortLimit node [flink]
dawidwys commented on code in PR #23660:
URL: https://github.com/apache/flink/pull/23660#discussion_r1386534891

## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/SortTestPrograms.java:
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecSortLimit}. */
+public class SortTestPrograms {
+
+    static final TableTestProgram SORT_LIMIT_ASC =
+            TableTestProgram.of(
+                            "sort-limit-asc",
+                            "validates sort limit node by sorting integers in asc mode")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(
+                                            Row.of(2, "a", 6),
+                                            Row.of(4, "b", 8),
+                                            Row.of(6, "c", 10),
+                                            Row.of(1, "a", 5),
+                                            Row.of(3, "b", 7),
+                                            Row.of(5, "c", 9))
+                                    .producedAfterRestore(
+                                            Row.of(2, "a", 6),
+                                            Row.of(4, "b", 8),
+                                            Row.of(6, "c", 10),
+                                            Row.of(1, "a", 5),
+                                            Row.of(3, "b", 7),
+                                            Row.of(5, "c", 9))

Review Comment:
Personally, I find this confusing. Having the exact same data as input twice is an artificial case; I think it rarely occurs in production use cases. Bear in mind this does not model the "replaying" case of reading from the same offset before and after restore: the API models a perfect source that stores its offsets in the checkpoint. Currently you model an input like this (using only the first column for simplicity):
```
offset | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11
data   | 2 | 4 | 6 | 1 | 3 | 5 | 2 | 4 | 6 | 1 | 3  | 5
```
with a savepoint at offset `5`. In other words, if you were testing `SELECT a, COUNT(*) FROM t GROUP BY a;` you'd get (after materializing all updates, of course):
```
a | COUNT(*)
--+---------
2 | 2
4 | 2
6 | 2
1 | 2
3 | 2
5 | 2
```
Having said all that, I am not sure which property having the same data twice showcases. I find it even more confusing because I am starting to see a pattern of using the same data twice in more PRs: https://github.com/apache/flink/pull/23675 Such visible patterns easily catch the attention and make people wonder. If we want to test particular behaviour, could we add a comment on the role of each row, e.g.
```
.producedBeforeRestore(
        Row.of(2, "a", 6),
        Row.of(4, "b", 8),
        Row.of(6, "c", 10),
        Row.of(1, "a", 5),
        Row.of(3, "b", 7),
        Row.of(5, "c", 9))
.producedAfterRestore(
        Row.of(2, "a", 6), // should replace [3, b, 7] from before the restore
        Row.of(4, "b", 8), // should be ignored as it is "bigger" than the lowest [2, "a", 6]
        Row.o
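To make the retraction semantics in the review comments above concrete, here is a self-contained toy sketch of an ascending `SORT ... LIMIT 3` operator with retractions. This is an illustration only, not Flink's actual `StreamExecSortLimit` implementation; the class `TopNAsc` and its `+I`/`-D` change markers are invented for this example. Replaying the first column of the rows above reproduces the reviewer's annotations: after restore, `2` retracts the retained `3`, and `4` is ignored.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of a streaming ascending SORT ... LIMIT n operator with retraction. */
class TopNAsc {
    private final int limit;
    // Max-heap over retained keys, so the "worst" (largest) retained row is on top.
    private final PriorityQueue<Integer> retained = new PriorityQueue<>(Comparator.reverseOrder());
    final List<String> emitted = new ArrayList<>();

    TopNAsc(int limit) { this.limit = limit; }

    void process(int key) {
        if (retained.size() < limit) {
            retained.add(key);                 // buffer not full yet: just insert
            emitted.add("+I " + key);
        } else if (key < retained.peek()) {
            int evicted = retained.poll();     // new row beats the current largest: retract it
            retained.add(key);
            emitted.add("-D " + evicted);
            emitted.add("+I " + key);
        }
        // otherwise the row is larger than everything retained and is ignored
    }
}

public class TopNDemo {
    public static void main(String[] args) {
        TopNAsc op = new TopNAsc(3);
        // "before restore" input: first column of the rows in the test program
        for (int k : new int[] {2, 4, 6, 1, 3, 5}) op.process(k);
        op.emitted.clear();                    // state {1, 2, 3} survives the "restore"
        op.process(2);                         // 2 < max(3): retracts 3, inserts 2
        op.process(4);                         // larger than current max 2: ignored
        System.out.println(op.emitted);        // prints [-D 3, +I 2]
    }
}
```

This shows why feeding identical data after restore still exercises interesting behaviour (a retraction fires), but also why commenting each row, as the reviewer suggests, makes the intent much clearer.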
[jira] [Comment Edited] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ]

Xin Chen edited comment on FLINK-33483 at 11/8/23 12:13 PM:

But in another scenario in production practice, UNDEFINED also appears. The JobManager log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Key information from the log:

{code:java}
15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection.
15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING.
15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable.
15:00:55.694 closing socket connection and attempting reconnect
15:00:57.657 State change: RECONNECTED
15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted.
15:00:57.741 Job 281592085ed7f391ab59b83a53c40db3 was not finished by JobManager.
15:00:57.742 Shutting down cluster because job not finished
15:00:57.742 Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null.
15:00:57.767 Unregister application from the YARN Resource Manager with final status UNDEFINED.
{code}

From the logs, it can be seen that ZooKeeper was disconnected for a few seconds. During the disconnection, the ResourceManager was affected and the Flink job was suspended while attempting to reconnect to ZooKeeper. Most importantly, after ZooKeeper reconnected, for some unknown reason the JobManager directly determined that the job was in an UNKNOWN state: *Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null*. During this process, the JobManager did not restart, which is different from FLINK-12302.

> Why is “UNDEFINED” defined in the Flink task status?
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / RPC, Runtime / Task
> Affects Versions: 1.12.2
> Reporter: Xin Chen
> Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, jm (JobManager) will report the task status as undefined. The Yarn page will
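The log lines above show the chain directly: the entrypoint shuts down with application status UNKNOWN, and YARN then records the final status as UNDEFINED. As a simplified model of that mapping (enum and method names here are illustrative, not copied from the Flink codebase), a terminal status that is neither a confirmed success nor a confirmed failure can only surface on YARN as UNDEFINED:

```java
/** Simplified sketch of how an internal application status maps to a YARN final status. */
public class FinalStatusDemo {
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }
    enum YarnFinalStatus { SUCCEEDED, FAILED, KILLED, UNDEFINED }

    static YarnFinalStatus toYarn(ApplicationStatus status) {
        switch (status) {
            case SUCCEEDED: return YarnFinalStatus.SUCCEEDED;
            case FAILED:    return YarnFinalStatus.FAILED;
            case CANCELED:  return YarnFinalStatus.KILLED;
            // UNKNOWN: the framework cannot truthfully claim success or failure,
            // so the only honest YARN final status is UNDEFINED.
            default:        return YarnFinalStatus.UNDEFINED;
        }
    }

    public static void main(String[] args) {
        // The entrypoint shut down with UNKNOWN, so YARN shows UNDEFINED.
        System.out.println(toYarn(ApplicationStatus.UNKNOWN)); // prints UNDEFINED
    }
}
```

Under this model, reporting FAILED instead (as the issue suggests) would assert something the JobManager could not verify after losing and regaining its ZooKeeper connection.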
Re: [PR] [FLINK-32738][formats] PROTOBUF format supports projection push down [flink]
zhougit86 commented on PR #23323:
URL: https://github.com/apache/flink/pull/23323#issuecomment-1801762669

> @zhougit86 Sorry, I'm a little busy recently. You can take a look at this [PR](https://github.com/apache/flink/pull/23162). There are some modifications to the logic of codegen and the code will be split. You may need to rebase this for compatibility.

@ljw-hit OK, I will wait for your PR to be modified first, then rebase my part on yours. Let me know once yours is merged.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784007#comment-17784007 ]

Xin Chen commented on FLINK-33483:

But in another scenario in production practice, UNDEFINED also appears. The JobManager log can be found in the file [^container_e15_1693914709123_8498_01_01_8042], but I have not fully reproduced this scene. Based on the key information in the log, it can be seen that:

{code:java}
15:00:57.657 State change: SUSPENDED Connection to ZooKeeper suspended, waiting for reconnection.
15:00:54.754 org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
15:00:54.759 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RUNNING to RESTARTING.
15:00:54.771 Job DataDistribution$ (281592085ed7f391ab59b83a53c40db3) switched from state RESTARTING to SUSPENDED. org.apache.flink.util.FlinkException: JobManager is no longer the leader. Unable to canonicalize address zookeeper:2181 because it's not resolvable.
15:00:55.694 closing socket connection and attempting reconnect
15:00:57.657 State change: RECONNECTED
15:00:57.739 Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
15:00:57.740 Connection to ZooKeeper was reconnected. Leader election can be restarted.
{code}

> Why is “UNDEFINED” defined in the Flink task status?
>
> Key: FLINK-33483
> URL: https://issues.apache.org/jira/browse/FLINK-33483
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / RPC, Runtime / Task
> Affects Versions: 1.12.2
> Reporter: Xin Chen
> Priority: Major
> Attachments: container_e15_1693914709123_8498_01_01_8042
>
> In the Flink on Yarn mode, if an unknown status appears in the Flink log, jm (JobManager) will report the task status as undefined. The Yarn page will display the state as FINISHED, but the final status is *UNDEFINED*. In terms of business, it is unknown whether the task has failed or succeeded, and whether to retry. It has a certain impact. Why should we design UNDEFINED? Usually, this situation occurs due to zk (ZooKeeper) disconnection or jm abnormality, etc. Since the abnormality is present, why not use FAILED?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?
[ https://issues.apache.org/jira/browse/FLINK-33483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xin Chen updated FLINK-33483: - Attachment: container_e15_1693914709123_8498_01_01_8042 > Why is “UNDEFINED” defined in the Flink task status? > > > Key: FLINK-33483 > URL: https://issues.apache.org/jira/browse/FLINK-33483 > Project: Flink > Issue Type: Improvement > Components: Runtime / RPC, Runtime / Task >Affects Versions: 1.12.2 >Reporter: Xin Chen >Priority: Major > Attachments: container_e15_1693914709123_8498_01_01_8042 > > > In the Flink on Yarn mode, if an unknown status appears in the Flink log, > jm(jobmanager) will report the task status as undefined. The Yarn page will > display the state as FINISHED, but the final status is *UNDEFINED*. In terms > of business, it is unknown whether the task has failed or succeeded, and > whether to retry. It has a certain impact. Why should we design UNDEFINED? > Usually, this situation occurs due to zk(zookeeper) disconnection or jm > abnormality, etc. Since the abnormality is present, why not use FAILED? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33257][connectors/mongodb] Support filter pushdown in MongoDB connector [flink-connector-mongodb]
Jiabao-Sun commented on PR #17: URL: https://github.com/apache/flink-connector-mongodb/pull/17#issuecomment-1801742466 Thanks @xuyangzhong and @leonardBang for the review. Could you help review it again when you have time? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org